Academy
Updated on
Jul 14, 2025

A Guide to Building AI-Powered Multiagent Systems for Manufacturing

A hands-on guide to building multiagent AI systems for manufacturing supply chain automation covering agent design, orchestration, data pipelines, and real-world deployment

A Guide to Building  AI-Powered Multiagent Systems for Manufacturing
Ready to engage our team? Schedule a free consultation today.

Manufacturing is undergoing a seismic shift, AI-driven multiagent systems are at the heart of this transformation. By leveraging specialized, cooperating agents, modern manufacturing supply chains can achieve real-time decisioning, deep process insight, automated document enrichment, and on-demand upskilling of employees, all on a robust, cloud-hosted, and scalable stack.

This article provides a practical, technically deep guide to building such a multi-agent system using the latest open frameworks: PydanticAI for composable agent orchestration, ClickHouse for scalable tabular and vector data storage, and the Mistral Magistral Small LLM for advanced reasoning. 

We’ll ground this in a supply chain scenario, demonstrating four collaborating agents:

  1. Employee Training Agent (continuous upskilling and compliance checks)
  2. Supply Chain Analysis Agent (detects bottlenecks and optimizes flow)
  3. Invoice Enrichment Agent (auto-validates and tags supplier invoices)
  4. Quality Assurance Agent (detects anomalies in production and triggers alerts)

By the end of this article, you will have a scaffold by which to create multi-agent systems, especially in the manufacturing sector, where supply chain automation can dramatically improve productivity. 

Let’s dive in.




What is an AI Agent?

Before we begin, let’s take a quick look at what AI agents are. 

An AI agent is a self-contained, goal-driven computational entity that perceives its environment, processes observations, and autonomously executes actions to achieve specific objectives.

Technically, an AI agent is characterized by its policy, a mapping from perceptual inputs (which may include structured data, sensor readings, unstructured documents, or API responses) to actions or outputs. This mapping can be stateless (simple rules) or stateful, incorporating memory (episodic, semantic, or vector-based) to maintain context over time. 

Modern AI agents often combine symbolic reasoning (e.g., rules, logic), machine learning (pattern recognition, embeddings), and tool use (via APIs or plug-ins) to make complex, context-aware decisions.

What distinguishes advanced AI agents in contemporary systems is their ability to operate in dynamic, open-ended environments, integrating multiple AI capabilities, such as language understanding (LLMs), perception (CV, sensor fusion), and retrieval (vector search, knowledge graphs), under a unified control loop. 

Technically, agents interact through explicit interfaces, exchanging typed messages or function calls, and can be orchestrated into multiagent systems for collaborative or competitive problem-solving. 

Orchestration frameworks like PydanticAI formalize agent inputs/outputs using strongly typed models, enabling safe inter-agent communication and composability, while supporting external tool invocation, long-term memory, and hierarchical agent composition for complex, multi-stage tasks.

So, to summarize, AI agents encapsulate: 

  • Data Access: Agents interface with structured and unstructured data sources, SQL databases, APIs, IoT sensors, and document repositories, to ingest, contextualize, and act upon real-time information.
  • Tool Use: Beyond basic computation, agents dynamically invoke external tools and APIs (e.g., for retrieval, enrichment, or actuation), enabling them to extend their capabilities far beyond their internal logic.
  • Reasoning Engine: At their core, agents contain embedded or external reasoning modules, ranging from symbolic logic to deep learning models (such as LLMs or graph neural networks)m which process observations, synthesize insights, and generate actionable outputs.
  • Guardrails: Robust agents incorporate guardrails—type-validated interfaces, policy constraints, and safety checks, to ensure reliable, secure, and domain-compliant behavior, especially in mission-critical applications like manufacturing.
  • Memory and State: Effective agents maintain both short-term and long-term memory, enabling them to reference past interactions, learn from experience, and optimize future actions in dynamic environments.
  • Communication Interface: Agents expose explicit, structured interfaces for safe and reliable communication with other agents or orchestrators, supporting both direct messaging and function call-based workflows.

By combining these components, AI agents become modular, composable units of intelligence that can be orchestrated into powerful multiagent systems, each agent specializing in distinct tasks, sharing context, and collaborating to solve complex, end-to-end business problems in manufacturing and beyond.

Let’s now look at how multiagent systems work. 




Why Multiagent Systems? 

If you are able to build capable agents, you can orchestrate them to create multiagent systems that complete entire tasks on their own. This can be invaluable in scenarios where you have a lot of legacy data and systems that have been a challenge to modernize.

In domains like manufacturing, where workflows are heterogeneous (from HR to logistics to QA), a multiagent setup allows:

  • Task Specialization: Each agent encapsulates domain logic and memory.
  • Parallel Processing: Independent agents can process data in parallel (critical for throughput).
  • Orchestrated Reasoning: Orchestrators (via PydanticAI) route queries, persist shared context, and enable coordination.
  • Incremental Modernization: New agents can be introduced without disrupting legacy flows.

With this workflow, you can scale the number of agents incrementally without disrupting existing processes. 

Let’s take a simple example. Most manufacturing companies have manuals that workers are supposed to follow for each process step, whether it’s machine calibration, safety procedures, or quality checks. In reality, those manuals are often scattered across PDFs, paper binders, or outdated intranet systems. 

By deploying a multiagent system, you can assign one agent to extract and structure the content from these manuals, another to cross-reference them with live sensor data, and a third to answer real-time worker queries (or help train workers) or flag compliance violations as they happen. All of this occurs autonomously, without you having to manually orchestrate the flow.

What’s powerful here is that you don’t have to overhaul your entire tech stack at once. You can start with one or two specialized agents, perhaps an invoice validation agent and a training agent, and as your needs evolve, incrementally add new agents for analytics, quality assurance, predictive maintenance, or supply chain optimization. 

In short, multiagent systems let you break down complex, end-to-end workflows into composable, intelligent building blocks, making your entire manufacturing operation more adaptive, resilient, and future-ready.




Technology Stack for Building Multiagent Systems

There are several agent orchestration frameworks available today, including LangChain, Microsoft’s Autogen, OpenAI Agents SDK, and CrewAI. While these frameworks can greatly simplify agent orchestration, you may find that too much “framework magic” can make it difficult to debug and deeply understand your agentic workflows. If your goal is high accuracy and quality, it often pays to keep the orchestration layer as thin and transparent as possible. By doing so, you can focus your efforts on optimizing the underlying data architecture and refining your prompt strategies, ensuring that you extract the best possible performance and reliability from your multiagent system.

PydanticAI for Agent Orchestration

In this article, we’ll use PydanticAI as the orchestration layer. PydanticAI serves as a foundational building block for many agentic frameworks, providing type-safe, modular orchestration without unnecessary complexity. 

  • Structured inputs/outputs (via pydantic models).
  • Inter-agent message passing (enables chain-of-thought and collaboration).
  • Plug-and-play with any LLM/tool backend.

ClickHouse for Tabular and Vector Search

For the data layer, we’ll leverage ClickHouse, a high-performance, cloud-native columnar database capable of handling massive-scale SQL queries and efficient vector search. Since many manufacturing environments already rely on SQL databases, ClickHouse offers a seamless way to scale data retrieval and analytics, letting you modernize your workflows without extensive ETL or replatforming efforts. Also, ClickHouse is already used for analytics in many of the modern manufacturing companies for analytics. 

  • Tabular: High-throughput ingestion for events, logs, invoices, training history.
  • Vector Search: Use ClickHouse Vector Search for semantic search across documents, training modules, supplier data, etc.

Magistral Small 

Magistral Small is Mistral AI’s compact, high-performance reasoning model, a 24 billion‑parameter LLM designed for transparent, chain‑of‑thought reasoning and multilingual understanding. It’s an open‑weight model released under the Apache 2.0 license, allowing full commercial and non‑commercial use while supporting deployability on local hardware, including a single RTX 4090 GPU or a 32 GB‑RAM MacBook once quantized. Fine‑tuned using supervised traces from the larger Magistral Medium and reinforced via RL, it excels at multi‑step logical tasks, providing detailed intermediate reasoning steps rather than opaque answers. 

 So, here’s our stack: 

Layer Tech Role
Agent Framework PydanticAI Agent definition & orchestration
Reasoning LLM Magistral Small (Mistral) Reasoning, language understanding
Data Layer ClickHouse Tabular + vector data (via vector engine)
Hosting/Infra Cloud-native (K8s/VM) Scale, resilience, secure access



Agent Design: Roles and Responsibilities

Let’s now break down the agents we will create. We’ll create three of them in this guide. 

1. Employee Training Agent

  • Queries training gaps based on compliance rules.
  • Generates on-demand microlearning content.
  • Tracks completion and provides feedback.

2. Supply Chain Analysis Agent

  • Ingests real-time supply and inventory data.
  • Detects anomalies, forecasts bottlenecks.
  • Recommends dynamic reordering, alternate routing.

3. Invoice Enrichment Agent

  • Parses invoices, validates line items.
  • Extracts entities, cross-checks with POs.
  • Tags/flags discrepancies and syncs with ERP.

Let’s get on with the implementation of the agent now. 




Prerequisites

1- Install ClickHouse

First, we need to install ClickHouse and insert some data. On Ubuntu-based systems:

# Add Yandex APT repo and key
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4
echo "deb https://packages.clickhouse.com/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list

# Install ClickHouse server + client
sudo apt update
sudo apt install clickhouse-server clickhouse-client

# Start the service
sudo systemctl enable --now clickhouse-server

These steps configure and launch the ClickHouse service and client tools

2- Create database and tables

Open the ClickHouse client:

clickhouse-client --user default --password



Then create a table to store the training manual chunks (we will use this to store the chunks from various training manuals): 

CREATE TABLE training_manual_chunks (
    chunk_id        UUID,
    manual_id       String,
    manual_title    String,
    section         String,
    page_number     UInt16,
    content         String,
    embedding       Array(Float32),
    upload_date     DateTime
) ENGINE = MergeTree()
ORDER BY (manual_id, page_number, chunk_id);



Let’s also create the table for the supply chain events: 

CREATE TABLE supply_chain_events (
    event_id             UUID,
    timestamp            DateTime,
    product_id           String,
    product_name         String,
    location             String,
    inventory_level      UInt32,
    reorder_threshold    UInt32,
    anomaly_score        Nullable(Float32),
    forecasted_shortage  Nullable(UInt8),    -- 1 = yes, 0 = no
    recommended_action   Nullable(String),   -- e.g., "Reorder", "Redirect"
    vector               Array(Float32)      -- Embedding for similarity search
) ENGINE = MergeTree()
ORDER BY (product_id, timestamp);

Finally, let’s create the invoice enrichment agent table: 

CREATE TABLE invoice_data (
    invoice_id              String,
    invoice_date            Date,
    supplier_id             String,
    supplier_name           String,
    po_id                   String,           -- Linked Purchase Order
    total_amount            Float32,
    currency                String,
    line_item_count         UInt16,
    line_items_json         String,           -- JSON array with detailed line items
    validated               UInt8,            -- 1 = valid, 0 = flagged
    flagged_fields          Nullable(String), -- Comma-separated list of flagged items
    extracted_entities      Nullable(String), -- Key entities extracted (e.g., tax IDs)
    enrichment_notes        Nullable(String), -- Human/AI feedback or auto-tags
    erp_sync_status         String,           -- e.g., "pending", "synced", "error"
    last_checked            DateTime
) ENGINE = MergeTree()
ORDER BY (invoice_id, invoice_date);



3- Install Python dependencies

Now create a Python virtual environment, and install these:

pip install clickhouse-connect pydantic-ai openai sentence-transformers

4- Get an API Key from OpenRouter

To interact with models like Magistral Small through OpenRouter, you’ll need an API key. Get that from OpenRouter platform, and then: 

export OPENROUTER_API_KEY='your_generated_api_key_here'

Next, let’s ingest some data into these tables. 




Data Ingestion 

During data ingestion, we need to pick the key columns (or text from the training manual) and create vector embeddings - and insert that with the data. Here’s how you can do it for the training manuals: 

Step 1: Extract Text from Training Manuals (PDFs)

You can use Python libraries like PyMuPDF (fitz) or pdfplumber to extract text from each page or section. In a complex manual, however, you should use VLMs (Vision Language Models) to parse the data. In this tutorial, we will use fitz:

import fitz  # PyMuPDF


def extract_chunks_from_pdf(pdf_path, chunk_size=300):
    chunks = []
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            page = doc[page_num]
            text = page.get_text()
            if not text:
                continue
            for start in range(0, len(text), chunk_size):
                chunks.append({
                    "page_number": page_num + 1,
                    "content": text[start:start+chunk_size]
                })
        return chunks
    except Exception as e:
        print(f"Error: Could not read the PDF file '{pdf_path}'. Please ensure it exists and is not corrupted.")
        print(f"Details: {e}")
        return None

Step 2: Generate Vector Embeddings for Each Chunk

Use a sentence embedding model (e.g., from sentence-transformers):

from sentence_transformers import SentenceTransformer


# Load the pre-trained model
model = SentenceTransformer('all-MiniLM-L6-v2')


def get_embedding(text):
    """Generates a vector embedding for the given text."""
    # The .tolist() converts the NumPy array to a standard Python list
    return model.encode(text).tolist()



Step 3: Insert Chunks and Embeddings into ClickHouse

Use the clickhouse-connect Python client to insert enriched data:

import uuid
import datetime
import sys
from clickhouse_connect import get_client


# Establish connection to the database
client = get_client(host='localhost', database='default')


def insert_chunks_to_clickhouse(pdf_path, manual_id, manual_title):
    print(f"Attempting to extract chunks from '{pdf_path}'...")
    chunks = extract_chunks_from_pdf(pdf_path)


    if chunks is None:
        print("Halting ingestion due to file error.")
        sys.exit(1)


    if not chunks:
        print("Warning: No text could be extracted from the PDF. Nothing to insert.")
        return


    print(f"Extracted {len(chunks)} chunks. Now building SQL command...")


    # Start the SQL command. We specify the columns to ensure correct order.
    sql_command_start = 'INSERT INTO training_manual_chunks (chunk_id, manual_id, manual_title, section, page_number, content, embedding, upload_date) VALUES '
   
    list_of_value_strings = []
    for chunk in chunks:
        # Escape single quotes in any text fields
        safe_manual_title = manual_title.replace("'", "''")
        safe_content = chunk['content'].replace("'", "''").replace("\\", "\\\\")


        # Create a string for each value in the row
        value_string = f"('{uuid.uuid4()}', '{manual_id}', '{safe_manual_title}', 'N/A', {chunk['page_number']}, '{safe_content}', {get_embedding(chunk['content'])}, '{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}')"
        list_of_value_strings.append(value_string)


    # Join all the value strings together, separated by commas
    full_sql_command = sql_command_start + ',\n'.join(list_of_value_strings)


    try:
        print("Executing final insert command...")
        # Execute the single, complete SQL string. This has no extra arguments.
        client.command(full_sql_command)
        print(f"Successfully inserted {len(list_of_value_strings)} chunks for {manual_title}")
    except Exception as e:
        print("\n--- An error occurred during the database insert ---")
        print(f"Error details: {e}")

Best Practices

  • Chunk size: Choose chunk sizes (by characters or sentences) that balance semantic coherence with embedding efficiency, typically 200–500 words per chunk.
  • Deduplication: Ensure you don’t insert duplicate chunks during re-ingestion or manual updates.
  • Metadata: Store as much metadata as possible (manual title, section, page number) to aid downstream retrieval and traceability.
  • Embeddings Consistency: Always use the same embedding model for both ingestion and query-time search to ensure cosine similarity calculations are meaningful.

Let’s now also ingest some data into the supply_chain_events and invoice_data tables. 

Step 4: Insert data into supply_chain_events

You can use a Python script like this to insert data into the supply_chain_events table. 

import uuid
import datetime
from clickhouse_connect import get_client
from sentence_transformers import SentenceTransformer
import sys


# --- Client and Model Setup ---
client = get_client(host='localhost', database='default')
embedder = SentenceTransformer('all-MiniLM-L6-v2')


# --- Sample Data ---
supply_chain_events = [
    {"product_id": "prod_001", "product_name": "Hydraulic Pump", "location": "plant_1_warehouse", "inventory_level": 58, "reorder_threshold": 60, "anomaly_score": 0.0, "forecasted_shortage": 0, "recommended_action": None},
    {"product_id": "prod_002", "product_name": "Valve Assembly", "location": "plant_2_shopfloor", "inventory_level": 200, "reorder_threshold": 150, "anomaly_score": 0.0, "forecasted_shortage": 0, "recommended_action": None},
]


print(f"Preparing to insert {len(supply_chain_events)} supply chain events...")


# --- SQL Command Preparation ---
sql_command_start = 'INSERT INTO supply_chain_events (event_id, timestamp, product_id, product_name, location, inventory_level, reorder_threshold, anomaly_score, forecasted_shortage, recommended_action, vector) VALUES '


list_of_value_strings = []
for event in supply_chain_events:
    # Generate an embedding for a descriptive string of the event
    event_description = f"{event['product_name']} at {event['location']}, Inventory: {event['inventory_level']}"
    vector = embedder.encode(event_description).tolist()


    # Escape single quotes and handle potential None values for SQL insertion
    safe_product_id = event['product_id'].replace("'", "''")
    safe_product_name = event['product_name'].replace("'", "''")
    safe_location = event['location'].replace("'", "''")
   
    if event['recommended_action'] is None:
        recommended_action_sql = 'NULL'
    else:
        safe_recommended_action = event['recommended_action'].replace("'", "''")
        recommended_action_sql = f"'{safe_recommended_action}'"


    # Construct the VALUES string for the current row
    value_string = (
        f"('{uuid.uuid4()}', "
        f"'{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}', "
        f"'{safe_product_id}', "
        f"'{safe_product_name}', "
        f"'{safe_location}', "
        f"{event['inventory_level']}, "
        f"{event['reorder_threshold']}, "
        f"{event['anomaly_score']}, "
        f"{event['forecasted_shortage']}, "
        f"{recommended_action_sql}, "
        f"{vector}"
        f")"
    )
    list_of_value_strings.append(value_string)


# Combine the start of the command with all the value strings
full_sql_command = sql_command_start + ',\n'.join(list_of_value_strings)


# --- Database Execution ---
try:
    print("Executing insert command for supply chain events...")
    client.command(full_sql_command)
    print(f"Successfully inserted {len(list_of_value_strings)} supply chain events!")
except Exception as e:
    print("\n--- An error occurred during the database insert ---")
    print(f"Error details: {e}")
    sys.exit(1)

In actual production setup, your embedding text will be based on the underlying data structure. For some of the fields, you can also use sparse neural retrieval (like the product_name). 

Step 5: Insert sample data into invoice_data

Finally, let’s insert some data into the invoice_data table - which the agent will enrich. 

import datetime
import json
from clickhouse_connect import get_client
import sys


# --- Client Setup ---
client = get_client(host='localhost', database='default')


# --- Sample Data ---
sample_invoices = [
    {
        "invoice_id": "INV-2025-010", "invoice_date": datetime.date(2025, 7, 10), "supplier_id": "SUP-2001", "supplier_name": "Precision Bearings Co.", "po_id": "PO-8501", "total_amount": 18500.00, "currency": "USD", "line_item_count": 2,
        "line_items_json": json.dumps([{"item": "Ball Bearing", "qty": 50, "unit_price": 350}, {"item": "Seal", "qty": 100, "unit_price": 10}]),
        "validated": False, "flagged_fields": None, "extracted_entities": None, "enrichment_notes": None, "erp_sync_status": "pending", "last_checked": datetime.datetime.now()
    },
    {
        "invoice_id": "INV-2025-011", "invoice_date": datetime.date(2025, 7, 11), "supplier_id": "SUP-2002", "supplier_name": "Hydro Lubes Pvt Ltd", "po_id": "PO-8502", "total_amount": 9200.00, "currency": "USD", "line_item_count": 3,
        "line_items_json": json.dumps([{"item": "Hydraulic Oil", "qty": 20, "unit_price": 350}, {"item": "Coolant", "qty": 10, "unit_price": 130}, {"item": "Filter", "qty": 5, "unit_price": 200}]),
        "validated": False, "flagged_fields": None, "extracted_entities": None, "enrichment_notes": None, "erp_sync_status": "pending", "last_checked": datetime.datetime.now()
    },
    # ... (additional invoices) ...
]


print(f"Preparing to insert {len(sample_invoices)} invoices...")


# --- SQL Command Preparation ---
sql_command_start = (
    'INSERT INTO invoice_data '
    '(invoice_id, invoice_date, supplier_id, supplier_name, po_id, total_amount, currency, line_item_count, line_items_json, validated, flagged_fields, extracted_entities, enrichment_notes, erp_sync_status, last_checked) VALUES '
)


list_of_value_strings = []
for inv in sample_invoices:
    # Escape single quotes for safe SQL insertion
    safe_invoice_id = inv['invoice_id'].replace("'", "''")
    safe_supplier_name = inv['supplier_name'].replace("'", "''")
    safe_po_id = inv['po_id'].replace("'", "''")
    safe_line_items_json = inv['line_items_json'].replace("'", "''")
   
    # Handle fields that can be None, formatting them as SQL NULL
    flagged_fields_sql = f"'{inv['flagged_fields']}'" if inv['flagged_fields'] is not None else 'NULL'
    extracted_entities_sql = f"'{inv['extracted_entities']}'" if inv['extracted_entities'] is not None else 'NULL'
    enrichment_notes_sql = f"'{inv['enrichment_notes']}'" if inv['enrichment_notes'] is not None else 'NULL'


    # Format dates and datetimes to strings
    invoice_date_sql = inv['invoice_date'].strftime('%Y-%m-%d')
    last_checked_sql = inv['last_checked'].strftime('%Y-%m-%d %H:%M:%S')


    # Construct the VALUES string for the current row
    value_string = (
        f"('{safe_invoice_id}', '{invoice_date_sql}', '{inv['supplier_id']}', '{safe_supplier_name}', '{safe_po_id}', "
        f"{inv['total_amount']}, '{inv['currency']}', {inv['line_item_count']}, '{safe_line_items_json}', {inv['validated']}, "
        f"{flagged_fields_sql}, {extracted_entities_sql}, {enrichment_notes_sql}, '{inv['erp_sync_status']}', '{last_checked_sql}')"
    )
    list_of_value_strings.append(value_string)


# Combine the start of the command with all the value strings
full_sql_command = sql_command_start + ',\n'.join(list_of_value_strings)


# --- Database Execution ---
try:
    print("Executing insert command for invoices...")
    client.command(full_sql_command)
    print(f"Successfully inserted {len(list_of_value_strings)} invoices!")
except Exception as e:
    print("\n--- An error occurred during the database insert ---")
    print(f"Error details: {e}")
    sys.exit(1)

Now that we have the data ingestion sorted, let’s start building the agents. 




Building the Multiagent System

Now that you’ve set up your data and ingested realistic records into ClickHouse, you’re ready to start building the core agents that will power your manufacturing automation system. Each agent will encapsulate a specific workflow, connecting to both your data layer and language model for reasoning, enrichment, and decision support.

In the following sections, you’ll build three agents, Employee Training Agent, Supply Chain Analysis Agent, and Invoice Enrichment Agent, using PydanticAI as your orchestration framework. Each agent will expose clearly typed interfaces for inputs and outputs, leverage the appropriate tables in ClickHouse, and, where needed, call the Magistral Small LLM for deep semantic reasoning or content generation.

Let’s begin by designing the core structure of these agents with PydanticAI, starting with the Employee Training Agent.

1- Employee Training Agent

Here’s the workflow that we will use: 

Agent Workflow

  1. Gap Analysis:
    The agent queries the employee_training table to identify incomplete or upcoming training modules for a given employee.
  2. Semantic Retrieval:
    For content delivery or answering questions, the agent retrieves relevant training manual chunks from the training_manual_chunks table using vector similarity search.
  3. LLM-Powered Interaction:
    Uses Magistral Small to generate microlearning content or answer specific training-related questions, grounded in retrieved manual content.
  4. Progress Tracking:
    Updates the training completion status and collects employee feedback.

To start, let’s define Pydantic models for type safety: 

from pydantic import BaseModel
from typing import List, Optional

class TrainingGapQuery(BaseModel):
    employee_id: str

class TrainingGapResponse(BaseModel):
    required_courses: List[str]
    next_due: Optional[str]

class TrainingQAQuery(BaseModel):
    employee_id: str
    question: str

class TrainingQAResponse(BaseModel):
    answer: str
    source_chunks: List[str]

class CompletionFeedback(BaseModel):
    employee_id: str
    course_id: str
    feedback: Optional[str]
    score: Optional[float]

Now - let’s create an agent function to find training gaps and answer training questions: 

from pydantic_ai.agent import Agent, agent_function
from clickhouse_connect import get_client

client = get_client(host='localhost', database='default')

class EmployeeTrainingAgent(Agent):

    @agent_function
    def find_training_gaps(self, query: TrainingGapQuery) -> TrainingGapResponse:
        # Query ClickHouse for incomplete trainings
        sql = f"""
            SELECT course_id, course_due_date 
            FROM employee_training 
            WHERE employee_id = '{query.employee_id}' AND course_completed = 0
            ORDER BY course_due_date ASC
        """
        result = client.query(sql)
        required_courses = [row['course_id'] for row in result.result_rows]
        next_due = result.result_rows[0]['course_due_date'] if result.result_rows else None
        return TrainingGapResponse(required_courses=required_courses, next_due=next_due)

    @agent_function
    def answer_training_question(self, query: TrainingQAQuery) -> TrainingQAResponse:
        # Embed the question and find relevant manual chunks
        question_embedding = get_embedding(query.question)  # Use your embedding model/API here
        sql = f"""
            SELECT content 
            FROM training_manual_chunks 
            ORDER BY cosineDistance(embedding, {question_embedding}) 
            LIMIT 3
        """
        result = client.query(sql)
        source_chunks = [row['content'] for row in result.result_rows]
        # Compose the answer with Magistral Small (LLM)
        context = "\n".join(source_chunks)
        answer = magistral_small_llm.generate_answer(query.question, context)
        return TrainingQAResponse(answer=answer, source_chunks=source_chunks)

    @agent_function
    def record_completion_feedback(self, feedback: CompletionFeedback):
        # Update feedback and score for completed training
        sql = f"""
            ALTER TABLE employee_training
            UPDATE feedback_score = {feedback.score}, completion_date = now()
            WHERE employee_id = '{feedback.employee_id}' AND course_id = '{feedback.course_id}'
        """
        client.command(sql)

Key Points

  • Grounded QA: Answers to employee queries are always traceable to specific training manual content.
  • Flexible: You can extend this agent to schedule reminders, recommend next courses, or escalate gaps.
  • Composability: The agent can be used directly, or called by a master orchestrator agent in a larger multiagent system.

2- Supply Chain Events Agents

The Supply Chain Events Agent continuously monitors inventory, detects anomalies, forecasts bottlenecks, and recommends actions such as reordering or rerouting. It leverages ClickHouse for storing and analyzing high-frequency supply chain data, and uses vector embeddings for semantic pattern detection and retrieval. When needed, it can call on Magistral Small for complex reasoning or generating summary reports.

Agent Workflow

  1. Real-Time Event Ingestion:
    The agent ingests and logs new supply chain events, including inventory levels and locations.
  2. Anomaly Detection:
    It computes anomaly scores and flags events where inventory drops below reorder thresholds or where statistical outliers are observed.
  3. Bottleneck Forecasting:
    By analyzing recent trends, the agent predicts potential shortages and recommends preventive actions.
  4. Semantic Event Search:
    Uses vector search to find historical events similar to current situations for root cause analysis.
  5. LLM-Driven Summarization (optional):
    Generates human-readable summaries or decision-support reports using the LLM.

Let’s define the Pydantic schemas: 

from pydantic import BaseModel
from typing import List, Optional
class SupplyChainQuery(BaseModel):
    product_id: str
    lookback_days: int = 7
class SupplyChainAnomalyResponse(BaseModel):
    bottleneck_locations: List[str]
    risk_score: float
    flagged_events: List[str]
    recommendations: Optional[List[str]]

Now let’s implement the agent logic: 

from pydantic_ai.agent import Agent, agent_function
from clickhouse_connect import get_client

client = get_client(host='localhost', database='default')

class SupplyChainAgent(Agent):

    @agent_function
    def analyze_supply_chain(self, query: SupplyChainQuery) -> SupplyChainAnomalyResponse:
        # 1. Fetch recent events for product
        sql = f"""
            SELECT timestamp, location, inventory_level, reorder_threshold, anomaly_score
            FROM supply_chain_events
            WHERE product_id = '{query.product_id}' AND timestamp >= today() - INTERVAL {query.lookback_days} DAY
            ORDER BY timestamp DESC
        """
        result = client.query(sql)
        events = result.result_rows

        # 2. Flag low inventory locations and calculate average risk
        bottleneck_locations = []
        risk_sum = 0.0
        flagged_events = []
        recommendations = []
        count = 0

        for row in events:
            count += 1
            risk_sum += row['anomaly_score'] or 0
            if row['inventory_level'] < row['reorder_threshold']:
                bottleneck_locations.append(row['location'])
                flagged_events.append(f"{row['timestamp']} at {row['location']}")
                recommendations.append(f"Reorder at {row['location']}")

        risk_score = (risk_sum / count) if count else 0.0

        return SupplyChainAnomalyResponse(
            bottleneck_locations=bottleneck_locations,
            risk_score=round(risk_score, 2),
            flagged_events=flagged_events,
            recommendations=recommendations if recommendations else None
        )

We can also create an agent function to analyze current events and compare them to historical events: 

@agent_function
def find_similar_events(self, event_embedding: list, top_n: int = 3):
    sql = f"""
        SELECT timestamp, location, inventory_level
        FROM supply_chain_events
        ORDER BY cosineDistance(vector, {event_embedding})
        LIMIT {top_n}
    """
    result = client.query(sql)
    return result.result_rows

Key Points

  • Real-time detection and recommendation: The agent flags at-risk inventory before it causes bottlenecks.
  • Flexible querying: You can slice data by product, time, or even location.
  • Extendibility: Integrate with LLMs to generate automated reports, escalation emails, or predictive analytics.

3- Invoice Enrichment Agent

The Invoice Enrichment Agent automates the process of validating, tagging, and augmenting incoming invoices—minimizing manual effort, reducing errors, and accelerating accounts processing. It reads raw invoice data from ClickHouse, cross-references purchase orders (POs), parses and extracts key entities, flags anomalies, and writes enrichment results back for audit and downstream ERP integration.

Agent Workflow

  1. Invoice Retrieval:
    The agent pulls unprocessed or newly received invoices from the invoice_data table.
  2. Entity Extraction:
    It parses line items and metadata, using an LLM (Magistral Small or similar) for semantic entity extraction (such as tax IDs, payment terms, or vendor info).
  3. Validation and Cross-Checks:
    The agent compares invoice fields and line items against expected values or linked POs. Discrepancies, duplicates, or outliers are flagged for review.
  4. Anomaly Detection and Tagging:
    It uses rules, embeddings, or LLM-based classification to highlight potential errors, fraud, or compliance risks.
  5. Enrichment and Audit Trail:
    The agent writes results—such as validation status, extracted entities, flagged fields, and notes—back to ClickHouse, ensuring a traceable enrichment workflow.

First, let’s create the schemas:

from pydantic import BaseModel
from typing import Optional, List
class InvoiceEnrichmentQuery(BaseModel):
    invoice_id: str
class InvoiceEnrichmentResult(BaseModel):
    invoice_id: str
    validated: Optional[bool]
    flagged_fields: Optional[List[str]]
    extracted_entities: Optional[dict]
    enrichment_notes: Optional[str]

Next, let’s implement the agent logic: 

from pydantic_ai.agent import Agent, agent_function
from clickhouse_connect import get_client
import json

client = get_client(host='localhost', database='default')

class InvoiceEnrichmentAgent(Agent):

    @agent_function
    def enrich_invoice(self, query: InvoiceEnrichmentQuery) -> InvoiceEnrichmentResult:
        # 1. Fetch raw invoice
        sql = f"""
            SELECT * FROM invoice_data
            WHERE invoice_id = '{query.invoice_id}'
            LIMIT 1
        """
        result = client.query(sql)
        if not result.result_rows:
            return InvoiceEnrichmentResult(invoice_id=query.invoice_id)

        invoice = result.result_rows[0]

        # 2. Parse and extract entities
        line_items = json.loads(invoice['line_items_json'])
        entities = {
            "supplier_name": invoice['supplier_name'],
            "total_items": invoice['line_item_count'],
            "po_id": invoice['po_id'],
            # You could add LLM-based extraction for tax ID, terms, etc.
        }

        # 3. Validation example: check amounts match line items
        calculated_total = sum(item["qty"] * item["unit_price"] for item in line_items)
        flagged_fields = []
        if abs(calculated_total - invoice['total_amount']) > 0.01:
            flagged_fields.append("total_amount")

        # 4. Optional: LLM for semantic validation/anomaly detection
        # enrichment_notes = magistral_small_llm.classify_invoice(invoice, line_items)
        enrichment_notes = None  # Placeholder for LLM output

        # 5. Write enrichment back to ClickHouse
        update_sql = f"""
            ALTER TABLE invoice_data
            UPDATE validated = {1 if not flagged_fields else 0},
                   flagged_fields = '{",".join(flagged_fields)}',
                   extracted_entities = '{json.dumps(entities)}',
                   enrichment_notes = '{enrichment_notes if enrichment_notes else ""}',
                   last_checked = now()
            WHERE invoice_id = '{query.invoice_id}'
        """
        client.command(update_sql)

        return InvoiceEnrichmentResult(
            invoice_id=query.invoice_id,
            validated=(len(flagged_fields) == 0),
            flagged_fields=flagged_fields or None,
            extracted_entities=entities,
            enrichment_notes=enrichment_notes
        )

Key Points

  • Automated validation: Flags discrepancies between stated and computed totals, and can be extended for PO-matching, duplicate detection, or tax compliance.
  • Extensible enrichment: LLM integration enables semantic extraction, classification, and human-readable enrichment notes.
  • Traceability: All enrichment actions are written back to ClickHouse, ensuring a clear audit trail for every invoice.

4- Orchestrating the Agents

To streamline user interaction, you’ll use a centralized orchestrator (a “hub” or “router”) that exposes all available agents under a common interface. The user can specify the task or agent type, provide the relevant query, and receive structured results, making it easy to scale and extend as new agents are added.

Step 1: Define the Orchestrator

Using pydantic-ai, you can implement an orchestrator class that registers all your agents and dispatches incoming requests:

from typing import Dict, Any
from pydantic_ai.hub import AgentHub

# Instantiate your agents (see previous sections)
employee_agent = EmployeeTrainingAgent()
supply_chain_agent = SupplyChainAgent()
invoice_agent = InvoiceEnrichmentAgent()

# Register agents in a hub
hub = AgentHub([
    employee_agent,
    supply_chain_agent,
    invoice_agent
])

# Mapping for user-friendly agent names (optional)
AGENT_NAME_MAP = {
    "employee_training": employee_agent,
    "supply_chain": supply_chain_agent,
    "invoice_enrichment": invoice_agent
}

Next, let’s create an API for the orchestration: 

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class AgentInvocation(BaseModel):
    agent: str
    payload: Dict[str, Any]

@app.post("/invoke")
def invoke_agent(invocation: AgentInvocation):
    agent = AGENT_NAME_MAP.get(invocation.agent)
    if not agent:
        return {"error": "Unknown agent"}
    if invocation.agent == "employee_training":
        query = TrainingGapQuery(**invocation.payload)
        result = agent.find_training_gaps(query)
    elif invocation.agent == "supply_chain":
        query = SupplyChainQuery(**invocation.payload)
        result = agent.analyze_supply_chain(query)
    elif invocation.agent == "invoice_enrichment":
        query = InvoiceEnrichmentQuery(**invocation.payload)
        result = agent.enrich_invoice(query)
    else:
        return {"error": "Unsupported action"}
    return result

# Now POST {"agent": "invoice_enrichment", "payload": {"invoice_id": "INV-2025-010"}} etc.

Key Advantages of This Pattern

  • User-Friendly: Users or calling systems select the agent and pass structured queries—no direct knowledge of agent internals needed.
  • Extensible: Add new agents and update the orchestrator with minimal code changes.
  • Unified Interface: Centralizes logging, error handling, and access controls for all agents.

Integration-Ready: Ready for use in backend APIs, UIs, chatbots, or batch pipelines.




Results: What You Achieve with Multiagent Orchestration

With your agents orchestrated and data flowing through ClickHouse, you now have a modular AI-driven system capable of transforming manufacturing operations. Here’s a snapshot of real outputs generated by each agent in your stack:

1. Employee Training Agent

Query:

Find training gaps for employee ID EMP-1234

{
  "required_courses": ["Safety_101", "Machine_Handling_Basics"],
  "next_due": "2025-07-20"
}

2. Supply Chain Events Agent

Query:

Analyze supply chain for product ID prod_001 (Hydraulic Pump)

{
  "bottleneck_locations": ["plant_1_warehouse"],
  "risk_score": 0.22,
  "flagged_events": ["2025-07-10 08:00:00 at plant_1_warehouse"],
  "recommendations": ["Reorder at plant_1_warehouse"]
}

3. Invoice Enrichment Agent

Query:

Enrich invoice with ID INV-2025-010

{
  "invoice_id": "INV-2025-010",
  "validated": true,
  "flagged_fields": null,
  "extracted_entities": {
    "supplier_name": "Precision Bearings Co.",
    "total_items": 2,
    "po_id": "PO-8501"
  },
  "enrichment_notes": null
}

System-Level Impact

  • Speed: Tasks that once took hours (or days) can be completed in seconds.
  • Accuracy: AI-powered agents ground their responses in real data and documentation, drastically reducing manual error.
  • Scalability: The orchestrator framework allows you to add new agents or data sources with minimal effort—future-proofing your system.
  • Traceability: Every action and enrichment step is logged, making audits straightforward and transparent.



Future Notes and Conclusions

The architecture and workflow you’ve implemented form a foundation that is both modular and highly extensible, designed not just for today’s manufacturing automation, but to support tomorrow’s digital transformation as well. As you look to the future, here are key areas to consider for further evolution:

  • Agent Chaining and Complex Workflows:
    You can enable more sophisticated automation by allowing agents to trigger or delegate tasks to one another, forming multi-step workflows. For example, an anomaly detected by the Supply Chain Events Agent could automatically invoke the Employee Training Agent to schedule a refresher for relevant personnel.
  • Feedback Loops and Continuous Learning:
    With all actions and enrichment steps logged in ClickHouse, you can introduce reinforcement learning or human-in-the-loop workflows. Feedback from users—such as manual overrides or corrections—can be fed back into the system, improving LLM prompts, anomaly thresholds, or entity extraction logic.
  • Richer Multimodal and Multilingual Support:
    Incorporating images, sensor streams, or videos (e.g., equipment maintenance walkthroughs) will enable agents to reason over more than just text. With models like Magistral Small supporting multiple languages, you can also broaden your agent system’s applicability across global operations.
  • Security, Privacy, and Audit:
    As AI agents begin to impact critical business processes, robust authentication, role-based access control, and end-to-end audit logging become essential. You should plan for integration with your organization’s IAM and compliance stack.
  • Integration with External Systems:
    The agents described here can be extended to communicate directly with ERP, MES, or supplier management systems—making your automation both horizontally and vertically integrated across the manufacturing tech stack.
  • Edge and Real-Time Deployments:
    With lightweight, containerized agents and efficient embedding models, you can push certain agents closer to the factory floor, enabling real-time decisioning even in low-connectivity environments.


Conclusions

By building a multiagent system using open, composable tools like PydanticAI, ClickHouse, and Magistral Small, you have transformed your manufacturing processes from static, manual, and error-prone to dynamic, data-driven, and scalable. This system is:

  • Adaptable: Each agent can evolve independently, keeping pace with new regulations, products, or process changes.
  • Auditable: Every enrichment or decision step is fully traceable, meeting the demands of both regulators and auditors.
  • Proactive: Your organization can move from reactive firefighting to proactive, predictive operations.
  • Future-Proof: The modular approach makes it easy to swap out models, add new agents, or integrate the latest AI innovations without rearchitecting your core systems.

The multiagent pattern isn’t just a technical trend—it’s a strategic lever for competitiveness in the era of Industry 4.0. As you continue to experiment, refine, and extend your system, you’ll not only automate but also elevate human expertise and business performance throughout your manufacturing enterprise.

Ready to take the next step?

Consider piloting advanced agent workflows, integrating real-time data streams, and driving feedback-based model improvement to unlock even more value from your AI-powered manufacturing backbone. Get in touch with our team to learn how we can help! 

Authors

We use cookies to ensure the best experience on our website. Learn more