Batch¶
Batch processing utilities for workflows.
Classes¶
- class arshai.workflows.extensions.batch.BatchProcessor[source]¶
Bases: object
Utility for batch processing with workflows.
Example

processor = BatchProcessor()

# Process a batch of items
states = [
    IWorkflowState(user_context=…, workflow_data=item)
    for item in items
]
result = await processor.execute_batch(
    workflow=my_workflow,
    states=states,
    batch_size=10,
    parallel=True,
)
print(f"Success rate: {result.success_rate:.2%}")
- async static execute_batch(workflow, states, batch_size=10, parallel=True, continue_on_error=True, progress_callback=None)[source]¶
Execute workflow on batch of states.
- Parameters:
  - workflow (BaseWorkflowOrchestrator) – Workflow to execute
  - states (List[IWorkflowState]) – List of states to process
  - batch_size (int) – Number of items per batch
  - parallel (bool) – Execute batch items in parallel
  - continue_on_error (bool) – Continue processing when an item fails
  - progress_callback (Optional[Callable[[int, int], None]]) – Optional callback for progress updates
- Return type:
  BatchResult
- Returns:
  BatchResult with successful and failed items
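The chunking and parallel-execution behavior described above can be sketched as follows. This is an illustrative stand-in, not the library's implementation: `process` is a hypothetical coroutine standing in for `workflow.execute(state)`, and the `(successful, failed)` tuple stands in for the `BatchResult` fields.

```python
import asyncio

async def execute_batch_sketch(process, states, batch_size=10, parallel=True,
                               continue_on_error=True, progress_callback=None):
    """Run `process` over `states` in chunks of `batch_size`."""
    successful, failed = [], []
    for start in range(0, len(states), batch_size):
        chunk = states[start:start + batch_size]
        if parallel:
            # Run the whole chunk concurrently; exceptions are returned,
            # not raised, so one failure does not cancel its siblings.
            results = await asyncio.gather(
                *(process(s) for s in chunk), return_exceptions=True)
        else:
            results = []
            for s in chunk:
                try:
                    results.append(await s if False else await process(s))
                except Exception as exc:
                    results.append(exc)
        for state, result in zip(chunk, results):
            if isinstance(result, Exception):
                failed.append((state, result))
                if not continue_on_error:
                    return successful, failed
            else:
                successful.append(result)
        if progress_callback:
            # Report (items processed so far, total items)
            progress_callback(len(successful) + len(failed), len(states))
    return successful, failed

async def demo():
    async def process(n):  # stand-in for workflow.execute(state)
        if n == 3:
            raise ValueError("boom")
        return n * 2
    return await execute_batch_sketch(process, list(range(5)), batch_size=2)

ok, bad = asyncio.run(demo())
print(ok, [state for state, _ in bad])  # → [0, 2, 4, 8] [3]
```

With `continue_on_error=True` the failing item (3) is collected rather than aborting the run, which is why a success rate can be reported at the end.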
- async static execute_batch_with_retries(workflow, states, batch_size=10, max_retries=3, retry_delay=1.0, parallel=True, progress_callback=None)[source]¶
Execute batch with automatic retries for failed items.
- Parameters:
  - workflow (BaseWorkflowOrchestrator) – Workflow to execute
  - states (List[IWorkflowState]) – List of states to process
  - batch_size (int) – Number of items per batch
  - max_retries (int) – Maximum number of retries for failed items
  - retry_delay (float) – Delay in seconds between retries
  - parallel (bool) – Execute batch items in parallel
  - progress_callback (Optional[Callable[[int, int], None]]) – Optional callback for progress updates
- Return type:
  BatchResult
- Returns:
  BatchResult with final successful and failed items
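The retry behavior can be sketched as follows. Again an illustrative stand-in, not the library's implementation: only the items that failed are re-run, up to `max_retries` additional rounds, sleeping `retry_delay` seconds between rounds.

```python
import asyncio

async def execute_with_retries_sketch(process, states, max_retries=3,
                                      retry_delay=0.01):
    """Process all states, retrying only the failures each round."""
    successful, pending = [], list(states)
    for attempt in range(max_retries + 1):
        failed = []
        for state in pending:
            try:
                successful.append(await process(state))
            except Exception:
                failed.append(state)
        pending = failed
        if not pending or attempt == max_retries:
            break
        await asyncio.sleep(retry_delay)  # wait before the next retry round
    # Items still in `pending` exhausted their retries.
    return successful, pending

async def demo():
    calls = {}
    async def flaky(n):  # fails once for odd n, then succeeds
        calls[n] = calls.get(n, 0) + 1
        if n % 2 and calls[n] < 2:
            raise RuntimeError("transient")
        return n
    return await execute_with_retries_sketch(flaky, [1, 2, 3])

done, still_failed = asyncio.run(demo())
print(sorted(done), still_failed)  # → [1, 2, 3] []
```

Because only failures are retried, successful results from earlier rounds are kept, matching the "final successful and failed items" described in the return value.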