A powerful, pure Python workflow automation engine designed for simplicity and scalability.
OmniTask is a modern workflow automation engine that allows you to build complex data processing pipelines with ease. It provides a clean API for defining tasks, managing dependencies, and executing workflows with features like streaming, caching, and parallel execution.
- Pure Python: Built entirely in Python with no external dependencies beyond PyYAML
- Async/Await Support: Full asynchronous execution for maximum performance
- Streaming Tasks: Real-time data processing with streaming capabilities
- Intelligent Caching: Memory, file-based, and Redis distributed caching for optimized performance
- Dependency Management: Automatic dependency resolution and execution ordering
- Task Groups: Parallel execution with configurable concurrency limits
- Conditional Execution: Execute tasks based on conditions and previous results
- Progress Tracking: Built-in progress monitoring and callbacks
- Error Handling: Comprehensive error handling with retry mechanisms
- Workflow Validation: Static analysis and validation of workflow definitions
- YAML Configuration: Define workflows using simple YAML files
pip install omniTask
from omniTask import Workflow, Task, TaskResult
class DataProcessorTask(Task):
task_name = "data_processor"
async def execute(self):
data = self.config.get('data', [])
result = sum(data)
return TaskResult(success=True, output={"sum": result})
async def main():
# Create workflow
workflow = Workflow("example")
workflow.registry.register(DataProcessorTask)
# Add task
task = workflow.create_task("data_processor", "process", {"data": [1, 2, 3, 4, 5]})
# Execute
results = await workflow.run()
print(results["process"].output) # {"sum": 15}
if __name__ == "__main__":
import asyncio
asyncio.run(main())
name: data_pipeline
tasks:
load_data:
type: file_reader
config:
file_path: "data.csv"
process_data:
type: data_processor
config:
operation: "aggregate"
save_results:
type: file_writer
config:
file_path: "results.json"
dependencies:
process_data:
- load_data
save_results:
- process_data
from omniTask import WorkflowTemplate
template = WorkflowTemplate("workflow.yaml")
workflow = template.create_workflow()
results = await workflow.run()
- Minimal Learning Curve: Start with basic tasks and grow into complex workflows
- Intuitive API: Clean, Pythonic interface that feels natural
- Flexible Configuration: Support for both code and YAML-based workflow definitions
- Async-First Design: Handle thousands of concurrent tasks efficiently
- Smart Caching: Avoid redundant computations with intelligent caching
- Streaming Support: Process data in real-time as it becomes available
- Comprehensive Testing: Extensive test coverage and validation
- Error Resilience: Built-in retry mechanisms and error handling
- Monitoring: Progress tracking and detailed logging
- Validation: Static analysis of workflow definitions
OmniTask follows a clean, modular architecture:
- Core Engine: Task execution, dependency resolution, and workflow management
- Task System: Extensible task framework with streaming support
- Caching Layer: Pluggable caching system for performance optimization
- Validation System: Static analysis and runtime validation
- Utility Layer: Logging, path parsing, and helper functions
For comprehensive documentation, visit our documentation site:
- User Guide - Learn how to use OmniTask
- API Reference - Complete API documentation
- Architecture Guide - Deep dive into system architecture
- Examples - Real-world examples and use cases
- Developer Guide - Advanced features and customization
Explore our comprehensive examples:
- Basic Workflows - Simple task chaining
- Streaming Processing - Real-time data processing
- Conditional Logic - Decision-based execution
- Caching Demo - Performance optimization
- Redis Cache - Distributed caching with Redis
- Parallel Processing - Concurrent task execution
OmniTask is released under the MIT License. See LICENSE for details.
- Issues: Report bugs and request features on GitHub Issues
- Documentation: Full documentation at https://sarpers-organization.gitbook.io/omnitask
- Examples: Working examples in the examples/ directory