A hands-on guide to building multiagent AI systems for manufacturing supply chain automation covering agent design, orchestration, data pipelines, and real-world deployment
Manufacturing is undergoing a seismic shift, and AI-driven multiagent systems are at the heart of this transformation. By leveraging specialized, cooperating agents, modern manufacturing supply chains can achieve real-time decisioning, deep process insight, automated document enrichment, and on-demand upskilling of employees, all on a robust, cloud-hosted, and scalable stack.
This article provides a practical, technically deep guide to building such a multiagent system using the latest open frameworks: PydanticAI for composable agent orchestration, ClickHouse for scalable tabular and vector data storage, and the Mistral Magistral Small LLM for advanced reasoning.
We’ll ground this in a supply chain scenario, demonstrating four collaborating agents:
By the end of this article, you will have a scaffold for creating multiagent systems, especially in the manufacturing sector, where supply chain automation can dramatically improve productivity.
Let’s dive in.
Before we begin, let’s take a quick look at what AI agents are.
An AI agent is a self-contained, goal-driven computational entity that perceives its environment, processes observations, and autonomously executes actions to achieve specific objectives.
Technically, an AI agent is characterized by its policy, a mapping from perceptual inputs (which may include structured data, sensor readings, unstructured documents, or API responses) to actions or outputs. This mapping can be stateless (simple rules) or stateful, incorporating memory (episodic, semantic, or vector-based) to maintain context over time.
Modern AI agents often combine symbolic reasoning (e.g., rules, logic), machine learning (pattern recognition, embeddings), and tool use (via APIs or plug-ins) to make complex, context-aware decisions.
What distinguishes advanced AI agents in contemporary systems is their ability to operate in dynamic, open-ended environments, integrating multiple AI capabilities, such as language understanding (LLMs), perception (CV, sensor fusion), and retrieval (vector search, knowledge graphs), under a unified control loop.
Technically, agents interact through explicit interfaces, exchanging typed messages or function calls, and can be orchestrated into multiagent systems for collaborative or competitive problem-solving.
Orchestration frameworks like PydanticAI formalize agent inputs/outputs using strongly typed models, enabling safe inter-agent communication and composability, while supporting external tool invocation, long-term memory, and hierarchical agent composition for complex, multi-stage tasks.
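As a quick illustration, here is a minimal sketch of what typed inter-agent messages can look like with plain Pydantic models (the model names and fields below are purely illustrative, not part of the system we build later):

from pydantic import BaseModel
from typing import List

# Illustrative only: a typed request/response pair two agents might exchange
class InventoryCheckRequest(BaseModel):
    product_id: str
    location: str

class InventoryCheckResponse(BaseModel):
    inventory_level: int
    below_threshold: bool
    recommended_actions: List[str]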
So, to summarize, AI agents encapsulate:
By combining these components, AI agents become modular, composable units of intelligence that can be orchestrated into powerful multiagent systems, each agent specializing in distinct tasks, sharing context, and collaborating to solve complex, end-to-end business problems in manufacturing and beyond.
Let’s now look at how multiagent systems work.
If you are able to build capable agents, you can orchestrate them to create multiagent systems that complete entire tasks on their own. This can be invaluable in scenarios where you have a lot of legacy data and systems that have been a challenge to modernize.
In domains like manufacturing, where workflows are heterogeneous (from HR to logistics to QA), a multiagent setup allows:
With this workflow, you can scale the number of agents incrementally without disrupting existing processes.
Let’s take a simple example. Most manufacturing companies have manuals that workers are supposed to follow for each process step, whether it’s machine calibration, safety procedures, or quality checks. In reality, those manuals are often scattered across PDFs, paper binders, or outdated intranet systems.
By deploying a multiagent system, you can assign one agent to extract and structure the content from these manuals, another to cross-reference them with live sensor data, and a third to answer real-time worker queries (or help train workers) or flag compliance violations as they happen. All of this occurs autonomously, without you having to manually orchestrate the flow.
What’s powerful here is that you don’t have to overhaul your entire tech stack at once. You can start with one or two specialized agents, perhaps an invoice validation agent and a training agent, and as your needs evolve, incrementally add new agents for analytics, quality assurance, predictive maintenance, or supply chain optimization.
In short, multiagent systems let you break down complex, end-to-end workflows into composable, intelligent building blocks, making your entire manufacturing operation more adaptive, resilient, and future-ready.
There are several agent orchestration frameworks available today, including LangChain, Microsoft’s Autogen, OpenAI Agents SDK, and CrewAI. While these frameworks can greatly simplify agent orchestration, you may find that too much “framework magic” can make it difficult to debug and deeply understand your agentic workflows. If your goal is high accuracy and quality, it often pays to keep the orchestration layer as thin and transparent as possible. By doing so, you can focus your efforts on optimizing the underlying data architecture and refining your prompt strategies, ensuring that you extract the best possible performance and reliability from your multiagent system.
In this article, we’ll use PydanticAI as the orchestration layer. PydanticAI serves as a foundational building block for many agentic frameworks, providing type-safe, modular orchestration without unnecessary complexity.
For the data layer, we’ll leverage ClickHouse, a high-performance, cloud-native columnar database capable of handling massive-scale SQL queries and efficient vector search. Since many manufacturing environments already rely on SQL databases, ClickHouse offers a seamless way to scale data retrieval and analytics, letting you modernize your workflows without extensive ETL or replatforming efforts. ClickHouse is also already widely used for analytics at many modern manufacturing companies.
Magistral Small is Mistral AI’s compact, high-performance reasoning model, a 24 billion‑parameter LLM designed for transparent, chain‑of‑thought reasoning and multilingual understanding. It’s an open‑weight model released under the Apache 2.0 license, allowing full commercial and non‑commercial use while supporting deployability on local hardware, including a single RTX 4090 GPU or a 32 GB‑RAM MacBook once quantized. Fine‑tuned using supervised traces from the larger Magistral Medium and reinforced via RL, it excels at multi‑step logical tasks, providing detailed intermediate reasoning steps rather than opaque answers.
So, here’s our stack:
Let’s now break down the agents we will create. We’ll create three of them in this guide.
Let’s get on with the implementation of the agents now.
First, we need to install ClickHouse and insert some data. On Ubuntu-based systems:
# Add the official ClickHouse APT repo and signing key
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv 8919F6BD2B48D754
echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
# Install ClickHouse server + client
sudo apt update
sudo apt install clickhouse-server clickhouse-client
# Start the service
sudo systemctl enable --now clickhouse-server
These steps configure and launch the ClickHouse server and client tools.
Open the ClickHouse client:
clickhouse-client --user default --password
Then create a table to store chunks extracted from the various training manuals:
CREATE TABLE training_manual_chunks (
    chunk_id UUID,
    manual_id String,
    manual_title String,
    section String,
    page_number UInt16,
    content String,
    embedding Array(Float32),
    upload_date DateTime
) ENGINE = MergeTree()
ORDER BY (manual_id, page_number, chunk_id);
Let’s also create the table for the supply chain events:
CREATE TABLE supply_chain_events (
    event_id UUID,
    timestamp DateTime,
    product_id String,
    product_name String,
    location String,
    inventory_level UInt32,
    reorder_threshold UInt32,
    anomaly_score Nullable(Float32),
    forecasted_shortage Nullable(UInt8),  -- 1 = yes, 0 = no
    recommended_action Nullable(String),  -- e.g., "Reorder", "Redirect"
    vector Array(Float32)                 -- Embedding for similarity search
) ENGINE = MergeTree()
ORDER BY (product_id, timestamp);
Finally, let’s create the invoice enrichment agent table:
CREATE TABLE invoice_data (
    invoice_id String,
    invoice_date Date,
    supplier_id String,
    supplier_name String,
    po_id String,                         -- Linked Purchase Order
    total_amount Float32,
    currency String,
    line_item_count UInt16,
    line_items_json String,               -- JSON array with detailed line items
    validated UInt8,                      -- 1 = valid, 0 = flagged
    flagged_fields Nullable(String),      -- Comma-separated list of flagged items
    extracted_entities Nullable(String),  -- Key entities extracted (e.g., tax IDs)
    enrichment_notes Nullable(String),    -- Human/AI feedback or auto-tags
    erp_sync_status String,               -- e.g., "pending", "synced", "error"
    last_checked DateTime
) ENGINE = MergeTree()
ORDER BY (invoice_id, invoice_date);
Now create a Python virtual environment, and install these:
pip install clickhouse-connect pydantic-ai openai sentence-transformers
To interact with models like Magistral Small through OpenRouter, you’ll need an API key. Generate one on the OpenRouter platform, then export it:
export OPENROUTER_API_KEY='your_generated_api_key_here'
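The openai client library (installed above) works against OpenRouter’s OpenAI-compatible endpoint. The agents later in this guide call a magistral_small_llm.generate_answer(...) helper; here is one possible sketch of that helper. The model ID shown is an assumption, so check OpenRouter’s catalog for the current Magistral Small identifier:

import os
from openai import OpenAI

# A sketch of the magistral_small_llm helper referenced by the agents later.
# OpenRouter exposes an OpenAI-compatible API; the model ID below is an
# assumption - verify the current Magistral Small ID on openrouter.ai.
class MagistralSmallLLM:
    def __init__(self, model="mistralai/magistral-small-2506"):
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=os.environ["OPENROUTER_API_KEY"],
        )
        self.model = model

    def generate_answer(self, question, context):
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Answer strictly from the provided manual excerpts."},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
            ],
        )
        return response.choices[0].message.content

magistral_small_llm = MagistralSmallLLM()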
Next, let’s ingest some data into these tables.
During data ingestion, we need to pick the key columns (or the text from the training manual), create vector embeddings, and insert them together with the rest of the data. Here’s how you can do it for the training manuals:
You can use Python libraries like PyMuPDF (fitz) or pdfplumber to extract text from each page or section. For complex manuals (scanned pages, tables, diagrams), however, you should use VLMs (Vision Language Models) to parse the content. In this tutorial, we will use fitz:
import fitz  # PyMuPDF

def extract_chunks_from_pdf(pdf_path, chunk_size=300):
    chunks = []
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            page = doc[page_num]
            text = page.get_text()
            if not text:
                continue
            for start in range(0, len(text), chunk_size):
                chunks.append({
                    "page_number": page_num + 1,
                    "content": text[start:start + chunk_size]
                })
        return chunks
    except Exception as e:
        print(f"Error: Could not read the PDF file '{pdf_path}'. Please ensure it exists and is not corrupted.")
        print(f"Details: {e}")
        return None
Use a sentence embedding model (e.g., from sentence-transformers):
from sentence_transformers import SentenceTransformer

# Load the pre-trained model
model = SentenceTransformer('all-MiniLM-L6-v2')

def get_embedding(text):
    """Generates a vector embedding for the given text."""
    # The .tolist() converts the NumPy array to a standard Python list
    return model.encode(text).tolist()
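As a quick sanity check, confirm the embedding dimensionality; all-MiniLM-L6-v2 produces 384-dimensional vectors, and every vector stored in the Array(Float32) columns must have the same length:

vector = get_embedding("Calibrate the hydraulic press before each shift.")
print(len(vector))  # 384 dimensions for all-MiniLM-L6-v2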
Use the clickhouse-connect Python client to insert enriched data:
import uuid
import datetime
import sys
from clickhouse_connect import get_client

# Establish connection to the database
client = get_client(host='localhost', database='default')

def insert_chunks_to_clickhouse(pdf_path, manual_id, manual_title):
    print(f"Attempting to extract chunks from '{pdf_path}'...")
    chunks = extract_chunks_from_pdf(pdf_path)

    if chunks is None:
        print("Halting ingestion due to file error.")
        sys.exit(1)
    if not chunks:
        print("Warning: No text could be extracted from the PDF. Nothing to insert.")
        return

    print(f"Extracted {len(chunks)} chunks. Now building SQL command...")

    # Start the SQL command. We specify the columns to ensure correct order.
    sql_command_start = 'INSERT INTO training_manual_chunks (chunk_id, manual_id, manual_title, section, page_number, content, embedding, upload_date) VALUES '

    list_of_value_strings = []
    for chunk in chunks:
        # Escape single quotes in any text fields
        safe_manual_title = manual_title.replace("'", "''")
        safe_content = chunk['content'].replace("'", "''").replace("\\", "\\\\")

        # Create a string for each value in the row
        value_string = f"('{uuid.uuid4()}', '{manual_id}', '{safe_manual_title}', 'N/A', {chunk['page_number']}, '{safe_content}', {get_embedding(chunk['content'])}, '{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}')"
        list_of_value_strings.append(value_string)

    # Join all the value strings together, separated by commas
    full_sql_command = sql_command_start + ',\n'.join(list_of_value_strings)

    try:
        print("Executing final insert command...")
        # Execute the single, complete SQL string. This has no extra arguments.
        client.command(full_sql_command)
        print(f"Successfully inserted {len(list_of_value_strings)} chunks for {manual_title}")
    except Exception as e:
        print("\n--- An error occurred during the database insert ---")
        print(f"Error details: {e}")
Let’s now also ingest some data into the supply_chain_events and invoice_data tables.
You can use a Python script like this to insert data into the supply_chain_events table.
import uuid
import datetime
from clickhouse_connect import get_client
from sentence_transformers import SentenceTransformer
import sys

# --- Client and Model Setup ---
client = get_client(host='localhost', database='default')
embedder = SentenceTransformer('all-MiniLM-L6-v2')

# --- Sample Data ---
supply_chain_events = [
    {"product_id": "prod_001", "product_name": "Hydraulic Pump", "location": "plant_1_warehouse", "inventory_level": 58, "reorder_threshold": 60, "anomaly_score": 0.0, "forecasted_shortage": 0, "recommended_action": None},
    {"product_id": "prod_002", "product_name": "Valve Assembly", "location": "plant_2_shopfloor", "inventory_level": 200, "reorder_threshold": 150, "anomaly_score": 0.0, "forecasted_shortage": 0, "recommended_action": None},
]

print(f"Preparing to insert {len(supply_chain_events)} supply chain events...")

# --- SQL Command Preparation ---
sql_command_start = 'INSERT INTO supply_chain_events (event_id, timestamp, product_id, product_name, location, inventory_level, reorder_threshold, anomaly_score, forecasted_shortage, recommended_action, vector) VALUES '

list_of_value_strings = []
for event in supply_chain_events:
    # Generate an embedding for a descriptive string of the event
    event_description = f"{event['product_name']} at {event['location']}, Inventory: {event['inventory_level']}"
    vector = embedder.encode(event_description).tolist()

    # Escape single quotes and handle potential None values for SQL insertion
    safe_product_id = event['product_id'].replace("'", "''")
    safe_product_name = event['product_name'].replace("'", "''")
    safe_location = event['location'].replace("'", "''")

    if event['recommended_action'] is None:
        recommended_action_sql = 'NULL'
    else:
        safe_recommended_action = event['recommended_action'].replace("'", "''")
        recommended_action_sql = f"'{safe_recommended_action}'"

    # Construct the VALUES string for the current row
    value_string = (
        f"('{uuid.uuid4()}', "
        f"'{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}', "
        f"'{safe_product_id}', "
        f"'{safe_product_name}', "
        f"'{safe_location}', "
        f"{event['inventory_level']}, "
        f"{event['reorder_threshold']}, "
        f"{event['anomaly_score']}, "
        f"{event['forecasted_shortage']}, "
        f"{recommended_action_sql}, "
        f"{vector}"
        f")"
    )
    list_of_value_strings.append(value_string)

# Combine the start of the command with all the value strings
full_sql_command = sql_command_start + ',\n'.join(list_of_value_strings)

# --- Database Execution ---
try:
    print("Executing insert command for supply chain events...")
    client.command(full_sql_command)
    print(f"Successfully inserted {len(list_of_value_strings)} supply chain events!")
except Exception as e:
    print("\n--- An error occurred during the database insert ---")
    print(f"Error details: {e}")
    sys.exit(1)
In an actual production setup, the text you embed should be derived from your underlying data structure. For some fields (such as product_name), you can also use sparse neural retrieval.
Finally, let’s insert some data into the invoice_data table, which the agent will enrich.
import datetime
import json
from clickhouse_connect import get_client
import sys

# --- Client Setup ---
client = get_client(host='localhost', database='default')

# --- Sample Data ---
sample_invoices = [
    {
        "invoice_id": "INV-2025-010", "invoice_date": datetime.date(2025, 7, 10), "supplier_id": "SUP-2001", "supplier_name": "Precision Bearings Co.", "po_id": "PO-8501", "total_amount": 18500.00, "currency": "USD", "line_item_count": 2,
        "line_items_json": json.dumps([{"item": "Ball Bearing", "qty": 50, "unit_price": 350}, {"item": "Seal", "qty": 100, "unit_price": 10}]),
        "validated": False, "flagged_fields": None, "extracted_entities": None, "enrichment_notes": None, "erp_sync_status": "pending", "last_checked": datetime.datetime.now()
    },
    {
        "invoice_id": "INV-2025-011", "invoice_date": datetime.date(2025, 7, 11), "supplier_id": "SUP-2002", "supplier_name": "Hydro Lubes Pvt Ltd", "po_id": "PO-8502", "total_amount": 9200.00, "currency": "USD", "line_item_count": 3,
        "line_items_json": json.dumps([{"item": "Hydraulic Oil", "qty": 20, "unit_price": 350}, {"item": "Coolant", "qty": 10, "unit_price": 130}, {"item": "Filter", "qty": 5, "unit_price": 200}]),
        "validated": False, "flagged_fields": None, "extracted_entities": None, "enrichment_notes": None, "erp_sync_status": "pending", "last_checked": datetime.datetime.now()
    },
    # ... (additional invoices) ...
]

print(f"Preparing to insert {len(sample_invoices)} invoices...")

# --- SQL Command Preparation ---
sql_command_start = (
    'INSERT INTO invoice_data '
    '(invoice_id, invoice_date, supplier_id, supplier_name, po_id, total_amount, currency, line_item_count, line_items_json, validated, flagged_fields, extracted_entities, enrichment_notes, erp_sync_status, last_checked) VALUES '
)

list_of_value_strings = []
for inv in sample_invoices:
    # Escape single quotes for safe SQL insertion
    safe_invoice_id = inv['invoice_id'].replace("'", "''")
    safe_supplier_name = inv['supplier_name'].replace("'", "''")
    safe_po_id = inv['po_id'].replace("'", "''")
    safe_line_items_json = inv['line_items_json'].replace("'", "''")

    # Handle fields that can be None, formatting them as SQL NULL
    flagged_fields_sql = f"'{inv['flagged_fields']}'" if inv['flagged_fields'] is not None else 'NULL'
    extracted_entities_sql = f"'{inv['extracted_entities']}'" if inv['extracted_entities'] is not None else 'NULL'
    enrichment_notes_sql = f"'{inv['enrichment_notes']}'" if inv['enrichment_notes'] is not None else 'NULL'

    # Format dates and datetimes to strings
    invoice_date_sql = inv['invoice_date'].strftime('%Y-%m-%d')
    last_checked_sql = inv['last_checked'].strftime('%Y-%m-%d %H:%M:%S')

    # Convert the Python bool to 0/1 for the UInt8 `validated` column
    validated_sql = 1 if inv['validated'] else 0

    # Construct the VALUES string for the current row
    value_string = (
        f"('{safe_invoice_id}', '{invoice_date_sql}', '{inv['supplier_id']}', '{safe_supplier_name}', '{safe_po_id}', "
        f"{inv['total_amount']}, '{inv['currency']}', {inv['line_item_count']}, '{safe_line_items_json}', {validated_sql}, "
        f"{flagged_fields_sql}, {extracted_entities_sql}, {enrichment_notes_sql}, '{inv['erp_sync_status']}', '{last_checked_sql}')"
    )
    list_of_value_strings.append(value_string)

# Combine the start of the command with all the value strings
full_sql_command = sql_command_start + ',\n'.join(list_of_value_strings)

# --- Database Execution ---
try:
    print("Executing insert command for invoices...")
    client.command(full_sql_command)
    print(f"Successfully inserted {len(list_of_value_strings)} invoices!")
except Exception as e:
    print("\n--- An error occurred during the database insert ---")
    print(f"Error details: {e}")
    sys.exit(1)
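Before building the agents, it’s worth running a quick sanity check on the row counts, for example:

from clickhouse_connect import get_client

client = get_client(host='localhost', database='default')
for table in ("training_manual_chunks", "supply_chain_events", "invoice_data"):
    row_count = client.command(f"SELECT count() FROM {table}")
    print(f"{table}: {row_count} rows")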
Now that we have the data ingestion sorted, let’s start building the agents.
Now that you’ve set up your data and ingested realistic records into ClickHouse, you’re ready to start building the core agents that will power your manufacturing automation system. Each agent will encapsulate a specific workflow, connecting to both your data layer and language model for reasoning, enrichment, and decision support.
In the following sections, you’ll build three agents, Employee Training Agent, Supply Chain Analysis Agent, and Invoice Enrichment Agent, using PydanticAI as your orchestration framework. Each agent will expose clearly typed interfaces for inputs and outputs, leverage the appropriate tables in ClickHouse, and, where needed, call the Magistral Small LLM for deep semantic reasoning or content generation.
Let’s begin by designing the core structure of these agents with PydanticAI, starting with the Employee Training Agent.
Here’s the workflow that we will use:
Agent Workflow
To start, let’s define Pydantic models for type safety:
from pydantic import BaseModel
from typing import List, Optional

class TrainingGapQuery(BaseModel):
    employee_id: str

class TrainingGapResponse(BaseModel):
    required_courses: List[str]
    next_due: Optional[str]

class TrainingQAQuery(BaseModel):
    employee_id: str
    question: str

class TrainingQAResponse(BaseModel):
    answer: str
    source_chunks: List[str]

class CompletionFeedback(BaseModel):
    employee_id: str
    course_id: str
    feedback: Optional[str]
    score: Optional[float]
Now let’s create agent functions to find training gaps and answer training questions:
from pydantic_ai.agent import Agent, agent_function
from clickhouse_connect import get_client

client = get_client(host='localhost', database='default')

class EmployeeTrainingAgent(Agent):

    @agent_function
    def find_training_gaps(self, query: TrainingGapQuery) -> TrainingGapResponse:
        # Query ClickHouse for incomplete trainings
        sql = f"""
            SELECT course_id, course_due_date
            FROM employee_training
            WHERE employee_id = '{query.employee_id}' AND course_completed = 0
            ORDER BY course_due_date ASC
        """
        result = client.query(sql)
        rows = list(result.named_results())  # dict-style rows instead of positional tuples
        required_courses = [row['course_id'] for row in rows]
        next_due = str(rows[0]['course_due_date']) if rows else None
        return TrainingGapResponse(required_courses=required_courses, next_due=next_due)

    @agent_function
    def answer_training_question(self, query: TrainingQAQuery) -> TrainingQAResponse:
        # Embed the question and find relevant manual chunks
        question_embedding = get_embedding(query.question)  # Use your embedding model/API here
        sql = f"""
            SELECT content
            FROM training_manual_chunks
            ORDER BY cosineDistance(embedding, {question_embedding})
            LIMIT 3
        """
        result = client.query(sql)
        source_chunks = [row['content'] for row in result.named_results()]

        # Compose the answer with Magistral Small (LLM), e.g., via the OpenRouter helper sketched earlier
        context = "\n".join(source_chunks)
        answer = magistral_small_llm.generate_answer(query.question, context)
        return TrainingQAResponse(answer=answer, source_chunks=source_chunks)

    @agent_function
    def record_completion_feedback(self, feedback: CompletionFeedback):
        # Update feedback and score for completed training
        sql = f"""
            ALTER TABLE employee_training
            UPDATE feedback_score = {feedback.score}, completion_date = now()
            WHERE employee_id = '{feedback.employee_id}' AND course_id = '{feedback.course_id}'
        """
        client.command(sql)
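A quick local test of this agent might look like the following sketch. It assumes an employee_training table with the columns queried above exists, and EMP-1001 is a placeholder employee ID:

agent = EmployeeTrainingAgent()

# Find outstanding courses for a (placeholder) employee
gaps = agent.find_training_gaps(TrainingGapQuery(employee_id="EMP-1001"))
print(gaps.required_courses, gaps.next_due)

# Ask a question grounded in the ingested manuals
qa = agent.answer_training_question(TrainingQAQuery(
    employee_id="EMP-1001",
    question="How do I calibrate the hydraulic press before a shift?",
))
print(qa.answer)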
The Supply Chain Events Agent continuously monitors inventory, detects anomalies, forecasts bottlenecks, and recommends actions such as reordering or rerouting. It leverages ClickHouse for storing and analyzing high-frequency supply chain data, and uses vector embeddings for semantic pattern detection and retrieval. When needed, it can call on Magistral Small for complex reasoning or generating summary reports.
Let’s define the Pydantic schemas:
from pydantic import BaseModel
from typing import List, Optional

class SupplyChainQuery(BaseModel):
    product_id: str
    lookback_days: int = 7

class SupplyChainAnomalyResponse(BaseModel):
    bottleneck_locations: List[str]
    risk_score: float
    flagged_events: List[str]
    recommendations: Optional[List[str]]
Now let’s implement the agent logic:
from pydantic_ai.agent import Agent, agent_function
from clickhouse_connect import get_client

client = get_client(host='localhost', database='default')

class SupplyChainAgent(Agent):

    @agent_function
    def analyze_supply_chain(self, query: SupplyChainQuery) -> SupplyChainAnomalyResponse:
        # 1. Fetch recent events for product
        sql = f"""
            SELECT timestamp, location, inventory_level, reorder_threshold, anomaly_score
            FROM supply_chain_events
            WHERE product_id = '{query.product_id}' AND timestamp >= today() - INTERVAL {query.lookback_days} DAY
            ORDER BY timestamp DESC
        """
        result = client.query(sql)
        events = list(result.named_results())  # dict-style rows

        # 2. Flag low inventory locations and calculate average risk
        bottleneck_locations = []
        risk_sum = 0.0
        flagged_events = []
        recommendations = []
        count = 0
        for row in events:
            count += 1
            risk_sum += row['anomaly_score'] or 0
            if row['inventory_level'] < row['reorder_threshold']:
                bottleneck_locations.append(row['location'])
                flagged_events.append(f"{row['timestamp']} at {row['location']}")
                recommendations.append(f"Reorder at {row['location']}")

        risk_score = (risk_sum / count) if count else 0.0
        return SupplyChainAnomalyResponse(
            bottleneck_locations=bottleneck_locations,
            risk_score=round(risk_score, 2),
            flagged_events=flagged_events,
            recommendations=recommendations if recommendations else None
        )
We can also add an agent function to the SupplyChainAgent class that compares current events to similar historical events:
    @agent_function
    def find_similar_events(self, event_embedding: list, top_n: int = 3):
        sql = f"""
            SELECT timestamp, location, inventory_level
            FROM supply_chain_events
            ORDER BY cosineDistance(vector, {event_embedding})
            LIMIT {top_n}
        """
        result = client.query(sql)
        return result.result_rows
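As a rough usage sketch, you could embed a short description of a current incident (reusing the get_embedding helper from the ingestion step) and retrieve the most similar historical events:

agent = SupplyChainAgent()

# Describe the current situation, embed it, and look up similar past events
incident = "Hydraulic Pump at plant_1_warehouse, Inventory: 12"
similar = agent.find_similar_events(get_embedding(incident), top_n=3)
for row in similar:
    print(row)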
The Invoice Enrichment Agent automates the process of validating, tagging, and augmenting incoming invoices, minimizing manual effort, reducing errors, and accelerating accounts payable processing. It reads raw invoice data from ClickHouse, cross-references purchase orders (POs), parses and extracts key entities, flags anomalies, and writes enrichment results back for audit and downstream ERP integration.
First, let’s create the schemas:
from pydantic import BaseModel
from typing import Optional, List

class InvoiceEnrichmentQuery(BaseModel):
    invoice_id: str

class InvoiceEnrichmentResult(BaseModel):
    invoice_id: str
    # Defaults allow returning a bare result when the invoice is not found
    validated: Optional[bool] = None
    flagged_fields: Optional[List[str]] = None
    extracted_entities: Optional[dict] = None
    enrichment_notes: Optional[str] = None
Next, let’s implement the agent logic:
from pydantic_ai.agent import Agent, agent_function
from clickhouse_connect import get_client
import json

client = get_client(host='localhost', database='default')

class InvoiceEnrichmentAgent(Agent):

    @agent_function
    def enrich_invoice(self, query: InvoiceEnrichmentQuery) -> InvoiceEnrichmentResult:
        # 1. Fetch raw invoice
        sql = f"""
            SELECT * FROM invoice_data
            WHERE invoice_id = '{query.invoice_id}'
            LIMIT 1
        """
        result = client.query(sql)
        rows = list(result.named_results())  # dict-style rows
        if not rows:
            return InvoiceEnrichmentResult(invoice_id=query.invoice_id)
        invoice = rows[0]

        # 2. Parse and extract entities
        line_items = json.loads(invoice['line_items_json'])
        entities = {
            "supplier_name": invoice['supplier_name'],
            "total_items": invoice['line_item_count'],
            "po_id": invoice['po_id'],
            # You could add LLM-based extraction for tax ID, terms, etc.
        }

        # 3. Validation example: check amounts match line items
        calculated_total = sum(item["qty"] * item["unit_price"] for item in line_items)
        flagged_fields = []
        if abs(calculated_total - invoice['total_amount']) > 0.01:
            flagged_fields.append("total_amount")

        # 4. Optional: LLM for semantic validation/anomaly detection
        # enrichment_notes = magistral_small_llm.classify_invoice(invoice, line_items)
        enrichment_notes = None  # Placeholder for LLM output

        # 5. Write enrichment back to ClickHouse
        update_sql = f"""
            ALTER TABLE invoice_data
            UPDATE validated = {1 if not flagged_fields else 0},
                flagged_fields = '{",".join(flagged_fields)}',
                extracted_entities = '{json.dumps(entities)}',
                enrichment_notes = '{enrichment_notes if enrichment_notes else ""}',
                last_checked = now()
            WHERE invoice_id = '{query.invoice_id}'
        """
        client.command(update_sql)

        return InvoiceEnrichmentResult(
            invoice_id=query.invoice_id,
            validated=(len(flagged_fields) == 0),
            flagged_fields=flagged_fields or None,
            extracted_entities=entities,
            enrichment_notes=enrichment_notes
        )
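Running this agent against one of the sample invoices ingested earlier would look roughly like this:

agent = InvoiceEnrichmentAgent()

result = agent.enrich_invoice(InvoiceEnrichmentQuery(invoice_id="INV-2025-010"))
print(result.validated, result.flagged_fields)
print(result.extracted_entities)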
To streamline user interaction, you’ll use a centralized orchestrator (a “hub” or “router”) that exposes all available agents under a common interface. The user can specify the task or agent type, provide the relevant query, and receive structured results, making it easy to scale and extend as new agents are added.
Using pydantic-ai, you can implement an orchestrator class that registers all your agents and dispatches incoming requests:
from typing import Dict, Any
from pydantic_ai.hub import AgentHub

# Instantiate your agents (see previous sections)
employee_agent = EmployeeTrainingAgent()
supply_chain_agent = SupplyChainAgent()
invoice_agent = InvoiceEnrichmentAgent()

# Register agents in a hub
hub = AgentHub([
    employee_agent,
    supply_chain_agent,
    invoice_agent
])

# Mapping for user-friendly agent names (optional)
AGENT_NAME_MAP = {
    "employee_training": employee_agent,
    "supply_chain": supply_chain_agent,
    "invoice_enrichment": invoice_agent
}
Next, let’s create an API for the orchestration:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class AgentInvocation(BaseModel):
    agent: str
    payload: Dict[str, Any]

@app.post("/invoke")
def invoke_agent(invocation: AgentInvocation):
    agent = AGENT_NAME_MAP.get(invocation.agent)
    if not agent:
        return {"error": "Unknown agent"}

    if invocation.agent == "employee_training":
        query = TrainingGapQuery(**invocation.payload)
        result = agent.find_training_gaps(query)
    elif invocation.agent == "supply_chain":
        query = SupplyChainQuery(**invocation.payload)
        result = agent.analyze_supply_chain(query)
    elif invocation.agent == "invoice_enrichment":
        query = InvoiceEnrichmentQuery(**invocation.payload)
        result = agent.enrich_invoice(query)
    else:
        return {"error": "Unsupported action"}

    return result

# Now POST {"agent": "invoice_enrichment", "payload": {"invoice_id": "INV-2025-010"}} etc.
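With the FastAPI app running (for example via uvicorn main:app, where main is a placeholder module name), any agent can be invoked from a simple HTTP client:

import requests  # pip install requests

response = requests.post(
    "http://localhost:8000/invoke",
    json={"agent": "invoice_enrichment", "payload": {"invoice_id": "INV-2025-010"}},
)
print(response.json())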
The orchestrator is integration-ready: it can be dropped into backend APIs, UIs, chatbots, or batch pipelines.
With your agents orchestrated and data flowing through ClickHouse, you now have a modular AI-driven system capable of transforming manufacturing operations. Here’s a snapshot of real outputs generated by each agent in your stack:
Query:
Find training gaps for employee ID EMP-1234
{
  "required_courses": ["Safety_101", "Machine_Handling_Basics"],
  "next_due": "2025-07-20"
}
Query:
Analyze supply chain for product ID prod_001 (Hydraulic Pump)
{
  "bottleneck_locations": ["plant_1_warehouse"],
  "risk_score": 0.22,
  "flagged_events": ["2025-07-10 08:00:00 at plant_1_warehouse"],
  "recommendations": ["Reorder at plant_1_warehouse"]
}
Query:
Enrich invoice with ID INV-2025-010
{
  "invoice_id": "INV-2025-010",
  "validated": true,
  "flagged_fields": null,
  "extracted_entities": {
    "supplier_name": "Precision Bearings Co.",
    "total_items": 2,
    "po_id": "PO-8501"
  },
  "enrichment_notes": null
}
The architecture and workflow you’ve implemented form a foundation that is both modular and highly extensible, designed not just for today’s manufacturing automation, but to support tomorrow’s digital transformation as well. As you look to the future, here are key areas to consider for further evolution:
By building a multiagent system using open, composable tools like PydanticAI, ClickHouse, and Magistral Small, you have transformed your manufacturing processes from static, manual, and error-prone to dynamic, data-driven, and scalable. This system is:
The multiagent pattern isn’t just a technical trend—it’s a strategic lever for competitiveness in the era of Industry 4.0. As you continue to experiment, refine, and extend your system, you’ll not only automate but also elevate human expertise and business performance throughout your manufacturing enterprise.
Ready to take the next step?
Consider piloting advanced agent workflows, integrating real-time data streams, and driving feedback-based model improvement to unlock even more value from your AI-powered manufacturing backbone. Get in touch with our team to learn how we can help!