Workflow Orchestration System¶
The EVOSEAL Workflow Orchestration System provides comprehensive end-to-end orchestration for evolution pipelines, including checkpointing, state persistence, recovery strategies, and execution flow optimization.
Overview¶
The orchestration system is designed to manage complex, long-running evolution workflows with robust error handling, resource monitoring, and state management capabilities. It ensures reliable execution even in the face of failures, resource constraints, and system interruptions.
Architecture¶
The system is built with a modular architecture consisting of four main components:
1. Core Orchestrator (WorkflowOrchestrator)¶
- Central coordination of workflow execution
- Integration with all other components
- Execution strategy management
- Event handling and publishing
2. Checkpoint Manager (CheckpointManager)¶
- Automatic and manual checkpoint creation
- State serialization and persistence
- Checkpoint metadata management
- Recovery point management
3. Recovery Manager (RecoveryManager)¶
- Multi-level error recovery strategies
- Retry logic with exponential backoff
- Component restart capabilities
- Custom recovery action support
4. Resource Monitor (ResourceMonitor)¶
- Real-time system resource monitoring
- Configurable threshold alerting
- Resource usage history tracking
- Automatic checkpoint triggers
Key Features¶
Comprehensive State Management¶
- Orchestration States: IDLE, INITIALIZING, RUNNING, PAUSED, RECOVERING, CHECKPOINTING, COMPLETED, FAILED, CANCELLED
- Execution Context: Complete workflow state tracking including iterations, stages, and metadata
- Step Results: Detailed tracking of individual workflow step execution
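The states listed above could be modeled as an enumeration. The following is a minimal sketch, not the EVOSEAL implementation: the class name `OrchestrationState`, the lowercase string values, and the `TERMINAL_STATES` set are assumptions made for illustration.

```python
from enum import Enum


class OrchestrationState(Enum):
    """Hypothetical enumeration of the orchestration states listed above."""
    IDLE = "idle"
    INITIALIZING = "initializing"
    RUNNING = "running"
    PAUSED = "paused"
    RECOVERING = "recovering"
    CHECKPOINTING = "checkpointing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumption: a workflow in one of these states will not run any further.
TERMINAL_STATES = frozenset({
    OrchestrationState.COMPLETED,
    OrchestrationState.FAILED,
    OrchestrationState.CANCELLED,
})


def is_terminal(state: OrchestrationState) -> bool:
    """Return True when the workflow has completed, failed, or been cancelled."""
    return state in TERMINAL_STATES
```

Separating terminal from non-terminal states makes it easy to decide whether pause, resume, or cancel requests still make sense for a given workflow.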
Advanced Checkpointing¶
- Automatic Checkpoints: Based on iteration intervals or resource thresholds
- Manual Checkpoints: User-triggered checkpoints at any time
- Recovery Checkpoints: Created before and after recovery attempts
- Milestone Checkpoints: Mark significant workflow achievements
- Error Recovery Checkpoints: Capture state during error conditions
Robust Recovery Strategies¶
- Retry with Backoff: Configurable retry attempts with exponential backoff
- Checkpoint Rollback: Restore from previous successful state
- Component Restart: Restart failed components
- State Validation: Verify and repair workflow state
- Custom Recovery Actions: User-defined recovery procedures
Resource Monitoring and Management¶
- CPU Monitoring: Track CPU usage with configurable thresholds
- Memory Monitoring: Monitor memory consumption and availability
- Disk Monitoring: Track disk usage and free space
- Network Monitoring: Monitor network I/O (when available)
- Alert System: Configurable alerts for threshold violations
Flexible Execution Strategies¶
- Sequential Execution: Steps executed in dependency order
- Parallel Execution: Independent steps executed concurrently
- Adaptive Execution: Automatically choose optimal strategy
- Priority-Based Execution: Execute based on step priorities
Usage Guide¶
Basic Setup¶
from evoseal.core.orchestration import (
    WorkflowOrchestrator,
    ExecutionStrategy,
    RecoveryStrategy,
    ResourceThresholds,
)

# Create orchestrator
orchestrator = WorkflowOrchestrator(
    workspace_dir=".evoseal",
    checkpoint_interval=5,
    execution_strategy=ExecutionStrategy.ADAPTIVE,
)
Workflow Configuration¶
workflow_config = {
    "workflow_id": "evolution_workflow_001",
    "experiment_id": "exp_001",
    "iterations": 10,
    "steps": [
        {
            "name": "analyze",
            "component": "analyzer",
            "operation": "analyze_code",
            "parameters": {"depth": "full"},
            "critical": True,
            "retry_count": 3,
            "timeout": 300.0,
        },
        {
            "name": "generate",
            "component": "generator",
            "operation": "generate_improvements",
            "parameters": {"count": 5},
            "dependencies": ["analyze"],
            "critical": True,
            "retry_count": 2,
        },
        {
            "name": "evaluate",
            "component": "evaluator",
            "operation": "evaluate_changes",
            "parameters": {"metrics": ["performance", "quality"]},
            "dependencies": ["generate"],
            "critical": False,
        },
    ],
}
Execution¶
# The calls below use await, so run them inside a coroutine
# (for example, one started with asyncio.run)

# Initialize workflow
success = await orchestrator.initialize_workflow(workflow_config)
if success:
    # Execute workflow
    result = await orchestrator.execute_workflow(pipeline_instance)
    print(f"Workflow completed: {result.success_count}/{len(result.iterations)} iterations successful")
    print(f"Total execution time: {result.total_execution_time:.2f}s")
    print(f"Checkpoints created: {result.checkpoints_created}")
Advanced Configuration¶
Recovery Strategy¶
recovery_strategy = RecoveryStrategy(
    max_retries=3,
    retry_delay=5.0,
    exponential_backoff=True,
    backoff_multiplier=2.0,
    max_retry_delay=300.0,
    checkpoint_rollback=True,
    component_restart=True,
    state_validation=True,
    recovery_timeout=600.0,
)

orchestrator = WorkflowOrchestrator(
    recovery_strategy=recovery_strategy,
    # ... other parameters
)
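To make the backoff parameters concrete, the sketch below computes the wait before each retry. It assumes the common formula `retry_delay * backoff_multiplier ** attempt`, capped at `max_retry_delay`, with the first retry waiting the base delay. The helper `backoff_delays` is hypothetical and is not part of EVOSEAL, whose exact schedule may differ.

```python
def backoff_delays(max_retries: int, retry_delay: float,
                   backoff_multiplier: float, max_retry_delay: float) -> list[float]:
    """Delay in seconds before each retry: base * multiplier**attempt, capped."""
    return [
        min(retry_delay * backoff_multiplier ** attempt, max_retry_delay)
        for attempt in range(max_retries)
    ]


# With the values from the configuration above:
delays = backoff_delays(max_retries=3, retry_delay=5.0,
                        backoff_multiplier=2.0, max_retry_delay=300.0)
print(delays)  # [5.0, 10.0, 20.0]
```

Under these assumptions the cap only matters for longer retry sequences: the seventh retry would already reach 320 seconds uncapped, so it would be held at 300.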
Resource Monitoring¶
resource_thresholds = ResourceThresholds(
    memory_warning=0.7,    # 70% memory usage warning
    memory_critical=0.85,  # 85% memory usage critical
    cpu_warning=0.8,       # 80% CPU usage warning
    cpu_critical=0.9,      # 90% CPU usage critical
    disk_warning=0.8,      # 80% disk usage warning
    disk_critical=0.9,     # 90% disk usage critical
)

orchestrator = WorkflowOrchestrator(
    resource_thresholds=resource_thresholds,
    monitoring_interval=30.0,  # Check every 30 seconds
    # ... other parameters
)
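The warning and critical thresholds can be read as a two-level classification of each usage reading. The sketch below illustrates that idea with a hypothetical helper, `classify_usage`. It assumes usage is expressed as a fraction between 0.0 and 1.0 and that a reading equal to a threshold already counts as exceeding it. Neither detail is confirmed by this page.

```python
def classify_usage(usage: float, warning: float, critical: float) -> str:
    """Map a usage fraction (0.0 to 1.0) to 'ok', 'warning', or 'critical'."""
    if usage >= critical:
        return "critical"
    if usage >= warning:
        return "warning"
    return "ok"


# Memory at 78% with the memory thresholds configured above (0.7 / 0.85)
level = classify_usage(0.78, warning=0.7, critical=0.85)
print(level)  # warning
```

If usage is reported as a percentage, as in the workflow status example later on this page, divide by 100 before comparing against fractional thresholds.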
Workflow Control¶
# Pause workflow
await orchestrator.pause_workflow()
# Resume workflow
await orchestrator.resume_workflow()
# Cancel workflow
await orchestrator.cancel_workflow()
# Get status
status = orchestrator.get_workflow_status()
print(f"Current state: {status['state']}")
print(f"Current iteration: {status['execution_context']['current_iteration']}")
Checkpoint Management¶
# Create manual checkpoint
# Note: _create_checkpoint is a private method (leading underscore), and
# CheckpointType must be imported before this call
checkpoint_id = await orchestrator._create_checkpoint(CheckpointType.MANUAL)

# Resume from checkpoint
result = await orchestrator.execute_workflow(
    pipeline_instance,
    resume_from_checkpoint=checkpoint_id,
)

# List checkpoints
checkpoints = orchestrator.checkpoint_manager.list_checkpoints(limit=10)
for cp in checkpoints:
    print(f"{cp.checkpoint_id}: {cp.checkpoint_type.value} at {cp.timestamp}")

# Cleanup old checkpoints
deleted_count = orchestrator.checkpoint_manager.cleanup_old_checkpoints(
    max_age_days=7,
    max_count=50,
    keep_milestone=True,
)
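The three cleanup parameters suggest a retention policy: drop checkpoints that are too old or that exceed the count limit, but spare milestones. The sketch below is one plausible reading of that policy, not the `CheckpointManager` implementation. The `CheckpointRecord` class, the `select_for_deletion` helper, and the choice to count milestones toward `max_count` are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class CheckpointRecord:
    """Hypothetical stand-in for a checkpoint's metadata."""
    checkpoint_id: str
    checkpoint_type: str  # e.g. "automatic", "manual", "milestone"
    timestamp: datetime


def select_for_deletion(records, now, max_age_days=30, max_count=100,
                        keep_milestone=True):
    """Return IDs of checkpoints that the retention policy would delete."""
    cutoff = now - timedelta(days=max_age_days)
    newest_first = sorted(records, key=lambda r: r.timestamp, reverse=True)
    doomed = []
    for index, record in enumerate(newest_first):
        if keep_milestone and record.checkpoint_type == "milestone":
            continue  # milestones are always kept when keep_milestone is set
        if record.timestamp < cutoff or index >= max_count:
            doomed.append(record.checkpoint_id)
    return doomed


now = datetime(2025, 7, 19)
records = [
    CheckpointRecord("a", "automatic", now - timedelta(days=1)),
    CheckpointRecord("b", "automatic", now - timedelta(days=10)),
    CheckpointRecord("c", "milestone", now - timedelta(days=20)),
    CheckpointRecord("d", "manual", now - timedelta(days=2)),
]
to_delete = select_for_deletion(records, now, max_age_days=7, max_count=2)
print(to_delete)  # ['b']
```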
Workflow Step Configuration¶
Step Parameters¶
Parameter | Type | Description | Default |
---|---|---|---|
step_id | str | Unique identifier for the step | Auto-generated |
name | str | Human-readable step name | Required |
component | str | Component name to execute | Required |
operation | str | Method/operation to call | Required |
dependencies | List[str] | List of step IDs that must complete first | [] |
parameters | Dict | Parameters to pass to the operation | {} |
timeout | float | Maximum execution time in seconds | None |
retry_count | int | Number of retry attempts | 3 |
retry_delay | float | Delay between retries in seconds | 1.0 |
critical | bool | Whether step failure should stop workflow | True |
parallel_group | str | Group for parallel execution | None |
priority | int | Priority for priority-based execution | 0 |
Dependency Management¶
Steps can depend on other steps using the dependencies field:
{
    "name": "step_c",
    "dependencies": ["step_a", "step_b"],  # Runs after both step_a and step_b complete
    # ... other parameters
}
The orchestrator automatically handles:
- Topological sorting of steps based on dependencies
- Circular dependency detection
- Parallel execution of independent steps
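Topological sorting and cycle detection can be illustrated with Python's standard `graphlib` module (Python 3.9+). This is a sketch of the technique, not EVOSEAL's internal code. The helper `execution_order` is hypothetical, and it assumes dependencies refer to steps by `name`, as the examples on this page do.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(steps: list[dict]) -> list[str]:
    """Return step names in an order that respects each step's dependencies."""
    # Map each step to the set of steps that must finish before it.
    graph = {step["name"]: set(step.get("dependencies", [])) for step in steps}
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # exc.args[1] holds the list of nodes that form the cycle.
        raise ValueError(f"Circular dependency: {exc.args[1]}") from exc


steps = [
    {"name": "evaluate", "dependencies": ["generate"]},
    {"name": "generate", "dependencies": ["analyze"]},
    {"name": "analyze"},
]
order = execution_order(steps)
print(order)  # ['analyze', 'generate', 'evaluate']
```

A configuration in which two steps depend on each other raises `ValueError` here, which mirrors the circular dependency detection described above.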
Parallel Execution¶
Steps can be grouped for parallel execution:
[
    {
        "name": "analyze_performance",
        "parallel_group": "analysis",
        "dependencies": ["setup"],
    },
    {
        "name": "analyze_quality",
        "parallel_group": "analysis",
        "dependencies": ["setup"],
    },
    {
        "name": "generate_improvements",
        "dependencies": ["analyze_performance", "analyze_quality"],
    },
]
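One way to picture this schedule is as a sequence of waves: every step whose dependencies are complete runs concurrently, then the next wave starts. The sketch below derives the waves purely from `dependencies` using `graphlib` and `asyncio`. How EVOSEAL actually uses `parallel_group` is not specified on this page, and `run_step` and `run_in_waves` are hypothetical placeholders.

```python
import asyncio
from graphlib import TopologicalSorter


async def run_step(name: str) -> str:
    """Placeholder for real step execution; yields control once and returns."""
    await asyncio.sleep(0)
    return name


async def run_in_waves(steps: list[dict]) -> list[list[str]]:
    """Run steps wave by wave; steps within a wave run concurrently."""
    graph = {s["name"]: set(s.get("dependencies", [])) for s in steps}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps whose dependencies are done
        await asyncio.gather(*(run_step(name) for name in ready))
        sorter.done(*ready)
        waves.append(ready)
    return waves


steps = [
    {"name": "setup"},
    {"name": "analyze_performance", "dependencies": ["setup"]},
    {"name": "analyze_quality", "dependencies": ["setup"]},
    {"name": "generate_improvements",
     "dependencies": ["analyze_performance", "analyze_quality"]},
]
waves = asyncio.run(run_in_waves(steps))
print(waves)
```

For the configuration above, the two analysis steps land in the same wave after `setup`, and `generate_improvements` runs alone in the final wave.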
Event Integration¶
The orchestration system publishes events throughout execution:
Event Types¶
- State Changes: Workflow state transitions
- Progress Updates: Iteration and step progress
- Error Events: Failures and recovery attempts
- Resource Alerts: Threshold violations
- Pipeline Stage Events: Stage start/completion/failure
Event Handling¶
from evoseal.core.events import event_bus


@event_bus.subscribe("orchestration.checkpoint_created")
async def on_checkpoint_created(event):
    print(f"Checkpoint created: {event.data['checkpoint_id']}")


@event_bus.subscribe("orchestration.recovery_initiated")
async def on_recovery_initiated(event):
    print(f"Recovery initiated: {event.data['error_type']}")
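For readers unfamiliar with decorator-based subscription, the sketch below shows how such a bus could work in principle. It is an illustrative stand-in and not the `evoseal.core.events` implementation: the class `MiniEventBus`, its `publish` method, and the shape of the event object are assumptions.

```python
import asyncio
from collections import defaultdict
from types import SimpleNamespace


class MiniEventBus:
    """Illustrative stand-in; not the evoseal.core.events implementation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type):
        """Decorator that registers an async handler for one event type."""
        def decorator(handler):
            self._handlers[event_type].append(handler)
            return handler
        return decorator

    async def publish(self, event_type, **data):
        """Build an event and await every handler registered for its type."""
        event = SimpleNamespace(type=event_type, data=data)
        await asyncio.gather(*(h(event) for h in self._handlers[event_type]))


bus = MiniEventBus()
seen = []


@bus.subscribe("orchestration.checkpoint_created")
async def record_checkpoint(event):
    seen.append(event.data["checkpoint_id"])


asyncio.run(bus.publish("orchestration.checkpoint_created", checkpoint_id="cp_1"))
print(seen)  # ['cp_1']
```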
Monitoring and Metrics¶
Workflow Status¶
status = orchestrator.get_workflow_status()
# Returns:
{
    "state": "running",
    "execution_context": {
        "workflow_id": "workflow_001",
        "current_iteration": 3,
        "total_iterations": 10,
        "current_stage": "generate",
    },
    "resource_usage": {
        "memory_percent": 65.2,
        "cpu_percent": 45.8,
        "disk_percent": 78.1,
    },
    "active_alerts": [],
    "checkpoint_count": 2,
    "recovery_attempts": 0,
}
Resource Statistics¶
resource_stats = orchestrator.resource_monitor.get_resource_statistics()
# Returns detailed resource usage statistics
Recovery Statistics¶
recovery_stats = orchestrator.recovery_manager.get_recovery_statistics()
# Returns recovery attempt history and success rates
Checkpoint Statistics¶
checkpoint_stats = orchestrator.checkpoint_manager.get_checkpoint_statistics()
# Returns checkpoint counts, types, and storage information
Error Handling and Recovery¶
Recovery Strategies¶
The system provides multiple recovery strategies that are applied in sequence:
- Retry with Backoff: Simple retry with configurable delays
- Checkpoint Rollback: Restore from previous successful state
- Component Restart: Restart failed components
- State Validation: Verify and repair workflow state
- Custom Actions: User-defined recovery procedures
Custom Recovery Actions¶
import logging

logger = logging.getLogger(__name__)


async def custom_recovery_action(error, execution_context, iteration, step_id):
    """Custom recovery action."""
    logger.info(f"Attempting custom recovery for {error}")
    # Implement custom recovery logic
    return True  # Return True if recovery successful


# Add to recovery strategy
recovery_strategy.custom_recovery_actions.append(custom_recovery_action)
Error Types and Handling¶
Error Type | Recovery Strategy | Description |
---|---|---|
TimeoutError | Retry with backoff | Step execution timeout |
ConnectionError | Component restart | Network/service connection issues |
MemoryError | Checkpoint rollback | Out of memory conditions |
RuntimeError | Custom actions | General runtime errors |
ValidationError | State validation | Data validation failures |
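The table can be read as a dispatch from exception type to the strategy tried first. The sketch below expresses that mapping with `isinstance` checks. The strategy names as strings, the `pick_strategy` helper, its fallback default, and the locally defined `ValidationError` are assumptions for illustration. The source does not say where `ValidationError` is defined or how the real `RecoveryManager` selects a strategy.

```python
class ValidationError(Exception):
    """Hypothetical stand-in for the ValidationError named in the table."""


# Checked in order; the first matching exception type wins.
STRATEGY_BY_ERROR = [
    (TimeoutError, "retry_with_backoff"),
    (ConnectionError, "component_restart"),
    (MemoryError, "checkpoint_rollback"),
    (ValidationError, "state_validation"),
    (RuntimeError, "custom_actions"),
]


def pick_strategy(error: BaseException, default: str = "retry_with_backoff") -> str:
    """Return the first strategy whose exception type matches the error."""
    for error_type, strategy in STRATEGY_BY_ERROR:
        if isinstance(error, error_type):
            return strategy
    return default  # assumption: unknown errors fall back to a plain retry


print(pick_strategy(ConnectionResetError()))  # component_restart
```

Because the check uses `isinstance`, subclasses follow their parent: `ConnectionResetError` is handled like `ConnectionError`, and `RecursionError` like `RuntimeError`.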
Best Practices¶
Workflow Design¶
- Define Clear Dependencies: Ensure proper step ordering
- Set Appropriate Timeouts: Prevent hanging operations
- Mark Critical Steps: Identify steps that must succeed
- Use Parallel Groups: Optimize execution time
- Configure Retries: Handle transient failures
Resource Management¶
- Set Conservative Thresholds: Avoid resource exhaustion
- Monitor Long-Running Workflows: Track resource trends
- Clean Up Checkpoints: Manage storage usage
- Use Appropriate Intervals: Balance monitoring overhead
Error Handling¶
- Implement Custom Recovery: Handle domain-specific errors
- Test Recovery Scenarios: Verify recovery mechanisms
- Monitor Recovery Success: Track recovery effectiveness
- Log Recovery Actions: Maintain audit trail
Performance Optimization¶
- Choose Appropriate Strategy: Sequential vs parallel execution
- Optimize Step Dependencies: Minimize blocking
- Tune Checkpoint Intervals: Balance safety and performance
- Monitor Resource Usage: Identify bottlenecks
Integration with EVOSEAL Pipeline¶
The orchestration system integrates with the EVOSEAL evolution pipeline. In the example below, each step's component names a pipeline method and its operation is __call__:
from evoseal.core.evolution_pipeline import EvolutionPipeline
from evoseal.core.orchestration import WorkflowOrchestrator

# Create pipeline
pipeline = EvolutionPipeline(config)

# Create orchestrator
orchestrator = WorkflowOrchestrator()

# Define evolution workflow
workflow_config = {
    "workflow_id": "evolution_001",
    "iterations": 20,
    "steps": [
        {
            "name": "analyze",
            "component": "_analyze_current_version",
            "operation": "__call__",
        },
        {
            "name": "generate",
            "component": "_generate_improvements",
            "operation": "__call__",
            "dependencies": ["analyze"],
        },
        {
            "name": "adapt",
            "component": "_adapt_improvements",
            "operation": "__call__",
            "dependencies": ["generate"],
        },
        {
            "name": "evaluate",
            "component": "_evaluate_version",
            "operation": "__call__",
            "dependencies": ["adapt"],
        },
        {
            "name": "validate",
            "component": "_validate_improvement",
            "operation": "__call__",
            "dependencies": ["evaluate"],
        },
    ],
}

# Execute orchestrated evolution
await orchestrator.initialize_workflow(workflow_config)
result = await orchestrator.execute_workflow(pipeline)
Troubleshooting¶
Common Issues¶
Workflow Fails to Initialize¶
- Check workflow step configuration
- Verify component and operation names
- Validate dependencies
Steps Fail Repeatedly¶
- Check component availability
- Verify parameters
- Review timeout settings
- Check resource constraints
Recovery Fails¶
- Review recovery strategy configuration
- Check checkpoint availability
- Verify component restart capability
- Review custom recovery actions
High Resource Usage¶
- Adjust resource thresholds
- Increase monitoring frequency
- Review step resource requirements
- Consider parallel execution limits
Debugging¶
Enable detailed logging:
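One way to do this with Python's standard `logging` module is sketched below. It assumes EVOSEAL modules log under the `evoseal` logger hierarchy, which this page does not confirm. Adjust the logger name if the package uses a different one.

```python
import logging

# Send log records to the console with timestamps and logger names.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Assumption: EVOSEAL modules log under the "evoseal" logger hierarchy.
logging.getLogger("evoseal").setLevel(logging.DEBUG)
```

Setting the level on the parent logger also applies to child loggers such as `evoseal.core.orchestration`, unless they set their own level.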
Check orchestrator status:
status = orchestrator.get_workflow_status()
print(f"State: {status['state']}")
print(f"Active alerts: {status['active_alerts']}")
Review checkpoint history:
checkpoints = orchestrator.checkpoint_manager.list_checkpoints()
for cp in checkpoints:
    print(f"{cp.checkpoint_id}: {cp.checkpoint_type.value} - {cp.timestamp}")
API Reference¶
WorkflowOrchestrator¶
Main orchestrator class for workflow management.
Methods¶
__init__(workspace_dir, checkpoint_interval, execution_strategy, recovery_strategy, resource_thresholds, monitoring_interval)
initialize_workflow(workflow_config) -> bool
execute_workflow(pipeline_instance, resume_from_checkpoint=None) -> WorkflowResult
pause_workflow() -> bool
resume_workflow() -> bool
cancel_workflow() -> bool
get_workflow_status() -> Dict[str, Any]
CheckpointManager¶
Manages workflow checkpoints and state persistence.
Methods¶
create_checkpoint(checkpoint_type, execution_context, workflow_steps, step_results, state, resource_usage, custom_metadata=None) -> str
get_checkpoint(checkpoint_id) -> Optional[Dict[str, Any]]
list_checkpoints(checkpoint_type=None, experiment_id=None, limit=None) -> List[CheckpointMetadata]
delete_checkpoint(checkpoint_id) -> bool
cleanup_old_checkpoints(max_age_days=30, max_count=100, keep_milestone=True) -> int
RecoveryManager¶
Handles error recovery and retry strategies.
Methods¶
attempt_recovery(error, execution_context, iteration, step_id=None) -> bool
get_recovery_statistics() -> Dict[str, Any]
add_custom_recovery_action(action) -> None
remove_custom_recovery_action(action) -> bool
ResourceMonitor¶
Monitors system resources and provides alerts.
Methods¶
start_monitoring() -> None
stop_monitoring() -> None
get_current_usage() -> Optional[ResourceSnapshot]
get_usage_history(hours=1) -> List[ResourceSnapshot]
get_active_alerts() -> List[ResourceAlert]
add_alert_callback(callback) -> None
Conclusion¶
The EVOSEAL Workflow Orchestration System provides a comprehensive solution for managing complex evolution workflows with robust error handling, resource monitoring, and state management. Its modular architecture and flexible configuration options make it suitable for a wide range of evolution scenarios, from simple sequential workflows to complex parallel processing pipelines.
The system's emphasis on reliability, observability, and recoverability ensures that long-running evolution processes can complete successfully even in challenging environments with resource constraints and intermittent failures.
Created: 2025-07-19