Batch

Batch processing utilities for workflows.

Classes

class arshai.workflows.extensions.batch.BatchProcessor[source]

Bases: object

Utility for batch processing with workflows.

Example

processor = BatchProcessor()

# Process batch of items states = [IWorkflowState(user_context=…, workflow_data=item) for item in items] result = await processor.execute_batch(

workflow=my_workflow, states=states, batch_size=10, parallel=True

)

print(f”Success rate: {result.success_rate:.2%}”)

async static execute_batch(workflow, states, batch_size=10, parallel=True, continue_on_error=True, progress_callback=None)[source]

Execute workflow on batch of states.

Parameters:
Return type:

BatchResult

Returns:

BatchResult with successful and failed items

async static execute_batch_with_retries(workflow, states, batch_size=10, max_retries=3, retry_delay=1.0, parallel=True, progress_callback=None)[source]

Execute batch with automatic retries for failed items.

Parameters:
  • workflow (BaseWorkflowOrchestrator) – Workflow to execute

  • states (List[IWorkflowState]) – List of states to process

  • batch_size (int) – Number of items per batch

  • max_retries (int) – Maximum number of retries for failed items

  • retry_delay (float) – Delay in seconds between retries

  • parallel (bool) – Execute batch items in parallel

  • progress_callback (Optional[Callable[[int, int], None]]) – Optional callback for progress updates

Return type:

BatchResult

Returns:

BatchResult with final successful and failed items

static create_progress_logger(name='batch_progress')[source]

Create a simple progress logging callback.

Parameters:

name (str) – Name for the logger

Return type:

Callable[[int, int], None]

Returns:

Progress callback function

class arshai.workflows.extensions.batch.BatchResult(successful, failed, total, success_rate)[source]

Bases: object

Result from batch processing.

successful: List[IWorkflowState]
failed: List[Tuple[IWorkflowState, Exception]]
total: int
success_rate: float
__init__(successful, failed, total, success_rate)