Workflow System
The Arshai workflow system provides a reference implementation for orchestrating multiple agents and nodes into a coordinated multi-step process. It is one way to build agentic systems in Arshai, not the only way.
Important
This is a Reference Implementation
The workflow system is provided as an example of how to orchestrate agents in Arshai. You are encouraged to:
Use it as-is if it fits your needs
Adapt it for your specific use case
Build your own custom orchestration from scratch
The framework provides the building blocks - you create the solution that works for you.
Overview
The workflow system consists of three main components:
WorkflowRunner - Manages workflow execution and state
WorkflowOrchestrator - Coordinates nodes and routing logic
WorkflowConfig - Defines workflow structure and dependencies
Key Features:
Multi-step agent coordination
State management across workflow steps
Dynamic routing based on input
Node-based execution graph
Error handling and recovery
Callback support for custom logic
Core Concepts
Workflow Architecture
The workflow system follows a node-based execution model:
[Input] → [WorkflowRunner] → [WorkflowOrchestrator]
                                      ↓
                      [Node 1] → [Node 2] → [Node 3]
                         ↓          ↓          ↓
                      [State]  ← [State]  ← [State]
                         ↓
                      [Output]
Workflow State
Carries information across the entire workflow:
User context (ID, interaction count, etc.)
Current step and processing path
Agent-specific data
Working memories
Errors and notifications
Nodes
Individual processing units that:
Receive workflow state as input
Perform specific operations (agent processing, data transformation, etc.)
Return updated state
Determine next node via routing logic
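The node contract above can be illustrated without the framework. The following is a minimal, self-contained sketch and not Arshai's implementation: `SketchState`, `run_graph`, and the two node functions are hypothetical stand-ins that show state passing from node to node while a routing function picks the next node.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class SketchState:
    """Hypothetical stand-in for the workflow state (not IWorkflowState)."""
    current_step: str = ""
    step_count: int = 0
    agent_data: Dict[str, Any] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


# A node takes the state plus the raw input and returns the updated state.
NodeFn = Callable[[SketchState, Dict[str, Any]], Awaitable[SketchState]]
# A router inspects the state and names the next node, or None to stop.
RouterFn = Callable[[SketchState], Optional[str]]


async def run_graph(
    nodes: Dict[str, NodeFn],
    routers: Dict[str, RouterFn],
    entry_point: str,
    input_data: Dict[str, Any],
    max_steps: int = 20,
) -> SketchState:
    """Run nodes one after another until no router names a next node."""
    state = SketchState()
    current: Optional[str] = entry_point
    while current is not None:
        if state.step_count >= max_steps:
            # Guard against routing loops.
            state.errors.append("max_steps exceeded")
            break
        state.current_step = current
        state = await nodes[current](state, input_data)
        state.step_count += 1
        # A node without a router is treated as terminal.
        router = routers.get(current)
        current = router(state) if router else None
    return state


async def greet(state: SketchState, input_data: Dict[str, Any]) -> SketchState:
    state.agent_data["greeting"] = f"Hello, {input_data['user']}!"
    return state


async def shout(state: SketchState, input_data: Dict[str, Any]) -> SketchState:
    state.agent_data["final"] = state.agent_data["greeting"].upper()
    return state


def demo() -> SketchState:
    return asyncio.run(
        run_graph(
            nodes={"greet": greet, "shout": shout},
            routers={"greet": lambda s: "shout"},
            entry_point="greet",
            input_data={"user": "Alice"},
        )
    )
```

The `max_steps` guard is a design choice of this sketch: once routing is driven by state, a misrouted workflow can loop, so a bounded loop fails visibly instead of hanging.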
Getting Started
Basic Workflow Example
Here’s a simple workflow with three nodes:
import asyncio
from typing import Any, Dict

from arshai.workflows.workflow_config import WorkflowConfig
from arshai.workflows.workflow_runner import BaseWorkflowRunner
from arshai.workflows.node import Node
from arshai.core.interfaces.iworkflow import IWorkflowState


# Step 1: Create workflow nodes
class GreetingNode(Node):
    """Node that greets the user."""

    async def run(self, state: IWorkflowState, input_data: Dict[str, Any]) -> IWorkflowState:
        # Add greeting to state
        state.agent_data["greeting"] = f"Hello, {state.user_context.user_id}!"
        return state


class ProcessingNode(Node):
    """Node that processes user input."""

    def __init__(self, name: str, agent):
        # Forward the node name to the base class so this node can be
        # constructed with name=..., like the other nodes below
        super().__init__(name=name)
        self.agent = agent

    async def run(self, state: IWorkflowState, input_data: Dict[str, Any]) -> IWorkflowState:
        # Process message with agent
        message = input_data.get("message", "")
        response = await self.agent.process(message)
        state.agent_data["response"] = response
        return state


class ResponseNode(Node):
    """Node that formats final response."""

    async def run(self, state: IWorkflowState, input_data: Dict[str, Any]) -> IWorkflowState:
        # Combine greeting and response
        greeting = state.agent_data.get("greeting", "")
        response = state.agent_data.get("response", "")
        state.agent_data["final_response"] = f"{greeting}\n{response}"
        return state


# Step 2: Create workflow configuration
class SimpleWorkflowConfig(WorkflowConfig):
    def __init__(self, agent):
        super().__init__()
        self.agent = agent

    def _configure_workflow(self, workflow):
        # Create nodes
        greeting = GreetingNode(name="greeting")
        processing = ProcessingNode(name="processing", agent=self.agent)
        response = ResponseNode(name="response")

        # Add nodes to workflow
        workflow.add_node("greeting", greeting)
        workflow.add_node("processing", processing)
        workflow.add_node("response", response)

        # Define edges (node connections)
        workflow.add_edge("greeting", "processing")
        workflow.add_edge("processing", "response")

        # Set entry point
        workflow.set_entry_point("greeting")

    def _route_input(self, input_data: Dict[str, Any]) -> str:
        # Route all input to greeting node
        return "greeting"


# Step 3: Use the workflow
async def main():
    from arshai.llms.openai import OpenAIClient
    from arshai.core.interfaces.illm import ILLMConfig, ILLMInput
    from arshai.agents.base import BaseAgent

    # Create agent
    llm_config = ILLMConfig(model="gpt-3.5-turbo")
    llm_client = OpenAIClient(llm_config)

    class SimpleAgent(BaseAgent):
        async def process(self, message: str) -> str:
            llm_input = ILLMInput(
                system_prompt=self.system_prompt,
                user_message=message
            )
            result = await self.llm_client.chat(llm_input)
            return result.get('llm_response', '')

    agent = SimpleAgent(llm_client, "You are a helpful assistant")

    # Create workflow
    config = SimpleWorkflowConfig(agent)
    runner = BaseWorkflowRunner(config)

    # Execute workflow
    result = await runner.execute_workflow(
        user_id="user_123",
        input_data={"message": "Tell me about AI"}
    )
    print(result['state'].agent_data['final_response'])


if __name__ == "__main__":
    asyncio.run(main())
Advanced Patterns
Conditional Routing
Route to different nodes based on input conditions:
class ConditionalWorkflowConfig(WorkflowConfig):
    def __init__(self, simple_agent, complex_agent):
        super().__init__()
        self.simple_agent = simple_agent
        self.complex_agent = complex_agent

    def _configure_workflow(self, workflow):
        # Create nodes for different complexity levels.
        # SimpleProcessingNode, ComplexProcessingNode, and AnalysisNode are
        # not defined on this page; they are assumed to be your own Node subclasses.
        simple_node = SimpleProcessingNode(self.simple_agent)
        complex_node = ComplexProcessingNode(self.complex_agent)
        analysis_node = AnalysisNode()

        workflow.add_node("analyze", analysis_node)
        workflow.add_node("simple", simple_node)
        workflow.add_node("complex", complex_node)

        # Conditional edges based on analysis result
        workflow.add_conditional_edges(
            "analyze",
            lambda state: "simple" if state.agent_data.get("complexity") == "low" else "complex"
        )
        workflow.set_entry_point("analyze")

    def _route_input(self, input_data: Dict[str, Any]) -> str:
        return "analyze"
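The routing decision itself is plain Python and can be exercised on its own. The sketch below is illustrative only: `classify_complexity` is a hypothetical heuristic for what an analysis step might write into `agent_data["complexity"]`, and `SimpleNamespace` stands in for the workflow state.

```python
from types import SimpleNamespace


def classify_complexity(message: str, word_limit: int = 12) -> str:
    """Hypothetical heuristic: short messages with at most one question are 'low'."""
    words = message.split()
    if len(words) <= word_limit and message.count("?") <= 1:
        return "low"
    return "high"


def route_after_analysis(state) -> str:
    """Same decision as the lambda passed to add_conditional_edges above."""
    return "simple" if state.agent_data.get("complexity") == "low" else "complex"


def analyze_and_route(message: str) -> str:
    # SimpleNamespace stands in for the workflow state in this sketch.
    state = SimpleNamespace(agent_data={})
    state.agent_data["complexity"] = classify_complexity(message)
    return route_after_analysis(state)
```

Note that a state with no `complexity` key falls through to `"complex"`, so an analysis step that fails silently sends traffic to the more capable path rather than the cheaper one.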
Agent Integration
Integrate multiple agents in a workflow:
class MultiAgentWorkflow(WorkflowConfig):
    def __init__(self, triage_agent, support_agent, escalation_agent):
        super().__init__()
        self.triage = triage_agent
        self.support = support_agent
        self.escalation = escalation_agent

    def _configure_workflow(self, workflow):
        # Create nodes wrapping agents.
        # AgentNode is not defined on this page; it is assumed to be a
        # Node subclass that wraps an agent.
        triage_node = AgentNode("triage", self.triage)
        support_node = AgentNode("support", self.support)
        escalation_node = AgentNode("escalation", self.escalation)

        workflow.add_node("triage", triage_node)
        workflow.add_node("support", support_node)
        workflow.add_node("escalation", escalation_node)

        # Route based on triage decision
        workflow.add_conditional_edges(
            "triage",
            lambda state: state.agent_data.get("next_agent", "support")
        )
        workflow.add_edge("support", "END")
        workflow.add_edge("escalation", "END")
        workflow.set_entry_point("triage")

    def _route_input(self, input_data: Dict[str, Any]) -> str:
        return "triage"
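`AgentNode` is used above without a definition. One way such a wrapper could look is sketched below; `AgentNodeSketch` and `EchoAgent` are hypothetical names, the class does not subclass the framework's `Node`, and it assumes the agent exposes `async process(message) -> str` as `SimpleAgent` does in the basic example.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict


class AgentNodeSketch:
    """Hypothetical wrapper adapting an agent to the node contract used on this page."""

    def __init__(self, name: str, agent: Any, output_key: str = "response"):
        self.name = name
        self.agent = agent
        self.output_key = output_key

    async def run(self, state: Any, input_data: Dict[str, Any]) -> Any:
        message = input_data.get("message", "")
        try:
            # Assumption: the agent exposes `async process(message) -> str`.
            state.agent_data[self.output_key] = await self.agent.process(message)
        except Exception as exc:
            # Record the failure in state instead of crashing the workflow.
            state.errors.append(f"{self.name}: {exc}")
        return state


class EchoAgent:
    """Stand-in agent so the sketch runs without an LLM."""

    async def process(self, message: str) -> str:
        return f"echo: {message}"


def demo() -> Any:
    # SimpleNamespace stands in for the workflow state.
    state = SimpleNamespace(agent_data={}, errors=[])
    node = AgentNodeSketch("support", EchoAgent())
    return asyncio.run(node.run(state, {"message": "hi"}))
```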
Memory-Enabled Workflows
Workflows with conversation memory:
from typing import Any, Dict

from arshai.agents.working_memory import WorkingMemoryAgent
from arshai.memory.working_memory.in_memory_manager import InMemoryManager
# The imports below use the same paths as the basic example
from arshai.workflows.workflow_config import WorkflowConfig
from arshai.workflows.workflow_runner import BaseWorkflowRunner
from arshai.llms.openai import OpenAIClient
from arshai.core.interfaces.illm import ILLMConfig


class MemoryWorkflowConfig(WorkflowConfig):
    def __init__(self, llm_client, memory_manager):
        super().__init__()
        self.llm_client = llm_client
        self.memory_manager = memory_manager

    def _configure_workflow(self, workflow):
        # Create memory-enabled agent
        agent = WorkingMemoryAgent(
            llm_client=self.llm_client,
            memory_manager=self.memory_manager
        )

        # Create node with memory agent
        # (AgentNode is assumed to be a Node subclass that wraps an agent)
        chat_node = AgentNode("chat", agent)
        workflow.add_node("chat", chat_node)
        workflow.set_entry_point("chat")

    def _route_input(self, input_data: Dict[str, Any]) -> str:
        return "chat"


# Usage
async def use_memory_workflow():
    llm_client = OpenAIClient(ILLMConfig(model="gpt-3.5-turbo"))
    memory_manager = InMemoryManager()
    config = MemoryWorkflowConfig(llm_client, memory_manager)
    runner = BaseWorkflowRunner(config)

    # First interaction
    result1 = await runner.execute_workflow(
        user_id="user_123",
        input_data={
            "message": "My name is Alice",
            "conversation_id": "conv_123"
        }
    )

    # Second interaction (agent remembers)
    result2 = await runner.execute_workflow(
        user_id="user_123",
        input_data={
            "message": "What's my name?",
            "conversation_id": "conv_123"
        }
    )
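Both calls above pass the same `conversation_id`, which is what lets the second turn see the first. The idea can be sketched with a plain dictionary; `ConversationStoreSketch` is a hypothetical illustration and says nothing about how `InMemoryManager` is actually implemented.

```python
from collections import defaultdict
from typing import Dict, List


class ConversationStoreSketch:
    """Hypothetical in-process store keyed by conversation_id (not InMemoryManager)."""

    def __init__(self) -> None:
        self._turns: Dict[str, List[str]] = defaultdict(list)

    def append(self, conversation_id: str, message: str) -> None:
        self._turns[conversation_id].append(message)

    def history(self, conversation_id: str) -> List[str]:
        # Return a copy so callers cannot mutate the stored history;
        # .get avoids creating an entry for an unknown conversation.
        return list(self._turns.get(conversation_id, []))
```

A different `conversation_id` yields an empty history, which is why reusing the identifier across calls matters.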
State Management
Workflow State Structure
from datetime import datetime

from arshai.core.interfaces.iworkflow import IWorkflowState, IUserContext

# Workflow state contains:
state = IWorkflowState(
    user_context=IUserContext(
        user_id="user_123",
        last_active=datetime.utcnow(),
        interaction_count=0
    ),
    current_step="processing",
    step_count=3,
    processing_path="simple",
    agent_data={
        "response": "...",
        "metadata": {...}
    },
    working_memories={
        "conversation_id": "..."
    },
    errors=[]
)
Accessing State in Nodes
class CustomNode(Node):
    async def run(self, state: IWorkflowState, input_data: Dict[str, Any]) -> IWorkflowState:
        # Access user context
        user_id = state.user_context.user_id

        # Read from agent_data
        previous_response = state.agent_data.get("previous_response", "")

        # Update agent_data
        state.agent_data["new_data"] = "some value"

        # Track errors (some_error is a placeholder for your own failure check)
        if some_error:
            state.errors.append("Error description")

        # Increment step count
        state.step_count += 1
        return state
Error Handling
Handling Errors in Workflows
class ErrorHandlingNode(Node):
    async def run(self, state: IWorkflowState, input_data: Dict[str, Any]) -> IWorkflowState:
        try:
            # Attempt processing
            # (self.process is this node's own processing coroutine, not shown here)
            result = await self.process(input_data)
            state.agent_data["result"] = result
        except Exception as e:
            # Log error
            state.errors.append(f"Processing error: {str(e)}")
            # Set error state
            state.agent_data["has_error"] = True
            state.agent_data["error_message"] = str(e)
        return state


# Configure error handling in workflow
class RobustWorkflowConfig(WorkflowConfig):
    def _configure_workflow(self, workflow):
        # ErrorRecoveryNode and SuccessNode are not defined on this page;
        # they are assumed to be your own Node subclasses
        processing_node = ErrorHandlingNode("processing")
        error_node = ErrorRecoveryNode("error_recovery")
        success_node = SuccessNode("success")

        workflow.add_node("processing", processing_node)
        workflow.add_node("error_recovery", error_node)
        workflow.add_node("success", success_node)

        # Conditional routing based on errors
        workflow.add_conditional_edges(
            "processing",
            lambda state: "error_recovery" if state.agent_data.get("has_error") else "success"
        )
        workflow.set_entry_point("processing")

    def _route_input(self, input_data: Dict[str, Any]) -> str:
        # Defined here for consistency with the other configs on this page
        return "processing"
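The recovery side of this pattern is not shown in the source. A minimal sketch of what a recovery step might do follows; `ErrorRecoveryNodeSketch` and its fallback text are hypothetical, and it relies only on the `has_error` and `error_message` keys that `ErrorHandlingNode` writes.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict


class ErrorRecoveryNodeSketch:
    """Hypothetical recovery step: turn a recorded failure into a fallback reply."""

    def __init__(self, name: str, fallback: str = "Sorry, something went wrong. Please try again."):
        self.name = name
        self.fallback = fallback

    async def run(self, state: Any, input_data: Dict[str, Any]) -> Any:
        if state.agent_data.get("has_error"):
            # Leave state.errors untouched for diagnostics, but give
            # downstream nodes a usable result and clear the error flag.
            state.agent_data["result"] = self.fallback
            state.agent_data["recovered_from"] = state.agent_data.pop("error_message", "")
            state.agent_data["has_error"] = False
        return state


def demo() -> Any:
    # SimpleNamespace stands in for the workflow state after a failed step.
    state = SimpleNamespace(
        agent_data={"has_error": True, "error_message": "timeout"},
        errors=["Processing error: timeout"],
    )
    node = ErrorRecoveryNodeSketch("error_recovery")
    return asyncio.run(node.run(state, {}))
```

Clearing `has_error` after recovery keeps any later conditional edge that checks the same flag from routing back into recovery.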
Callbacks
Using Callbacks for Custom Logic
async def execute_with_callbacks(runner):
    # runner is a workflow runner created as in the earlier examples,
    # e.g. BaseWorkflowRunner(config)
    def on_node_start(node_name: str):
        print(f"Starting node: {node_name}")

    def on_node_complete(node_name: str, state: IWorkflowState):
        print(f"Completed node: {node_name}")
        print(f"Current step: {state.current_step}")

    callbacks = {
        "on_node_start": on_node_start,
        "on_node_complete": on_node_complete
    }

    result = await runner.execute_workflow(
        user_id="user_123",
        input_data={"message": "Hello"},
        callbacks=callbacks
    )
    return result
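To make the callback contract concrete, here is a sketch of how a runner could fire hooks with the two signatures used above. `run_with_callbacks` is a hypothetical helper, not the `BaseWorkflowRunner` implementation, and `SimpleNamespace` stands in for the workflow state.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

NodeFn = Callable[[Any, Dict[str, Any]], Awaitable[Any]]


async def run_with_callbacks(
    nodes: List[Tuple[str, NodeFn]],
    state: Any,
    input_data: Dict[str, Any],
    callbacks: Optional[Dict[str, Callable[..., None]]] = None,
) -> Any:
    """Run nodes in order, firing the optional hooks around each one."""
    callbacks = callbacks or {}
    on_start = callbacks.get("on_node_start")
    on_complete = callbacks.get("on_node_complete")
    for name, node in nodes:
        state.current_step = name
        if on_start:
            on_start(name)
        state = await node(state, input_data)
        if on_complete:
            on_complete(name, state)
    return state


async def upper(state: Any, input_data: Dict[str, Any]) -> Any:
    state.agent_data["text"] = input_data["message"].upper()
    return state


def demo() -> List[str]:
    events: List[str] = []
    callbacks = {
        "on_node_start": lambda name: events.append(f"start:{name}"),
        "on_node_complete": lambda name, state: events.append(
            f"done:{name}:{state.current_step}"
        ),
    }
    state = SimpleNamespace(current_step="", agent_data={})
    asyncio.run(run_with_callbacks([("upper", upper)], state, {"message": "hi"}, callbacks))
    return events
```

Both hooks are optional in this sketch, so a caller can pass only the one it needs.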
Best Practices
Single Responsibility Nodes: Each node should have one clear purpose.
Immutable State: Create new state objects rather than modifying existing ones when possible.
Error Handling: Always handle errors in nodes to prevent workflow crashes.
State Validation: Validate state before and after node execution.
Testing: Test nodes independently before integrating them into workflows.
Documentation: Document each node's purpose, inputs, and outputs clearly.
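The immutable-state and independent-testing practices combine naturally: a node that returns a fresh state is easy to check in isolation. The sketch below uses a hypothetical frozen dataclass, `FrozenState`, rather than `IWorkflowState`, whose mutability is not described on this page.

```python
import asyncio
from dataclasses import dataclass, field, replace
from typing import Any, Dict


@dataclass(frozen=True)
class FrozenState:
    """Hypothetical immutable state; attribute assignment raises an error."""
    step_count: int = 0
    agent_data: Dict[str, Any] = field(default_factory=dict)


async def tagging_node(state: FrozenState, input_data: Dict[str, Any]) -> FrozenState:
    # Build a new dict instead of mutating the one held by the old state.
    new_data = {**state.agent_data, "tag": input_data.get("tag", "untagged")}
    # dataclasses.replace returns a copy with the given fields swapped.
    return replace(state, agent_data=new_data, step_count=state.step_count + 1)


def run_node_in_isolation() -> tuple:
    """Exercise the node with no workflow, runner, or agent involved."""
    before = FrozenState(agent_data={"a": 1})
    after = asyncio.run(tagging_node(before, {"tag": "x"}))
    return before, after
```

Because the input state is left untouched, a test can compare `before` and `after` directly, and a failed node cannot leave a half-updated state behind.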
Example: Complete Customer Service Workflow
class CustomerServiceWorkflow(WorkflowConfig):
    def __init__(self, llm_client, memory_manager, knowledge_base):
        super().__init__()
        self.llm_client = llm_client
        self.memory_manager = memory_manager
        self.knowledge_base = knowledge_base

    def _configure_workflow(self, workflow):
        # Create specialized agents.
        # TriageAgent, SupportAgent, EscalationAgent, AgentNode, and MemoryNode
        # are not defined on this page; they are assumed to be your own classes.
        triage_agent = TriageAgent(self.llm_client)
        support_agent = SupportAgent(self.llm_client, self.knowledge_base)
        escalation_agent = EscalationAgent(self.llm_client)

        # Create nodes
        triage_node = AgentNode("triage", triage_agent)
        support_node = AgentNode("support", support_agent)
        escalation_node = AgentNode("escalation", escalation_agent)
        memory_node = MemoryNode("memory", self.memory_manager)

        # Add to workflow
        workflow.add_node("triage", triage_node)
        workflow.add_node("support", support_node)
        workflow.add_node("escalation", escalation_node)
        workflow.add_node("memory", memory_node)

        # Define routing
        workflow.add_conditional_edges(
            "triage",
            lambda state: state.agent_data.get("route", "support")
        )
        workflow.add_edge("support", "memory")
        workflow.add_edge("escalation", "memory")
        workflow.add_edge("memory", "END")
        workflow.set_entry_point("triage")

    def _route_input(self, input_data: Dict[str, Any]) -> str:
        return "triage"
When to Use Workflows
Use the workflow system when:
You have multi-step processes with clear stages
You need state management across steps
You want declarative orchestration
You have conditional routing requirements
Consider alternatives when:
You have simple sequential processing (use pipelines)
You need dynamic, LLM-driven orchestration (build custom)
You have very complex routing (build custom)
The workflow structure is too rigid for your needs
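For the simple sequential case, a plain chain of steps may be all that is needed. The sketch below assumes "pipeline" means exactly that, a list of async functions applied in order; `run_pipeline` and the two steps are hypothetical and unrelated to any Arshai pipeline API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence

Step = Callable[[Any], Awaitable[Any]]


async def run_pipeline(value: Any, steps: Sequence[Step]) -> Any:
    """Feed each step's output into the next, with no routing or shared state."""
    for step in steps:
        value = await step(value)
    return value


async def strip(text: str) -> str:
    return text.strip()


async def title(text: str) -> str:
    return text.title()


def demo() -> str:
    return asyncio.run(run_pipeline("  tell me about ai  ", [strip, title]))
```

If a step later needs to choose between branches or carry data that the next step does not consume, that is usually the point where a workflow with explicit state becomes the better fit.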
Next Steps
Build Your Own: See Building Your Own Orchestration for custom orchestration patterns
Examples: Check the examples/ directory for complete workflow examples
Framework Patterns: Review Building Systems (Layer 3) for system building concepts
Remember: This is one way to orchestrate agents in Arshai. The framework empowers you to build the solution that fits your specific needs.