Workflow Orchestrator

Classes

class arshai.workflows.workflow_orchestrator.BaseWorkflowOrchestrator(debug_mode=False)

Bases: IWorkflowOrchestrator

Base implementation of the workflow orchestrator.

This implementation follows the pattern from the previous project, where:

  • Nodes are callable classes that operate on workflow state

  • State is passed between nodes and updated in each step

  • The orchestrator manages the flow between nodes using edges
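The node, state, and edge pattern described above can be illustrated with a small standalone sketch. It does not use arshai itself: AppendNode, run_linear, and the plain-dict state are hypothetical names chosen for illustration, and the real node interface may differ.

```python
from typing import Any, Callable, Dict, Optional

State = Dict[str, Any]


class AppendNode:
    """Hypothetical node: a callable class that records its label in the state."""

    def __init__(self, label: str) -> None:
        self.label = label

    def __call__(self, state: State) -> State:
        # Return an updated copy so each step's output is explicit.
        visited = state.get("visited", []) + [self.label]
        return {**state, "visited": visited}


def run_linear(
    nodes: Dict[str, Callable[[State], State]],
    edges: Dict[str, str],
    start: str,
    state: State,
) -> State:
    """Follow edges from `start`, passing the state through each node."""
    current: Optional[str] = start
    while current is not None:
        state = nodes[current](state)
        # A node without an outgoing edge ends the run.
        current = edges.get(current)
    return state


nodes = {"load": AppendNode("load"), "transform": AppendNode("transform")}
edges = {"load": "transform"}
result = run_linear(nodes, edges, "load", {})
```

Here each node receives the current state and hands back an updated one, while the loop in run_linear plays the orchestrator's role of deciding which node runs next.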

__init__(debug_mode=False)

Initialize workflow orchestrator.

add_node(name, node)

Add a node to the workflow.

Return type:

None

add_edge(from_node, to_node)

Add an edge between nodes.

Return type:

None

set_entry_points(router_func, entry_mapping)

Set the workflow entry points with routing logic.

Return type:

None
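One plausible reading of router_func and entry_mapping is that the router inspects the input data and returns a key, which the mapping then resolves to an entry node name. The sketch below works under that assumption only. pick_entry_node, route_by_kind, and the node names are hypothetical and are not part of arshai.

```python
from typing import Any, Callable, Dict


def pick_entry_node(
    router_func: Callable[[Dict[str, Any]], str],
    entry_mapping: Dict[str, str],
    input_data: Dict[str, Any],
) -> str:
    """Resolve the entry node name for this input (assumed semantics)."""
    route_key = router_func(input_data)
    if route_key not in entry_mapping:
        raise ValueError(f"No entry node mapped for route key {route_key!r}")
    return entry_mapping[route_key]


def route_by_kind(input_data: Dict[str, Any]) -> str:
    """Hypothetical router: send questions and statements to different nodes."""
    message = input_data.get("message", "")
    return "question" if message.endswith("?") else "statement"


entry_mapping = {"question": "answer_node", "statement": "ack_node"}
entry = pick_entry_node(route_by_kind, entry_mapping, {"message": "How are you?"})
```

Separating the routing decision from the mapping keeps the router free of node names, so nodes can be renamed without touching the routing logic.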

async execute(input_data, callbacks=None, is_streaming=False)

Execute the workflow with given input data.

This method:

  1. Routes to the appropriate entry node based on input data

  2. Executes each node in sequence based on edge connections

  3. Passes the workflow state between nodes

  4. Returns the final state and results

Parameters:
  • input_data (Dict[str, Any]) – Dictionary containing the input data including state

  • callbacks (Optional[Dict[str, Any]]) – Optional callback functions for nodes to use

Return type:

Dict[str, Any]

Returns:

Dictionary with final state and workflow results
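The four steps of execute can be sketched end to end with a toy stand-in. ToyOrchestrator below is not the arshai implementation: it only mirrors the documented method names, and it assumes that nodes are async callables taking and returning a state dict, that the state lives under a "state" key in input_data, and that each node has at most one outgoing edge. Callbacks and streaming are left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

State = Dict[str, Any]
Node = Callable[[State], Awaitable[State]]


class ToyOrchestrator:
    """Hypothetical stand-in that mirrors the documented method names."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Node] = {}
        self.edges: Dict[str, str] = {}
        self.router: Optional[Callable[[Dict[str, Any]], str]] = None
        self.entry_mapping: Dict[str, str] = {}

    def add_node(self, name: str, node: Node) -> None:
        self.nodes[name] = node

    def add_edge(self, from_node: str, to_node: str) -> None:
        self.edges[from_node] = to_node

    def set_entry_points(
        self,
        router_func: Callable[[Dict[str, Any]], str],
        entry_mapping: Dict[str, str],
    ) -> None:
        self.router = router_func
        self.entry_mapping = entry_mapping

    async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        if self.router is None:
            raise RuntimeError("set_entry_points must be called before execute")
        # Step 1: route to the entry node based on the input data.
        current: Optional[str] = self.entry_mapping[self.router(input_data)]
        # Assumption: the workflow state is stored under the "state" key.
        state: State = dict(input_data.get("state", {}))
        # Steps 2 and 3: run nodes in edge order, passing the state along.
        while current is not None:
            state = await self.nodes[current](state)
            current = self.edges.get(current)
        # Step 4: return the final state.
        return {"state": state}


async def double(state: State) -> State:
    return {**state, "value": state["value"] * 2}


async def increment(state: State) -> State:
    return {**state, "value": state["value"] + 1}


toy = ToyOrchestrator()
toy.add_node("double", double)
toy.add_node("increment", increment)
toy.add_edge("double", "increment")
toy.set_entry_points(lambda data: "default", {"default": "double"})
result = asyncio.run(toy.execute({"state": {"value": 3}}))
```

With an initial value of 3, the state passes through double and then increment, so the returned state holds the value 7.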