🇨🇳 中文版本 (Chinese) | 🇺🇸 English
A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation. Define complex AI workflows through YAML configuration files and enable collaborative work between multiple LLM models.
- DSL Workflow Definition - Define complex LLM workflows in YAML
- DAG Dependency Management - Directed-acyclic-graph node dependencies with parallel execution
- Placeholder Resolution - Pass data between nodes with the ${node.output} syntax
- Multi-Model Support - Call different LLM models and aggregate their results
- Flexible Configuration - Custom model configuration and parameter management
- Async Execution - Efficient asynchronous task processing with error retry
- Result Aggregation - Built-in functions for merging and analyzing results
- Extensible Architecture - Support for custom functions and model adapters
- Python 3.8+
- aiohttp >= 3.8.0
- pyyaml >= 6.0
- loguru >= 0.7.0
pip install llm-flow-engine
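To verify the install, a minimal sanity check is to import the two entry points used throughout this README (FlowEngine and ModelConfigProvider, as in the quick start below):

```python
# Verify the installation exposes the classes used in the quick start
from llm_flow_engine import FlowEngine, ModelConfigProvider

print(FlowEngine.__name__, ModelConfigProvider.__name__)
```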
import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def main():
    # 1. Configure models (auto-discovery)
    provider = await ModelConfigProvider.from_host_async(
        api_host="http://127.0.0.1:11434",
        platform="ollama"
    )

    # 2. Create engine
    engine = FlowEngine(provider)

    # 3. Execute workflow
    dsl_content = """
metadata:
  version: "1.0"
  description: "Simple Q&A workflow"

input:
  type: "start"
  name: "workflow_input"
  data:
    question: ""

executors:
  - name: answer_step
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "gpt-oss-20b"

output:
  type: "end"
  name: "workflow_output"
  data:
    answer: "${answer_step.output}"
"""

    result = await engine.execute_dsl(
        dsl_content,
        inputs={"workflow_input": {"question": "What is AI?"}}
    )
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
llm_flow_engine/
├── __init__.py           # Main package initialization
├── flow_engine.py        # Main engine entry point
├── dsl_loader.py         # DSL parser
├── workflow.py           # Unified workflow management
├── executor.py           # Task executor
├── executor_result.py    # Execution result wrapper
├── builtin_functions.py  # Built-in function library
├── model_config.py       # Model configuration management
└── utils.py              # Utility functions

examples/
├── demo_example.py       # Complete example demo
├── demo_qa.yaml          # Workflow DSL example
└── model_config_demo.py  # Model configuration demo
# Auto-discover Ollama models
provider = await ModelConfigProvider.from_host_async(
    api_host="http://127.0.0.1:11434",
    platform="ollama"
)
# Create provider and add models manually
model_provider = ModelConfigProvider()
platform = "openai"
# Free model proxy hosted in mainland China
demo_host = "https://ai-proxy.4ba-cn.co/openrouter/v1/chat/completions"
# Alternative free model proxy (OpenRouter)
# demo_host = "https://openrouter.ai/v1/chat/completions"
demo_free_key = "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"
model_list = ["openai/gpt-oss-20b:free", "moonshotai/kimi-k2:free", "google/gemma-3-12b-it:free", "z-ai/glm-4.5-air:free"]
for model in model_list:
    model_provider.add_single_model(model_name=model, platform=platform,
                                    api_url=demo_host, api_key=demo_free_key)
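A manually populated provider drops into the engine exactly like an auto-discovered one. The sketch below reuses the quick-start DSL with one of the models registered above; the API key and proxy URL are placeholders to replace with your own:

```python
import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def ask(question: str):
    # Register one OpenAI-compatible model by hand (URL / key are placeholders)
    provider = ModelConfigProvider()
    provider.add_single_model(model_name="openai/gpt-oss-20b:free", platform="openai",
                              api_url="https://openrouter.ai/v1/chat/completions",
                              api_key="YOUR_API_KEY")

    # Same engine API as the quick start; the DSL selects the model by the
    # name it was registered under.
    engine = FlowEngine(provider)
    dsl_content = """
metadata:
  version: "1.0"
  description: "Manual provider demo"
input:
  type: "start"
  name: "workflow_input"
  data:
    question: ""
executors:
  - name: answer_step
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "openai/gpt-oss-20b:free"
output:
  type: "end"
  name: "workflow_output"
  data:
    answer: "${answer_step.output}"
"""
    return await engine.execute_dsl(dsl_content,
                                    inputs={"workflow_input": {"question": question}})

if __name__ == "__main__":
    print(asyncio.run(ask("What is AI?")))
```

The full DSL schema that such workflows follow is outlined next.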
metadata:
  version: "1.0"
  description: "Workflow description"

input:
  type: "start"
  name: "workflow_input"
  data:
    key: "value"

executors:
  - name: task1
    type: task
    func: function_name
    custom_vars:
      param1: "${workflow_input.key}"
      param2: "static_value"
    depends_on: []   # Dependencies
    timeout: 30      # Timeout in seconds
    retry: 2         # Retry count

output:
  type: "end"
  name: "workflow_output"
  data:
    result: "${task1.output}"
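The ${node.field} placeholders are what tie the graph together: before an executor runs, its custom_vars are filled in from the outputs of the nodes it depends on. The snippet below is only an illustrative sketch of that substitution idea, not the engine's actual implementation (the real resolution logic lives inside the package):

```python
import re

def resolve_placeholders(template: str, node_results: dict) -> str:
    """Replace ${node.field} references with values from completed nodes.

    Illustrative only -- the engine performs its own resolution internally.
    """
    def substitute(match: re.Match) -> str:
        node, field = match.group(1), match.group(2)
        return str(node_results[node][field])

    return re.sub(r"\$\{(\w+)\.(\w+)\}", substitute, template)

# Example: task1 has finished, and its output feeds the next node's parameter
results = {"task1": {"output": "42"}}
print(resolve_placeholders("The answer was: ${task1.output}", results))
# -> The answer was: 42
```

The larger example below wires several of these placeholders into a multi-model comparison workflow.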
metadata:
  version: "1.0"
  description: "Multi-model Q&A with analysis"

input:
  type: "start"
  name: "workflow_input"
  data:
    question: ""

executors:
  # Parallel model calls
  - name: model1_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "llama2"
    timeout: 30

  - name: model2_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "mistral"
    timeout: 30

  # Analysis step (depends on both models)
  - name: analysis
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "Compare these answers: 1) ${model1_answer.output} 2) ${model2_answer.output}"
      model: "llama2"
    depends_on: ["model1_answer", "model2_answer"]

output:
  type: "end"
  name: "workflow_output"
  data:
    original_question: "${workflow_input.question}"
    model1_response: "${model1_answer.output}"
    model2_response: "${model2_answer.output}"
    analysis: "${analysis.output}"
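Saved to a file (the filename below is just an example), this workflow runs through the same execute_dsl call used in the quick start, and the final output exposes both model answers plus the comparison:

```python
import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def main():
    # Any configured provider works; here we auto-discover local Ollama models
    provider = await ModelConfigProvider.from_host_async(
        api_host="http://127.0.0.1:11434",
        platform="ollama"
    )
    engine = FlowEngine(provider)

    # Load the multi-model workflow defined above (example filename)
    with open("multi_model_qa.yaml", "r", encoding="utf-8") as f:
        dsl_content = f.read()

    result = await engine.execute_dsl(
        dsl_content,
        inputs={"workflow_input": {"question": "What is the speed of light?"}}
    )
    print(result)

asyncio.run(main())
```

llm_simple_call is only one of the built-in functions available to executors: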
- llm_simple_call - Basic LLM model call
- text_process - Text preprocessing and formatting
- result_summary - Multi-result summarization
- data_transform - Data format transformation
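As a sketch of how these compose, the fragment below chains text_process into llm_simple_call via depends_on and a placeholder. Only llm_simple_call's user_input / model parameters appear elsewhere in this README; the text_process parameter name is an illustrative assumption, so check builtin_functions.py for the real signatures:

```python
# Executors fragment embedded as a Python string, following the quick-start
# pattern of defining DSL in code. The text_process parameter name ("text")
# below is an assumption, not a documented API.
preprocess_fragment = """
executors:
  - name: clean_question
    type: task
    func: text_process
    custom_vars:
      text: "${workflow_input.question}"   # parameter name assumed
  - name: answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${clean_question.output}"
      model: "llama2"
    depends_on: ["clean_question"]
"""
```

The bundled demos below exercise these pieces end to end.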
# Basic usage demo
python examples/demo_example.py
# Model configuration demo
python examples/model_config_demo.py
# Package usage demo
python examples/package_demo.py
- Ollama - Local LLM models
- OpenAI - GPT series models
- OpenAI Compatible - Any OpenAI-compatible API
- Anthropic - Claude series models
- Custom - Custom API endpoints
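All of these are configured through the same ModelConfigProvider calls shown earlier. The "ollama" and "openai" platform strings appear in this README; the "anthropic" identifier, model names, and endpoint URLs below are illustrative assumptions, so adjust them to your deployment:

```python
from llm_flow_engine import ModelConfigProvider

provider = ModelConfigProvider()

# OpenAI / OpenAI-compatible endpoint (platform string "openai" as used above)
provider.add_single_model(model_name="gpt-4o-mini", platform="openai",
                          api_url="https://api.openai.com/v1/chat/completions",
                          api_key="YOUR_OPENAI_KEY")

# Anthropic Claude (platform identifier assumed for illustration)
provider.add_single_model(model_name="claude-3-5-sonnet-latest", platform="anthropic",
                          api_url="https://api.anthropic.com/v1/messages",
                          api_key="YOUR_ANTHROPIC_KEY")
```

To work on the engine itself, clone the repository and install the development extras: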
git clone https://github.com/liguobao/llm-flow-engine.git
cd llm-flow-engine
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Format code
black .
This project is licensed under the MIT License - see the LICENSE file for details.
Contributions are welcome! Please feel free to submit a Pull Request.
- Issues: GitHub Issues
- Documentation: GitHub Wiki
If you find this project helpful, please consider giving it a star! ⭐
Special thanks to our generous sponsors who help this project run smoothly:
- vccddd - For providing free API keys and hosting the AIProxy service that enables seamless AI model access