#4268 Ollama & Crew-AI AI-Agent Workflow

#4268 Provide a Python application using the Ollama-supported Granite-4-h-small and Crew-AI to do the following: """1) Identify opportunities in industries and user groups to provide innovative solutions for new and improved systems, products, services and processes.

2) Rank the opportunities by feasibility, practicality, value add and "bang-for-the-buck".

3) Apply innovative thinking methods (any of approximately 30) to suggest new and improved systems, products, services and processes.

4) Rank the suggestions by feasibility, practicality, value add and "bang-for-the-buck".

5) Provide step-by-step instructions on how to implement the suggested improvements.

6) Provide an iterative prototype, test, evaluate and improve cycle for verifying and validating the implementation.

7) All steps should be saved to a database for follow-up."""

This application uses CrewAI to orchestrate a team of agents powered by IBM Granite-4-h-small (a hybrid MoE model with 32B total / 9B active parameters) via Ollama.

The design features a structured pipeline where each step of your request is handled by a specialized agent, with the results ultimately saved to a SQLite database for persistence.

Prerequisites

  1. Install Ollama and pull the specific model: ollama pull granite4:small-h

  2. Install Python Dependencies: pip install crewai pyyaml (sqlite3 ships with Python's standard library and does not need to be installed; pyyaml is used by the YAML-driven versions later in this post)


The Python Application (innovation_agent.py)

import sqlite3
from datetime import datetime
from crewai import Agent, Task, Crew, Process, LLM

# 1. Configuration: Connect CrewAI to Ollama (Granite 4 Small-h)
# Granite 4 Small-h is excellent for this because of its high 
# instruction-following scores (89.8% on IFEval).
granite_llm = LLM(
    model="ollama/granite4:small-h",
    base_url="http://localhost:11434"
)

# 2. Database Setup (Requirement #7)
def save_to_db(step_name, data):
    conn = sqlite3.connect('innovation_vault.db')
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS records 
                      (id INTEGER PRIMARY KEY, timestamp TEXT, step TEXT, content TEXT)''')
    cursor.execute("INSERT INTO records (timestamp, step, content) VALUES (?, ?, ?)",
                   (datetime.now().isoformat(), step_name, str(data)))
    conn.commit()
    conn.close()

# 3. Agent Definitions
ideator = Agent(
    role='Industry Visionary',
    goal='Identify high-impact opportunities for new systems and services.',
    backstory='Expert in market gap analysis and emerging technology trends.',
    llm=granite_llm,
    verbose=True
)

ranker = Agent(
    role='Strategic Analyst',
    goal='Rank opportunities and suggestions based on feasibility and value.',
    backstory='Specialist in ROI calculation and technical feasibility studies.',
    llm=granite_llm,
    verbose=True
)

innovator = Agent(
    role='Innovation Methodologist',
    goal='Apply formal innovation methods like TRIZ, SCAMPER, or Design Thinking.',
    backstory='Master of 30+ innovation frameworks to pivot and improve processes.',
    llm=granite_llm,
    verbose=True
)

architect = Agent(
    role='Implementation Lead',
    goal='Provide step-by-step technical guides and iterative testing cycles.',
    backstory='Expert in DevOps, Agile prototyping, and QA validation.',
    llm=granite_llm,
    verbose=True
)

# 4. Task Definitions (Requirements #1 - #6)
task1 = Task(
    description="Identify 3-5 opportunities in the Renewable Energy and Healthcare sectors for new systems.",
    agent=ideator,
    expected_output="A list of industry opportunities and specific user groups."
)

task2 = Task(
    description="Rank the identified opportunities by feasibility, practicality, value add, and 'bang-for-the-buck'.",
    agent=ranker,
    expected_output="A ranked table of opportunities with scoring."
)

task3 = Task(
    description="Apply the SCAMPER or TRIZ method to suggest specific improvements for the top-ranked opportunity.",
    agent=innovator,
    expected_output="Innovative suggestions for products or processes."
)

task4 = Task(
    description="Rank these innovation suggestions by practicality and value.",
    agent=ranker,
    expected_output="A ranked list of the best specific improvements."
)

task5 = Task(
    description="Provide a 10-step implementation guide for the #1 suggestion.",
    agent=architect,
    expected_output="Detailed step-by-step technical and operational instructions."
)

task6 = Task(
    description="Design an iterative 'Prototype-Test-Evaluate-Improve' cycle for this implementation.",
    agent=architect,
    expected_output="A detailed validation plan including KPIs and testing stages."
)

# 5. Execution & Persistence
innovation_crew = Crew(
    agents=[ideator, ranker, innovator, architect],
    tasks=[task1, task2, task3, task4, task5, task6],
    process=Process.sequential
)

print("### Starting Innovation Cycle...")
result = innovation_crew.kickoff()

# Save final consolidated report (Requirement #7)
save_to_db("Full Innovation Cycle", result.raw)

print("\n\n########################")
print("## INNOVATION REPORT ##")
print("########################\n")
print(result.raw)
print("\n[✔] Data saved to innovation_vault.db")

Why this setup works for your requirements:

  1. Model Choice (Granite-4-h-small): Unlike standard dense models, the Hybrid-MoE architecture of Granite 4 allows it to run a 32B-class reasoning engine on consumer hardware with significantly lower VRAM requirements. It excels at the "instruction following" needed for multi-step tasks.

  2. Sequential Processing: By using Process.sequential, we ensure that the Ranker (Step 2) actually has the data from the Ideator (Step 1) before it starts calculating feasibility.

  3. Persistence: The save_to_db function creates a local SQLite file. In a production environment, you would likely replace this with a PostgreSQL instance to allow your agents to "read" previous steps for future follow-up sessions.

  4. Specialization:

    • Agent 3 specifically addresses your requirement for "30 innovation methods" (I've prompted it for TRIZ/SCAMPER).

    • Agent 4 performs the "bang-for-the-buck" analysis, which requires a separate logical pass to avoid "enthusiasm bias" from the creator agent.
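For follow-up sessions, a helper along these lines could pull earlier records back out of the database so they can be folded into a new Task description. This is a minimal sketch against the same innovation_vault.db schema used above; the function name load_previous_steps is my own, not part of the application:

```python
import sqlite3

def load_previous_steps(db_path="innovation_vault.db", limit=5):
    """Return the most recent saved steps so a follow-up session can build on them."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Same schema as save_to_db creates
    cursor.execute('''CREATE TABLE IF NOT EXISTS records
                      (id INTEGER PRIMARY KEY, timestamp TEXT, step TEXT, content TEXT)''')
    cursor.execute("SELECT timestamp, step, content FROM records ORDER BY id DESC LIMIT ?",
                   (limit,))
    rows = cursor.fetchall()
    conn.close()
    return rows
```

The returned rows could then be concatenated into a context string and prepended to a new task's description.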

Would you like me to refine the prompt for the "Innovation Methodologist" to include a specific list of the 30 methods you prefer?


Can I implement a list of agents and a list of tasks, each list element (agent or task) to be initiated by text from a file?

Yes, you can absolutely do this. In fact, this is a "best practice" for building scalable agent systems because it separates your logic (Python code) from your configuration (YAML or JSON files).

Using YAML is generally preferred over plain text because it allows you to define structured attributes (like role, goal, and backstory) clearly.

1. Create your configuration files

Create a folder named config/ and add these two files:

config/agents.yaml

industry_analyst:
  role: "Industry Visionary"
  goal: "Identify opportunities in {sector} for {target_group}."
  backstory: "Expert in market gap analysis and emerging technology trends."

strategic_ranker:
  role: "Strategic Analyst"
  goal: "Rank opportunities by feasibility and 'bang-for-the-buck'."
  backstory: "Specialist in ROI calculation and technical feasibility."

config/tasks.yaml

identify_opps:
  description: "Identify 3-5 opportunities in {sector} for {target_group}."
  expected_output: "A detailed list of industry opportunities."
  agent: "industry_analyst"

rank_opps:
  description: "Rank the previously identified opportunities based on ROI."
  expected_output: "A ranked table with scoring metrics."
  agent: "strategic_ranker"

2. The Python Implementation

This script reads the files, dynamically creates the objects, and maps the tasks to the correct agents using their keys.

import yaml
from crewai import Agent, Task, Crew, LLM

# 1. Setup Ollama / Granite
granite_llm = LLM(model="ollama/granite4:small-h", base_url="http://localhost:11434")

# 2. Helper to load YAML
def load_config(file_path):
    with open(file_path, 'r') as f:
        return yaml.safe_load(f)

agents_config = load_config('config/agents.yaml')
tasks_config = load_config('config/tasks.yaml')

# 3. Dynamically create Agents
agents_dict = {}
for agent_id, data in agents_config.items():
    agents_dict[agent_id] = Agent(
        role=data['role'],
        goal=data['goal'],
        backstory=data['backstory'],
        llm=granite_llm,
        verbose=True
    )

# 4. Dynamically create Tasks and link to Agents
tasks_list = []
for task_id, data in tasks_config.items():
    # We find the agent object using the string key defined in the YAML
    assigned_agent = agents_dict.get(data['agent'])
    
    tasks_list.append(Task(
        description=data['description'],
        expected_output=data['expected_output'],
        agent=assigned_agent
    ))

# 5. Kickoff the Crew
# You can pass inputs to fill the {brackets} in your YAML
crew = Crew(
    agents=list(agents_dict.values()),
    tasks=tasks_list
)

result = crew.kickoff(inputs={'sector': 'Renewable Energy', 'target_group': 'Small Farms'})
print(result)

Why this is better:

  • No Code Changes: If you want to change an agent's "personality" or add a new step to the innovation cycle, you just edit the YAML file. You don't touch the Python script.

  • Variable Injection: Notice the {sector} tags in the YAML. CrewAI's kickoff(inputs=...) automatically injects your real-time data into those placeholders.

  • Scalability: You can easily add all 30 innovation methods into the tasks.yaml file without making the Python file 500 lines long.
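Under the hood, that placeholder filling is ordinary Python string interpolation. CrewAI performs the substitution itself when you call kickoff(inputs=...); this sketch only illustrates the mechanism:

```python
# Illustration only: CrewAI fills the {placeholders} internally at kickoff().
template = "Identify 3-5 opportunities in {sector} for {target_group}."
inputs = {"sector": "Renewable Energy", "target_group": "Small Farms"}

rendered = template.format(**inputs)
print(rendered)
# → Identify 3-5 opportunities in Renewable Energy for Small Farms.
```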

Would you like me to show you how to set up a "Watcher" script that automatically re-runs the agents whenever you save changes to your text files?


I imagine adding two text inputs for agents.yaml and tasks.yaml file names would be useful as well

That is a smart architectural move. By parameterizing the filenames, you turn your script into a reusable engine that can run different "playbooks" (e.g., one for healthcare innovation, another for software debugging) just by pointing to different files.

Here is the updated script. I've added a simple CLI interface using input() so you can specify your source files at runtime, along with a bit of error handling to make sure the files actually exist.

The Updated Innovation Engine

import yaml
import os
import sqlite3
from datetime import datetime
from crewai import Agent, Task, Crew, LLM

# 1. Setup Local LLM (Granite-4-h-small via Ollama)
granite_llm = LLM(
    model="ollama/granite4:small-h", 
    base_url="http://localhost:11434"
)

# 2. Database Utility (Requirement #7)
def save_to_db(session_name, results):
    conn = sqlite3.connect('innovation_history.db')
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS logs 
                      (id INTEGER PRIMARY KEY, timestamp TEXT, session TEXT, output TEXT)''')
    cursor.execute("INSERT INTO logs (timestamp, session, output) VALUES (?, ?, ?)",
                   (datetime.now().isoformat(), session_name, str(results)))
    conn.commit()
    conn.close()

# 3. Dynamic Configuration Loader
def run_innovation_session():
    print("--- 🛠️ CrewAI Dynamic Config Loader ---")
    
    # User inputs for filenames
    agents_file = input("Enter agents filename (e.g., agents.yaml): ") or "agents.yaml"
    tasks_file = input("Enter tasks filename (e.g., tasks.yaml): ") or "tasks.yaml"

    if not os.path.exists(agents_file) or not os.path.exists(tasks_file):
        print("❌ Error: One or both configuration files not found.")
        return

    # Load YAML Data
    with open(agents_file, 'r') as f:
        agents_data = yaml.safe_load(f)
    with open(tasks_file, 'r') as f:
        tasks_data = yaml.safe_load(f)

    # 4. Instantiate Agents
    agents_dict = {}
    for agent_id, config in agents_data.items():
        agents_dict[agent_id] = Agent(
            role=config['role'],
            goal=config['goal'],
            backstory=config['backstory'],
            llm=granite_llm,
            verbose=True
        )

    # 5. Instantiate Tasks and Link to Agents
    tasks_list = []
    for task_id, config in tasks_data.items():
        # Retrieve the agent object based on the string key in the task YAML
        target_agent = agents_dict.get(config['agent'])
        
        if target_agent:
            tasks_list.append(Task(
                description=config['description'],
                expected_output=config['expected_output'],
                agent=target_agent
            ))
        else:
            print(f"⚠️ Warning: Agent '{config['agent']}' not found for task '{task_id}'")

    # 6. Execution
    print(f"\n🚀 Launching Crew with {len(agents_dict)} agents and {len(tasks_list)} tasks...")
    
    # We can still pass dynamic inputs here if your YAML uses {placeholders}
    crew = Crew(
        agents=list(agents_dict.values()),
        tasks=tasks_list,
        # Defaulting to sequential for complex step-by-step logic
    )

    result = crew.kickoff()

    # 7. Persistence
    save_to_db(f"{agents_file} + {tasks_file}", result.raw)
    print("\n✅ Session Complete. Results saved to SQLite.")
    print("\n--- FINAL OUTPUT ---\n")
    print(result.raw)

if __name__ == "__main__":
    run_innovation_session()

Key Features of This Version:

  • Runtime Flexibility: You can now keep a library of files like medical_agents.yaml, retail_tasks.yaml, and fintech_agents.yaml and swap them without stopping the program.

  • Validation: It checks if the files exist before trying to load them, preventing a Python crash.

  • Agent Mapping: The line agents_dict.get(config['agent']) is critical—it bridges the gap between the text-based name in your task file and the actual Python object created from your agent file.

  • Combined Database Logging: It logs which specific file combination produced the output, making it easier to audit your "innovation runs" later.
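If you ever want to run the engine non-interactively (e.g., from a scheduler), the two input() calls could be swapped for CLI flags. A minimal sketch using argparse; the --agents and --tasks flag names are my own invention, not part of the script above:

```python
import argparse

def parse_args(argv=None):
    """Parse --agents/--tasks flags, falling back to the same defaults as input()."""
    parser = argparse.ArgumentParser(description="Run a CrewAI session from YAML configs")
    parser.add_argument("--agents", default="agents.yaml", help="Path to agents YAML file")
    parser.add_argument("--tasks", default="tasks.yaml", help="Path to tasks YAML file")
    return parser.parse_args(argv)

# Passing an explicit list here instead of reading sys.argv, for demonstration
args = parse_args(["--agents", "medical_agents.yaml"])
print(args.agents, args.tasks)
# → medical_agents.yaml tasks.yaml
```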

A Small Tip for Your YAML Structure

To make this work flawlessly, ensure the agent: field in your tasks.yaml matches the top-level key in your agents.yaml.

Example: If agents.yaml has:

innovation_specialist:  # This is the key
  role: "Methodologist"
  ...

Your tasks.yaml must use:

apply_triz:
  agent: "innovation_specialist" # Matches the key above
  ...
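You could even catch mismatched keys before building the Crew at all. A small validation sketch — the validate_config helper is hypothetical, not part of CrewAI:

```python
def validate_config(agents_data, tasks_data):
    """Return the task ids whose 'agent' key has no matching entry in agents_data."""
    return [task_id for task_id, cfg in tasks_data.items()
            if cfg.get("agent") not in agents_data]

# Example with one valid and one dangling reference
agents_data = {"innovation_specialist": {"role": "Methodologist"}}
tasks_data = {"apply_triz": {"agent": "innovation_specialist"},
              "broken_task": {"agent": "missing_agent"}}

print(validate_config(agents_data, tasks_data))
# → ['broken_task']
```

Running this right after the yaml.safe_load calls lets you fail fast with a clear message instead of silently creating a Task with agent=None.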

Would you like me to create a "Batch Mode" version that can loop through a list of file pairs and run them all automatically overnight?


Yes

To turn your application into an automated "Batch Mode" engine, we will use a CSV or JSON control file to define pairs of agent/task configurations. This allows the system to run sequentially through multiple "projects" while you're away.

1. Create a Control File (batch_list.json)

This file tells the script which combinations to run. You can add as many pairs as you like.

[
    {
        "session_id": "Healthcare_Innovation",
        "agents": "medical_agents.yaml",
        "tasks": "medical_tasks.yaml",
        "inputs": {"sector": "Telemedicine", "user_group": "Rural Elderly"}
    },
    {
        "session_id": "GreenTech_Cycle",
        "agents": "energy_agents.yaml",
        "tasks": "energy_tasks.yaml",
        "inputs": {"sector": "Solar Storage", "user_group": "Off-grid Homeowners"}
    }
]

2. The Batch Innovation Engine

This script loops through the JSON list, initializes the specific agents/tasks for each entry, and saves the unique results to your database.

import yaml
import json
import os
import sqlite3
from datetime import datetime
from crewai import Agent, Task, Crew, LLM

# Setup local Granite via Ollama
granite_llm = LLM(model="ollama/granite4:small-h", base_url="http://localhost:11434")

def save_to_db(session_id, agents_file, tasks_file, result):
    conn = sqlite3.connect('batch_innovation.db')
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS batch_logs 
                      (id INTEGER PRIMARY KEY, timestamp TEXT, session_id TEXT, 
                       config_used TEXT, output TEXT)''')
    
    config_info = f"Agents: {agents_file} | Tasks: {tasks_file}"
    cursor.execute("INSERT INTO batch_logs (timestamp, session_id, config_used, output) VALUES (?, ?, ?, ?)",
                   (datetime.now().isoformat(), session_id, config_info, str(result)))
    conn.commit()
    conn.close()

def run_batch(control_file):
    if not os.path.exists(control_file):
        print(f"❌ Control file {control_file} not found.")
        return

    with open(control_file, 'r') as f:
        batches = json.load(f)

    for job in batches:
        print(f"\n--- 🚀 Starting Batch: {job['session_id']} ---")
        
        # 1. Load specific YAMLs for this job
        with open(job['agents'], 'r') as f:
            agents_data = yaml.safe_load(f)
        with open(job['tasks'], 'r') as f:
            tasks_data = yaml.safe_load(f)

        # 2. Build Agents
        agents_map = {}
        for uid, cfg in agents_data.items():
            agents_map[uid] = Agent(
                role=cfg['role'],
                goal=cfg['goal'],
                backstory=cfg['backstory'],
                llm=granite_llm
            )

        # 3. Build Tasks
        tasks_list = []
        for uid, cfg in tasks_data.items():
            tasks_list.append(Task(
                description=cfg['description'],
                expected_output=cfg['expected_output'],
                agent=agents_map.get(cfg['agent'])
            ))

        # 4. Execute Crew
        crew = Crew(agents=list(agents_map.values()), tasks=tasks_list)
        result = crew.kickoff(inputs=job.get('inputs', {}))

        # 5. Persistent Logging
        save_to_db(job['session_id'], job['agents'], job['tasks'], result.raw)
        print(f"✅ Finished {job['session_id']}. Results saved.")

if __name__ == "__main__":
    # You can pass the control file name here
    run_batch("batch_list.json")

Why this Batch Mode is powerful:

  • Variable Injection: Note the inputs key in the JSON. This allows you to use the same tasks.yaml for different sectors (e.g., using {sector} as a variable in your task description).

  • Sequential Reliability: The script's loop waits for each "Project" to fully finish (and save to the DB) before starting the next one. This prevents your local Ollama instance from being overwhelmed by multiple models running at once.

  • Audit Trail: Your SQLite database now acts as a professional ledger, recording exactly which configurations produced which innovation reports.
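To inspect that ledger from Python rather than the sqlite3 CLI, a helper like this sketch would do. The name list_completed_sessions is mine; it simply reads the batch_logs schema created above:

```python
import sqlite3

def list_completed_sessions(db_path="batch_innovation.db"):
    """Return (session_id, timestamp, config_used) rows from the batch ledger."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Same schema as save_to_db creates
    cursor.execute('''CREATE TABLE IF NOT EXISTS batch_logs
                      (id INTEGER PRIMARY KEY, timestamp TEXT, session_id TEXT,
                       config_used TEXT, output TEXT)''')
    cursor.execute("SELECT session_id, timestamp, config_used FROM batch_logs ORDER BY id")
    rows = cursor.fetchall()
    conn.close()
    return rows
```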

Pro-Tip for 2026:

Since you are using Granite-4-h-small, if you run this overnight, make sure your machine’s "Sleep" settings are disabled. Local LLM inference can take 5–15 minutes per complex "Crew" run depending on your GPU.

Would you like me to add a "Resume" feature that checks the database and skips projects that have already been completed in case of a power outage?


Yes

Adding a "Resume" feature is a common requirement for local LLM workflows, as large-scale agent runs can be interrupted by power flickers, system updates, or model crashes.

This updated script checks the batch_logs table before starting a project. If the session_id already exists, it skips it and moves to the next one in your batch_list.json.

Updated Batch Engine with Resume Logic

import yaml
import json
import os
import sqlite3
from datetime import datetime
from crewai import Agent, Task, Crew, LLM

# 1. Setup local Granite via Ollama
granite_llm = LLM(
    model="ollama/granite4:small-h", 
    base_url="http://localhost:11434"
)

# 2. Database with Resume Check
def get_db_connection():
    conn = sqlite3.connect('batch_innovation.db')
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS batch_logs 
                      (id INTEGER PRIMARY KEY, timestamp TEXT, session_id TEXT, 
                       config_used TEXT, output TEXT)''')
    conn.commit()
    return conn

def is_already_completed(session_id):
    conn = get_db_connection()
    cursor = conn.cursor()
    # Check if this specific session_id has an entry
    cursor.execute("SELECT 1 FROM batch_logs WHERE session_id = ? LIMIT 1", (session_id,))
    exists = cursor.fetchone() is not None
    conn.close()
    return exists

def save_to_db(session_id, agents_file, tasks_file, result):
    conn = get_db_connection()
    cursor = conn.cursor()
    config_info = f"Agents: {agents_file} | Tasks: {tasks_file}"
    cursor.execute("INSERT INTO batch_logs (timestamp, session_id, config_used, output) VALUES (?, ?, ?, ?)",
                   (datetime.now().isoformat(), session_id, config_info, str(result)))
    conn.commit()
    conn.close()

# 3. Main Batch Logic
def run_batch(control_file):
    if not os.path.exists(control_file):
        print(f"❌ Control file {control_file} not found.")
        return

    with open(control_file, 'r') as f:
        batches = json.load(f)

    print(f"--- 📂 Batch Queue Loaded: {len(batches)} projects found ---")

    for job in batches:
        session_id = job['session_id']

        # RESUME CHECK
        if is_already_completed(session_id):
            print(f"⏩ Skipping {session_id}: Already found in database.")
            continue

        print(f"\n--- 🚀 Starting Project: {session_id} ---")
        
        try:
            with open(job['agents'], 'r') as f:
                agents_data = yaml.safe_load(f)
            with open(job['tasks'], 'r') as f:
                tasks_data = yaml.safe_load(f)

            # Build Agents
            agents_map = {uid: Agent(
                role=cfg['role'], 
                goal=cfg['goal'], 
                backstory=cfg['backstory'], 
                llm=granite_llm
            ) for uid, cfg in agents_data.items()}

            # Build Tasks
            tasks_list = [Task(
                description=cfg['description'],
                expected_output=cfg['expected_output'],
                agent=agents_map.get(cfg['agent'])
            ) for uid, cfg in tasks_data.items()]

            # Execute
            crew = Crew(agents=list(agents_map.values()), tasks=tasks_list)
            result = crew.kickoff(inputs=job.get('inputs', {}))

            # Save
            save_to_db(session_id, job['agents'], job['tasks'], result.raw)
            print(f"✅ Successfully finished {session_id}.")

        except Exception as e:
            print(f"❌ Error during {session_id}: {str(e)}")
            # We don't save to DB here, so it will retry next time you run the script

if __name__ == "__main__":
    run_batch("batch_list.json")

How the Resume Feature works:

  1. Durable State: The is_already_completed function acts as a "gatekeeper." It runs a quick SQL query (SELECT 1) to see if the project name is already in your "finished" pile.

  2. Graceful Failures: If the LLM crashes or the machine is interrupted mid-run, the try/except block catches the error. Because save_to_db is only called after a successful kickoff(), that specific project remains "incomplete" in the database.

  3. Automatic Retry: The next time you run python script.py, the code will skip the successful ones and automatically pick up exactly where it left off.

Pro-Tip: "Clearing" the Queue

If you want to re-run a project (perhaps you improved your prompt in the YAML file), you don't need to change the code. Just open your database and delete the row for that session_id:

  • Quick Command (Terminal): sqlite3 batch_innovation.db "DELETE FROM batch_logs WHERE session_id='Healthcare_Innovation';"
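The same cleanup can be done from Python. A sketch with a hypothetical clear_session helper against the batch_logs table:

```python
import sqlite3

def clear_session(session_id, db_path="batch_innovation.db"):
    """Delete a session's log rows so the batch engine will re-run it."""
    conn = sqlite3.connect(db_path)
    conn.execute('''CREATE TABLE IF NOT EXISTS batch_logs
                    (id INTEGER PRIMARY KEY, timestamp TEXT, session_id TEXT,
                     config_used TEXT, output TEXT)''')
    cursor = conn.execute("DELETE FROM batch_logs WHERE session_id = ?", (session_id,))
    deleted = cursor.rowcount  # number of rows removed
    conn.commit()
    conn.close()
    return deleted
```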

Would you like me to show you how to add a Progress Bar or a Slack/Discord notification so it pings your phone when a batch finishes?


From <https://gemini.google.com/app/>  Google Gemini 3.0 (Fast)
