Example 5: Agent Composition¶
This example demonstrates advanced agent composition patterns for building multi-agent systems that solve complex problems through coordination and collaboration.
File: examples/agents/05_agent_composition.py (504 lines)
Focus: Multi-agent systems and orchestration
Best For: Understanding how agents work together in complex workflows
Overview¶
This example showcases four major composition patterns:
Orchestrator Pattern: Master agent coordinates specialized agents
Pipeline Pattern: Sequential processing through multiple agents
Mesh Pattern: Interconnected agents that communicate dynamically
Factory Pattern: Dynamic agent creation for specialized tasks
These patterns demonstrate how Arshai’s agent architecture enables sophisticated multi-agent coordination using LLM function calling as the coordination mechanism.
Key Concepts Demonstrated¶
- Agent Orchestration
How a master agent coordinates multiple specialized agents to complete complex tasks.
- Sequential Processing
Pipeline pattern where each agent processes the output of the previous agent.
- Dynamic Collaboration
Mesh pattern enabling agents to communicate and cross-reference information.
- Dynamic Agent Creation
Factory pattern for creating specialized agents on-demand.
- Function-Based Coordination
Using LLM function calls to enable agent-to-agent communication.
Code Walkthrough¶
1. Specialized Agent Definitions
The example starts by creating domain-specific agents:
class DataAnalysisAgent(BaseAgent):
"""Specialized agent for data analysis tasks."""
def __init__(self, llm_client: ILLM, **kwargs):
system_prompt = """You are a data analysis expert.
Analyze data patterns, provide insights, and make recommendations."""
super().__init__(llm_client, system_prompt, **kwargs)
async def process(self, input: IAgentInput) -> Dict[str, Any]:
llm_input = ILLMInput(
system_prompt=self.system_prompt,
user_message=f"Analyze this data: {input.message}"
)
result = await self.llm_client.chat(llm_input)
return {
"analysis_type": "data_pattern_analysis",
"insights": result.get('llm_response', ''),
"confidence": 0.85,
"recommendations": ["Consider trend analysis", "Look for outliers"],
"data_quality": "good"
}
class ReportGenerationAgent(BaseAgent):
"""Specialized agent for generating reports."""
async def process(self, input: IAgentInput) -> Dict[str, Any]:
# Process report generation
return {
"report_type": "analytical_report",
"content": result.get('llm_response', ''),
"sections": ["Executive Summary", "Analysis", "Recommendations"],
"word_count": len(result.get('llm_response', '').split()),
"status": "completed"
}
Key Points: - Each agent has a focused domain of expertise - Structured return formats enable composition - Clear interfaces facilitate coordination
2. Orchestrator Pattern
Master agent that coordinates multiple specialized agents:
class OrchestratorAgent(BaseAgent):
"""Master agent that orchestrates multiple specialized agents."""
def __init__(self, llm_client: ILLM, specialized_agents: Dict[str, IAgent], **kwargs):
system_prompt = """You are an intelligent orchestrator.
Available agents:
- data_analyst: Analyzes data patterns and provides insights
- report_generator: Creates structured reports
- knowledge_search: Searches knowledge base for information
- memory_manager: Manages conversation context
Decide which agents to use and coordinate their work."""
super().__init__(llm_client, system_prompt, **kwargs)
self.agents = specialized_agents
async def process(self, input: IAgentInput) -> Dict[str, Any]:
# Define agent functions for LLM to call
async def analyze_data(data_description: str) -> str:
"""Use the data analysis agent to analyze data."""
result = await self.agents['data_analyst'].process(
IAgentInput(message=data_description)
)
return json.dumps(result, indent=2)
async def generate_report(content: str) -> str:
"""Use the report generator agent to create reports."""
result = await self.agents['report_generator'].process(
IAgentInput(message=content)
)
return result['content']
async def search_knowledge(query: str) -> str:
"""Use the knowledge base agent to find information."""
result = await self.agents['knowledge_search'].process(
IAgentInput(message=query)
)
return str(result)
# Background task for memory management
async def update_memory(interaction: str) -> None:
"""Update conversation memory in background."""
if 'memory_manager' in self.agents:
await self.agents['memory_manager'].process(IAgentInput(
message=interaction,
metadata=input.metadata
))
# Create LLM input with agent functions
llm_input = ILLMInput(
system_prompt=self.system_prompt,
user_message=input.message,
regular_functions={
"analyze_data": analyze_data,
"generate_report": generate_report,
"search_knowledge": search_knowledge
},
background_tasks={
"update_memory": update_memory
}
)
result = await self.llm_client.chat(llm_input)
return {
"orchestrator_response": result.get('llm_response', ''),
"agents_available": list(self.agents.keys()),
"coordination_metadata": {
"complexity": "multi_agent_orchestration"
}
}
Key Points: - LLM decides which agents to use based on the request - Agents wrapped as functions for LLM tool calling - Background tasks enable memory coordination - Orchestrator manages complexity and coordination
3. Pipeline Pattern
Sequential processing through multiple agents:
class PipelineAgent(BaseAgent):
"""Agent that implements a processing pipeline pattern."""
def __init__(self, llm_client: ILLM, pipeline_agents: List[IAgent], **kwargs):
system_prompt = "You coordinate a processing pipeline of specialized agents."
super().__init__(llm_client, system_prompt, **kwargs)
self.pipeline = pipeline_agents
async def process(self, input: IAgentInput) -> Dict[str, Any]:
"""Process input through a pipeline of agents."""
pipeline_results = []
current_input = input.message
# Process through each agent in the pipeline
for i, agent in enumerate(self.pipeline):
print(f"Stage {i+1}: Processing with {agent.__class__.__name__}")
# Create input for this stage
stage_input = IAgentInput(
message=current_input,
metadata=input.metadata
)
# Process with current agent
stage_result = await agent.process(stage_input)
# Store result and prepare for next stage
stage_info = {
"stage": i + 1,
"agent": agent.__class__.__name__,
"input": current_input[:100] + "...",
"output": str(stage_result)[:200] + "..."
}
pipeline_results.append(stage_info)
# Extract content for next stage
if isinstance(stage_result, dict):
if 'content' in stage_result:
current_input = stage_result['content']
elif 'insights' in stage_result:
current_input = stage_result['insights']
else:
current_input = json.dumps(stage_result)
else:
current_input = str(stage_result)
return {
"final_result": current_input,
"pipeline_stages": len(self.pipeline),
"stage_results": pipeline_results,
"processing_complete": True
}
Key Points: - Sequential processing through predefined agent sequence - Output of each stage becomes input to the next - Complete audit trail of processing stages - Handles different output formats automatically
4. Mesh Pattern
Interconnected agents that can communicate dynamically:
class MeshCoordinatorAgent(BaseAgent):
"""Agent that coordinates a mesh of interconnected agents."""
def __init__(self, llm_client: ILLM, agent_mesh: Dict[str, IAgent], **kwargs):
system_prompt = """You coordinate a mesh of interconnected agents.
Agents can communicate with each other to solve complex problems collaboratively."""
super().__init__(llm_client, system_prompt, **kwargs)
self.mesh = agent_mesh
async def process(self, input: IAgentInput) -> Dict[str, Any]:
"""Coordinate agents in a mesh pattern."""
# Agents can call each other
async def get_analysis(data: str) -> str:
"""Get analysis from data analyst."""
result = await self.mesh['analyst'].process(IAgentInput(message=data))
return json.dumps(result)
async def search_info(query: str) -> str:
"""Search knowledge base."""
result = await self.mesh['searcher'].process(IAgentInput(message=query))
return str(result)
async def cross_reference(analysis: str, knowledge: str) -> str:
"""Cross-reference analysis with knowledge base."""
combined = f"Analysis: {analysis}\nKnowledge: {knowledge}"
result = await self.mesh['reporter'].process(IAgentInput(message=combined))
return result['content']
# All agents available to each other
mesh_functions = {
"get_analysis": get_analysis,
"search_info": search_info,
"cross_reference": cross_reference
}
llm_input = ILLMInput(
system_prompt=self.system_prompt + f"\nMesh functions: {list(mesh_functions.keys())}",
user_message=input.message,
regular_functions=mesh_functions
)
result = await self.llm_client.chat(llm_input)
return {
"mesh_response": result.get('llm_response', ''),
"mesh_agents": list(self.mesh.keys()),
"interconnected": True
}
Key Points: - Agents can dynamically call each other - Cross-referencing and collaboration capabilities - LLM decides communication patterns - Flexible, adaptive coordination
5. Factory Pattern
Dynamic agent creation for specialized tasks:
class AgentFactory:
"""Factory for creating agents dynamically."""
def __init__(self, llm_client: ILLM):
self.llm_client = llm_client
self.agent_cache = {}
def create_specialist_agent(self, specialty: str) -> IAgent:
"""Create a specialist agent for a specific domain."""
if specialty in self.agent_cache:
return self.agent_cache[specialty]
class SpecialistAgent(BaseAgent):
def __init__(self, llm_client, specialty):
prompt = f"You are a {specialty} specialist. Provide expert advice and analysis."
super().__init__(llm_client, prompt)
self.specialty = specialty
async def process(self, input: IAgentInput) -> str:
llm_input = ILLMInput(
system_prompt=self.system_prompt,
user_message=f"As a {self.specialty} expert, help with: {input.message}"
)
result = await self.llm_client.chat(llm_input)
return result.get('llm_response', '')
agent = SpecialistAgent(self.llm_client, specialty)
self.agent_cache[specialty] = agent
return agent
Key Points: - On-demand agent creation for specialized domains - Caching for performance optimization - Flexible specialization based on requirements - Scalable architecture for growing needs
Running the Example¶
Prerequisites:
export OPENROUTER_API_KEY=your_key_here
Run the example:
cd examples/agents
python 05_agent_composition.py
Expected Output:
The example demonstrates four composition patterns:
Orchestrator Pattern - Complex task coordination
Pipeline Pattern - Sequential processing workflows
Mesh Pattern - Dynamic agent collaboration
Factory Pattern - Specialized agent creation
Composition Patterns Deep Dive¶
When to Use Each Pattern
Orchestrator Pattern: - Complex tasks requiring multiple specialized capabilities - When you need intelligent task decomposition - Scenarios where the LLM should decide agent coordination - Master-slave coordination requirements
Pipeline Pattern: - Sequential processing workflows - Data transformation pipelines - Document processing chains - When each stage builds on the previous
Mesh Pattern: - Dynamic collaboration requirements - Cross-referencing and validation needs - When agents need to communicate peer-to-peer - Complex problem-solving requiring multiple perspectives
Factory Pattern: - Dynamic specialization requirements - When agent types aren’t known at startup - Microservice-style architectures - Performance optimization through caching
Combining Patterns
Patterns can be combined for sophisticated architectures:
class HybridSystem:
"""System combining multiple composition patterns."""
def __init__(self, llm_client):
self.llm_client = llm_client
self.factory = AgentFactory(llm_client)
# Core agents
self.core_agents = {
'orchestrator': None, # Created on demand
'pipeline': None, # Created on demand
'mesh': None # Created on demand
}
async def process_complex_request(self, request: str, pattern: str = "auto"):
"""Process request using appropriate composition pattern."""
if pattern == "auto":
pattern = self.determine_pattern(request)
if pattern == "orchestrator":
return await self.use_orchestrator_pattern(request)
elif pattern == "pipeline":
return await self.use_pipeline_pattern(request)
elif pattern == "mesh":
return await self.use_mesh_pattern(request)
else:
return await self.use_hybrid_pattern(request)
def determine_pattern(self, request: str) -> str:
"""Determine best pattern based on request analysis."""
# Analyze request characteristics
if "step by step" in request.lower():
return "pipeline"
elif "cross-reference" in request.lower():
return "mesh"
elif "complex" in request.lower():
return "orchestrator"
else:
return "hybrid"
Real-World Implementation Examples¶
Document Processing System:
class DocumentProcessingSystem:
"""Real-world document processing using composition patterns."""
def __init__(self, llm_client):
self.llm_client = llm_client
# Specialized document agents
self.agents = {
'extractor': DocumentExtractionAgent(llm_client),
'classifier': DocumentClassificationAgent(llm_client),
'analyzer': ContentAnalysisAgent(llm_client),
'summarizer': SummarizationAgent(llm_client),
'validator': ValidationAgent(llm_client)
}
# Create orchestrator for coordination
self.orchestrator = DocumentOrchestratorAgent(llm_client, self.agents)
async def process_document(self, document_path: str, processing_type: str = "full"):
"""Process document using appropriate agent composition."""
if processing_type == "full":
# Use orchestrator for complex processing
return await self.orchestrator.process(IAgentInput(
message=f"Process document: {document_path}",
metadata={"processing_type": "comprehensive"}
))
elif processing_type == "pipeline":
# Use pipeline for standard workflow
pipeline = PipelineAgent(self.llm_client, [
self.agents['extractor'],
self.agents['classifier'],
self.agents['analyzer'],
self.agents['summarizer']
])
return await pipeline.process(IAgentInput(message=document_path))
elif processing_type == "validation":
# Use mesh for cross-validation
mesh_agents = {
'analyzer': self.agents['analyzer'],
'validator': self.agents['validator'],
'classifier': self.agents['classifier']
}
mesh = MeshCoordinatorAgent(self.llm_client, mesh_agents)
return await mesh.process(IAgentInput(
message=f"Validate document analysis: {document_path}"
))
Customer Service System:
class CustomerServiceSystem:
"""Customer service system using agent composition."""
def __init__(self, llm_client, crm_client, knowledge_base):
self.llm_client = llm_client
# Create specialized customer service agents
self.agents = {
'triage': TriageAgent(llm_client),
'technical': TechnicalSupportAgent(llm_client),
'billing': BillingAgent(llm_client, crm_client),
'escalation': EscalationAgent(llm_client),
'knowledge': KnowledgeSearchAgent(llm_client, knowledge_base)
}
# Create orchestrator for request routing
self.orchestrator = CustomerServiceOrchestrator(llm_client, self.agents)
async def handle_customer_request(self, request: str, customer_id: str):
"""Handle customer request using agent composition."""
input_data = IAgentInput(
message=request,
metadata={
"customer_id": customer_id,
"timestamp": datetime.now().isoformat(),
"channel": "agent_system"
}
)
# Use orchestrator to coordinate response
result = await self.orchestrator.process(input_data)
# If escalation needed, use mesh pattern for collaboration
if result.get("requires_escalation"):
escalation_mesh = {
'technical': self.agents['technical'],
'escalation': self.agents['escalation'],
'knowledge': self.agents['knowledge']
}
mesh_coordinator = MeshCoordinatorAgent(self.llm_client, escalation_mesh)
escalation_result = await mesh_coordinator.process(input_data)
return {
"primary_response": result,
"escalation_response": escalation_result,
"coordination_pattern": "orchestrator_plus_mesh"
}
return {
"response": result,
"coordination_pattern": "orchestrator"
}
Testing Composition Systems¶
Unit Testing Individual Patterns:
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_orchestrator_pattern():
"""Test orchestrator coordination."""
mock_llm = AsyncMock()
mock_llm.chat.return_value = {
"llm_response": "Analysis completed using data agent",
"usage": {"total_tokens": 100}
}
# Mock specialized agents
mock_data_agent = AsyncMock()
mock_data_agent.process.return_value = {"insights": "test insights"}
mock_report_agent = AsyncMock()
mock_report_agent.process.return_value = {"content": "test report"}
agents = {
'data_analyst': mock_data_agent,
'report_generator': mock_report_agent
}
orchestrator = OrchestratorAgent(mock_llm, agents)
result = await orchestrator.process(IAgentInput(
message="Analyze data and create report"
))
# Verify orchestrator response
assert "orchestrator_response" in result
assert result["agents_available"] == ['data_analyst', 'report_generator']
# Verify LLM had access to agent functions
call_args = mock_llm.chat.call_args[0][0]
assert "regular_functions" in call_args.__dict__
assert "analyze_data" in call_args.regular_functions
assert "generate_report" in call_args.regular_functions
@pytest.mark.asyncio
async def test_pipeline_pattern():
"""Test pipeline sequential processing."""
# Create mock agents
mock_agent1 = AsyncMock()
mock_agent1.process.return_value = {"content": "stage1 output"}
mock_agent1.__class__.__name__ = "MockAgent1"
mock_agent2 = AsyncMock()
mock_agent2.process.return_value = "final output"
mock_agent2.__class__.__name__ = "MockAgent2"
mock_llm = AsyncMock()
pipeline = PipelineAgent(mock_llm, [mock_agent1, mock_agent2])
result = await pipeline.process(IAgentInput(message="test input"))
# Verify pipeline execution
assert result["processing_complete"] is True
assert result["pipeline_stages"] == 2
assert result["final_result"] == "final output"
assert len(result["stage_results"]) == 2
# Verify agent calls
mock_agent1.process.assert_called_once()
mock_agent2.process.assert_called_once()
Integration Testing:
@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_composition_system():
"""Test composition with real LLM client."""
config = ILLMConfig(model="gpt-4o-mini", temperature=0.1)
llm_client = OpenAIClient(config)
# Create real agents
agents = {
'analyst': DataAnalysisAgent(llm_client),
'reporter': ReportGenerationAgent(llm_client)
}
orchestrator = OrchestratorAgent(llm_client, agents)
result = await orchestrator.process(IAgentInput(
message="Analyze sales data: Q1=$100K, Q2=$120K, Q3=$135K and create a report"
))
# Verify realistic output
assert "orchestrator_response" in result
assert len(result["orchestrator_response"]) > 0
assert "agents_available" in result
Performance Considerations¶
Optimizing Composition Systems:
Caching Strategy: ```python class CachedCompositionSystem:
- def __init__(self):
self.agent_cache = {} self.result_cache = {}
- async def get_cached_result(self, request_hash: str):
return self.result_cache.get(request_hash)
- async def cache_result(self, request_hash: str, result: Any):
self.result_cache[request_hash] = result
Parallel Execution: ```python async def parallel_agent_execution(self, tasks: List[Tuple[IAgent, IAgentInput]]):
“””Execute multiple agents in parallel when possible.””” agent_tasks = [
agent.process(input_data) for agent, input_data in tasks
] return await asyncio.gather(*agent_tasks)
Resource Management: ```python class ResourceManagedComposition:
- def __init__(self, max_concurrent_agents: int = 5):
self.semaphore = asyncio.Semaphore(max_concurrent_agents)
- async def execute_with_limits(self, agent: IAgent, input_data: IAgentInput):
- async with self.semaphore:
return await agent.process(input_data)
Key Takeaways¶
- Composition Enables Complexity
Complex problems solved through coordinated simple agents.
- LLM Function Calling as Coordination
Natural coordination mechanism using function calls between agents.
- Pattern Selection Matters
Choose composition patterns based on problem characteristics and requirements.
- Flexibility Through Interfaces
IAgent interface enables any agent to participate in composition patterns.
- Scalable Architecture
Composition patterns scale from simple coordination to complex multi-agent systems.
- Testing Is Critical
Complex systems require comprehensive testing at multiple levels.
Next Steps¶
After mastering agent composition:
Implement Testing: Test complex multi-agent systems with unit and integration tests
Build Production Systems: Apply composition patterns to real-world problems
Optimize Performance: Implement caching and parallel execution strategies
Monitor Systems: Add observability to composition systems
Related Documentation: - Agent Patterns - Additional patterns for agent design - Creating Agents - Building agents for composition - Tools and Callables - Understanding the coordination mechanism