From 0f9c8821083f6ae2de14c158e3c8a14b5be7a282 Mon Sep 17 00:00:00 2001 From: Mohamad Mohebifar Date: Fri, 7 Mar 2025 16:10:25 -0800 Subject: [PATCH 01/72] Initial commit --- .gitignore | 25 + Cargo.toml | 30 + README.md | 1244 ++++++++++++++++++++++++++++ crates/cli/Cargo.toml | 27 + crates/cli/src/main.rs | 614 ++++++++++++++ crates/core/Cargo.toml | 31 + crates/core/src/engine.rs | 1098 ++++++++++++++++++++++++ crates/core/src/error.rs | 49 ++ crates/core/src/lib.rs | 17 + crates/core/src/models/mod.rs | 21 + crates/core/src/models/node.rs | 60 ++ crates/core/src/models/runtime.rs | 42 + crates/core/src/models/state.rs | 62 ++ crates/core/src/models/step.rs | 39 + crates/core/src/models/strategy.rs | 25 + crates/core/src/models/task.rs | 131 +++ crates/core/src/models/template.rs | 78 ++ crates/core/src/models/trigger.rs | 27 + crates/core/src/models/variable.rs | 209 +++++ crates/core/src/models/workflow.rs | 90 ++ crates/core/src/utils.rs | 208 +++++ crates/models/Cargo.toml | 20 + crates/models/src/error.rs | 49 ++ crates/models/src/lib.rs | 28 + crates/models/src/node.rs | 63 ++ crates/models/src/runtime.rs | 42 + crates/models/src/state.rs | 62 ++ crates/models/src/state_diff.rs | 50 ++ crates/models/src/step.rs | 39 + crates/models/src/strategy.rs | 25 + crates/models/src/task.rs | 131 +++ crates/models/src/template.rs | 79 ++ crates/models/src/trigger.rs | 27 + crates/models/src/variable.rs | 209 +++++ crates/models/src/workflow.rs | 90 ++ crates/runners/Cargo.toml | 21 + crates/runners/src/lib.rs | 233 ++++++ crates/state/Cargo.toml | 23 + crates/state/src/lib.rs | 797 ++++++++++++++++++ examples/command-workflow.yaml | 52 ++ examples/i18n-codemod.yaml | 146 ++++ examples/manual-workflow.yaml | 36 + examples/matrix-workflow.yaml | 44 + examples/node_step_workflow.yaml | 113 +++ examples/script-workflow.yaml | 62 ++ examples/simple-workflow.yaml | 63 ++ examples/test-workflow.yaml | 24 + rustfmt.toml | 5 + 48 files changed, 6660 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 crates/cli/Cargo.toml create mode 100644 crates/cli/src/main.rs create mode 100644 crates/core/Cargo.toml create mode 100644 crates/core/src/engine.rs create mode 100644 crates/core/src/error.rs create mode 100644 crates/core/src/lib.rs create mode 100644 crates/core/src/models/mod.rs create mode 100644 crates/core/src/models/node.rs create mode 100644 crates/core/src/models/runtime.rs create mode 100644 crates/core/src/models/state.rs create mode 100644 crates/core/src/models/step.rs create mode 100644 crates/core/src/models/strategy.rs create mode 100644 crates/core/src/models/task.rs create mode 100644 crates/core/src/models/template.rs create mode 100644 crates/core/src/models/trigger.rs create mode 100644 crates/core/src/models/variable.rs create mode 100644 crates/core/src/models/workflow.rs create mode 100644 crates/core/src/utils.rs create mode 100644 crates/models/Cargo.toml create mode 100644 crates/models/src/error.rs create mode 100644 crates/models/src/lib.rs create mode 100644 crates/models/src/node.rs create mode 100644 crates/models/src/runtime.rs create mode 100644 crates/models/src/state.rs create mode 100644 crates/models/src/state_diff.rs create mode 100644 crates/models/src/step.rs create mode 100644 crates/models/src/strategy.rs create mode 100644 crates/models/src/task.rs create mode 100644 crates/models/src/template.rs create mode 100644 crates/models/src/trigger.rs create mode 100644 
crates/models/src/variable.rs
 create mode 100644 crates/models/src/workflow.rs
 create mode 100644 crates/runners/Cargo.toml
 create mode 100644 crates/runners/src/lib.rs
 create mode 100644 crates/state/Cargo.toml
 create mode 100644 crates/state/src/lib.rs
 create mode 100644 examples/command-workflow.yaml
 create mode 100644 examples/i18n-codemod.yaml
 create mode 100644 examples/manual-workflow.yaml
 create mode 100644 examples/matrix-workflow.yaml
 create mode 100644 examples/node_step_workflow.yaml
 create mode 100644 examples/script-workflow.yaml
 create mode 100644 examples/simple-workflow.yaml
 create mode 100644 examples/test-workflow.yaml
 create mode 100644 rustfmt.toml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 000000000..d5a2ec5c7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,25 @@
+# Generated by Cargo
+/target/
+
+# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
+# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
+Cargo.lock
+
+# These are backup files generated by rustfmt
+**/*.rs.bk
+
+# MSVC Windows builds of rustc generate these, which store debugging information
+*.pdb
+
+# IDE files
+.idea/
+.vscode/
+*.iml
+
+# OS files
+.DS_Store
+Thumbs.db
+
+# Execution artifacts
+butterflow-state/
+butterflow-workdir/
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 000000000..b19a65bbf
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,30 @@
+[workspace]
+members = ["crates/models", "crates/core", "crates/cli", "crates/runners", "crates/state"]
+resolver = "2"
+
+[workspace.package]
+authors = ["Butterflow Team"]
+description = "A self-hostable workflow engine for code transformations"
+documentation = "https://docs.rs/butterflow"
+repository = "https://github.com/codemod-com/butterflow"
+license = "MIT"
+rust-version = "1.70"
+
+[workspace.dependencies]
+tokio = { version = "1.36", features = ["full"] }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+serde_yaml = "0.9"
+thiserror = "2.0"
+anyhow = "1.0"
+uuid = { version = "1.6", features = ["v4", "serde"] }
+chrono = { version = "0.4", features = ["serde"] }
+log = "0.4"
+async-trait = "0.1"
+env_logger = "0.10"
+reqwest = { version = "0.11", features = ["json"] }
+bollard = "0.15"
+regex = "1.10"
+futures = "0.3"
+futures-util = { version = "0.3", features = ["stream"] }
+clap = { version = "4.5", features = ["derive"] }
diff --git a/README.md b/README.md
new file mode 100644
index 000000000..3005f0895
--- /dev/null
+++ b/README.md
@@ -0,0 +1,1244 @@
+# Butterflow: A Self-Hostable Workflow Engine
+
+Butterflow is a lightweight, self-hostable workflow engine designed for running large-scale code transformation jobs. It is similar to GitHub Actions, but focuses on local execution and code transformations such as codemods and grep operations.
+
+## Installation
+
+### Building from Source
+
+```bash
+# Clone the repository
+git clone https://github.com/codemod-com/butterflow.git
+cd butterflow
+
+# Build the project
+cargo build --release
+
+# The executable will be available at target/release/butterflow
+```
+
+## Quick Start
+
+1. Create a workflow file `workflow.yaml`:
+
+```yaml
+version: "1"
+nodes:
+  - id: hello-world
+    name: Hello World
+    type: automatic
+    runtime:
+      type: direct
+    steps:
+      - id: hello
+        name: Say Hello
+        commands:
+          - echo "Hello, World!"
+```
+
+2.
Run the workflow: + +```bash +butterflow run -w workflow.yaml +``` + +## Core Features + +- **Self-hostable**: Run workflows on your own infrastructure +- **Local execution**: Execute workflows on your local machine +- **Lightweight**: Minimal resource requirements +- **Parallel execution**: Run nodes concurrently when dependencies allow +- **Manual triggers**: Some nodes can be manually triggered +- **Matrix execution**: Run the same node multiple times with different inputs +- **OCI container support**: Each task runs in its own container +- **Durable execution**: Workflow state is persisted with diff-based updates and can survive restarts +- **Flexible configuration**: Define workflows in either JSON or YAML format +- **Backend-agnostic state management**: Support for multiple state backends (local, API, database) +- **Resumable workflows**: Resume workflow execution from the last saved state +- **Reusable templates**: Define reusable workflow components +- **State schema definition**: Define the structure of workflow state + +## Architecture + +Butterflow consists of several components: + +1. **Workflow Engine**: Orchestrates the execution of workflows +2. **Task Runner**: Executes individual tasks using container runtimes +3. **State Manager**: Persists workflow state for durability with support for multiple backends +4. **Scheduler**: Determines which nodes can be executed based on dependencies +5. **State Synchronizer**: Manages state updates using diff-based synchronization +6. **Template Manager**: Manages reusable workflow templates + +## Workflow Definition + +Workflows can be defined in either JSON or YAML format. The engine automatically detects the format based on the file extension or attempts to parse both formats if the extension is not recognized. 
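+
+### JSON Format
+
+Most examples in this document use YAML. For illustration, the Quick Start workflow above could equivalently be written in JSON like this (the field names simply mirror the YAML form):
+
+```json
+{
+  "version": "1",
+  "nodes": [
+    {
+      "id": "hello-world",
+      "name": "Hello World",
+      "type": "automatic",
+      "runtime": { "type": "direct" },
+      "steps": [
+        {
+          "id": "hello",
+          "name": "Say Hello",
+          "commands": ["echo \"Hello, World!\""]
+        }
+      ]
+    }
+  ]
+}
+```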
+ +### YAML Format + +```yaml +version: "1" + +state: + schema: + - name: i18nShardsTs + type: array + items: + type: object + properties: + team: + type: string + shardId: + type: string + +templates: + - id: checkout-repo + name: Checkout Repository + description: Standard process for checking out a repository + runtime: + type: docker + image: alpine/git:latest + inputs: + - name: repo_url + type: string + required: true + description: "URL of the repository to checkout" + default: ${params.repo_url} + steps: + - id: clone + name: Clone repository + commands: + - git clone ${inputs.repo_url} repo + +nodes: + - id: evaluate-codeowners + name: Evaluate codeowners + description: Shard the Codemod run into smaller chunks based on the codeowners + type: automatic + runtime: + type: docker + image: node:18-alpine + steps: + - id: checkout-repo + name: Checkout repo + uses: + - template: checkout-repo + inputs: + repo_url: ${params.repo_url} + + - id: run-codemod-ts + name: I18n Codemod (TS) + description: Run the i18n codemod on the TS files + type: automatic + trigger: + type: manual + depends_on: + - evaluate-codeowners + strategy: + type: matrix + from_state: i18nShardsTs +``` + +### Workflow Components + +#### State Schema + +The `state` section defines the schema for workflow state: + +```yaml +state: + schema: + - name: stateName + type: array|object|string|number|boolean + items: # For array types + type: object + properties: + property1: + type: string +``` + +#### Templates + +Templates are reusable workflow components: + +```yaml +templates: + - id: template-id + name: Template Name + description: Template description + runtime: + type: docker + image: image:tag + inputs: + - name: input_name + type: string|number|boolean + required: true|false + description: "Input description" + default: default_value + steps: + - id: step-id + name: Step name + commands: + - command1 + - command2 + outputs: + - name: output_name + value: "output_value" +``` + +#### Nodes + +Nodes are the main execution units in a workflow: + +```yaml +nodes: + - id: node-id + name: Node Name + description: Node description + type: automatic|manual + trigger: + type: manual|automatic + depends_on: + - other-node-id + runtime: + type: docker + image: image:tag + strategy: + type: matrix + from_state: stateName + steps: + - id: step-id + name: Step name + uses: + - template: template-id + inputs: + input_name: value + - id: another-step + name: Another step + commands: + - command1 + - command2 + env: + ENV_VAR: value +``` + +### Node Configuration + +Each node in a workflow can have the following properties: + +| Property | Description | +| ------------- | ------------------------------------------------------------- | +| `id` | Unique identifier for the node | +| `name` | Human-readable name | +| `description` | Detailed description of what the node does | +| `type` | Either "automatic" or "manual" | +| `depends_on` | Array of node IDs that must complete before this node can run | +| `strategy` | Configuration for running multiple instances of this node | +| `trigger` | Configuration for how the node is triggered | +| `runtime` | Container runtime configuration | +| `steps` | Array of steps to execute within the node | +| `env` | Environment variables to inject into the container | + +### Step Configuration + +Steps within a node can have the following properties: + +| Property | Description | +| ------------- | ------------------------------------------ | +| `id` | Unique identifier for the step | +| `name` | 
Human-readable name | +| `description` | Detailed description of what the step does | +| `uses` | Template to use for this step | +| `commands` | Array of commands to run | + +## Node vs Task + +In Butterflow: + +- A **Node** is part of the workflow definition - it's a static component defined in your workflow YAML/JSON +- A **Task** is a runtime instance of a node that is executed - it's created dynamically during workflow execution + +This distinction is particularly important in two scenarios: + +1. **Matrix Strategies**: When a node uses a matrix strategy, multiple tasks are created from a single node definition - one for each combination in the matrix. Each task has its own unique UUID v4, status, and state. + +2. **Manual Triggers**: When a node requires manual triggering (either because it's a manual node type or has a manual trigger configuration), the workflow engine creates the task(s) for that node, marks them as `AwaitingTrigger`, and then provides the specific task UUIDs to the user. This allows users to trigger individual tasks rather than entire nodes. + +When a task is executed, it will run all the steps defined in its associated node. Tasks are always identified by UUID v4s to ensure uniqueness and proper tracking throughout the workflow execution. + +### Task Creation and Management + +Tasks are created as soon as the workflow runner has determined what needs to be executed next. This is particularly important for proper dependency tracking and execution planning: + +- **Regular nodes**: A single task is created for each regular node +- **Matrix nodes**: For matrix nodes, multiple tasks are created: + - One "master task" that represents the overall state of the node + - Individual tasks for each matrix combination + +The master task for a matrix node is not runnable itself - it exists solely to track the collective status of all matrix combination tasks. For example, if a matrix node has 10 combinations, the system will create 11 tasks: 1 master task plus 10 individual combination tasks. + +If the matrix input is not yet available (e.g., it depends on the output of a previous node), no tasks will be created for that node, and all dependencies of that node will be marked as blocked until the matrix input becomes available. + +### Example: Matrix Node with Master Task + +Consider a matrix node that processes different regions: + +```yaml +nodes: + - id: process-regions + name: Process Regions + type: automatic + strategy: + type: matrix + values: + - region: us-east + - region: us-west + - region: eu-central + # ... +``` + +When this workflow runs, it will: +1. Create a master task with a UUID v4 (e.g., `123e4567-e89b-12d3-a456-426614174000`) +2. Create three individual tasks with unique UUID v4s (e.g., + - `123e4567-e89b-12d3-a456-426614174001` + - `123e4567-e89b-12d3-a456-426614174002` + - `123e4567-e89b-12d3-a456-426614174003`) +3. Track the overall status via the master task +4. 
Execute each individual task according to the workflow rules + +The master task's status is derived from its child tasks: +- If all child tasks are `Completed`, the master task is `Completed` +- If any child task is `Failed`, the master task is `Failed` +- If some child tasks are `Running` and none have `Failed`, the master task is `Running` + +## Workflow vs Workflow Run + +It's important to distinguish between a workflow and a workflow run: + +- A **Workflow** is the definition of the process - the YAML/JSON file that describes nodes, dependencies, and execution rules +- A **Workflow Run** is a specific execution instance of a workflow - created when a user initiates the workflow + +Each workflow run is assigned a unique UUID v4 identifier that distinguishes it from other runs of the same workflow. + +### Key Differences + +| Workflow | Workflow Run | +| -------- | ------------ | +| Static definition | Runtime instance with UUID v4 identifier | +| Defined in YAML/JSON | Created when workflow is executed | +| Contains node definitions | Contains task instances with UUID v4s | +| No state | Has associated state | + +### State Management in Workflow Runs + +Each workflow run has its own state that includes: + +- A copy of the original workflow definition +- Current status of all tasks +- All resolved variables and parameters +- Execution history and logs +- Matrix configurations and generated tasks + +This state is tied specifically to the workflow run UUID, not to the workflow definition itself. This means multiple runs of the same workflow will each have their own independent state. + +## Variable Resolution + +Butterflow supports a powerful variable resolution system using the `${...}` syntax. This allows you to reference parameters, environment variables, and state values throughout your workflow definition. + +### Variable Types + +- **Parameters**: Accessed with `${params.name}` - values passed when starting the workflow +- **Environment Variables**: Accessed with `${env.NAME}` - environment variables from the execution environment +- **State Values**: Accessed with `${state.name}` - values stored in the workflow state +- **Task Outputs**: Accessed with `${tasks.node_id.outputs.name}` - outputs from previous tasks +- **Matrix Values**: In matrix tasks, matrix values are directly accessible as variables + +### Variable Resolution Examples + +```yaml +nodes: + - id: example-node + name: "Processing ${params.repo_name}" + description: "Running on branch ${params.branch}" + env: + REPO_URL: "${params.repo_url}" + DEBUG: "${env.CI}" + PREVIOUS_RESULT: "${tasks.previous-node.outputs.result}" + steps: + - id: process + name: Process Data + commands: + - echo "Processing data for ${params.repo_name}" +``` + +In matrix nodes, the matrix values are directly accessible in commands: + +```yaml +nodes: + - id: process-regions + strategy: + type: matrix + values: + - region: us-east + zone: zone-1 + - region: us-west + zone: zone-2 + steps: + - id: process + commands: + - echo "Processing region $region in zone $zone" +``` + +### Variable Resolution Order + +Variables are resolved in the following order: + +1. Matrix values (for matrix tasks) +2. Parameters passed to the workflow run +3. Environment variables +4. State values +5. 
Task outputs from completed tasks + +If a variable cannot be resolved, the workflow engine will either: +- Use a default value if specified +- Fail with an error if the variable is required and no default is provided + +### Example: Workflow with Parameters + +When running a workflow with parameters: + +```bash +butterflow run -w workflow.yaml --param repo_url=https://github.com/example/repo --param branch=main +``` + +This will generate a UUID v4 for the workflow run, which you can use to reference this specific execution later. + +### Example: Matrix Node with Manual Trigger + +Consider a node with a matrix strategy that generates 3 tasks, and requires manual triggering: + +```yaml +nodes: + - id: process-regions + name: Process Regions + type: automatic + trigger: + type: manual + strategy: + type: matrix + values: + - region: us-east + - region: us-west + - region: eu-central + # ... +``` + +When this workflow runs, it will: +1. Create three distinct tasks with unique UUID v4s +2. Mark each task as `AwaitingTrigger` +3. Provide these specific task UUIDs to the user + +The user can then choose to trigger individual tasks: +```bash +# Trigger a specific task using its UUID +butterflow resume -i 123e4567-e89b-12d3-a456-426614174000 -t 123e4567-e89b-12d3-a456-426614174001 +``` + +Or trigger all awaiting tasks: +```bash +# Trigger all awaiting tasks +butterflow resume -i 123e4567-e89b-12d3-a456-426614174000 --trigger-all +``` + +This granular control allows for more flexible workflow execution, especially in complex scenarios where different parts of a matrix might need to be triggered by different users or at different times. + +### Task Status Tracking + +During workflow execution, each task (node instance) can have one of the following statuses: + +| Status | Description | +| ------ | ----------- | +| `Pending` | Task hasn't started execution yet | +| `Running` | Task is currently being executed | +| `Completed` | Task has completed successfully | +| `Failed` | Task execution failed with an error | +| `AwaitingTrigger` | Task is waiting for a manual trigger | +| `Blocked` | Task is blocked by dependencies | +| `WontDo` | Task will not be executed (typically happens when re-running a workflow where matrix inputs have changed) | + +The workflow engine tracks these statuses in the state backend, allowing for: +- Resuming workflows after interruptions +- Visualizing workflow progress +- Identifying bottlenecks or failures +- Supporting partial re-runs of workflows + +For matrix nodes, each generated task has its own status, enabling fine-grained tracking of execution progress. + +### Manual Nodes and Triggers + +Butterflow supports two types of manual intervention in workflows: + +1. **Manual node type**: When a node is defined with `type: manual`, it will always pause execution and wait for a manual trigger, even if all dependencies are satisfied. Manual nodes allow you to insert human verification or decision points in your workflow. + +2. **Manual trigger**: Nodes defined with `trigger: { type: manual }` will pause execution until explicitly triggered, even if they're automatic nodes. This is useful for creating approval gates or scheduling specific parts of a workflow. + +When a workflow encounters a manual node or a node with a manual trigger, it: + +1. Pauses execution of that branch of the workflow +2. Marks the node as `AwaitingTrigger` +3. Continues executing other branches that aren't blocked +4. 
Persists the workflow state so it can be resumed later + +Resuming a workflow with manual nodes: + +```bash +# Trigger a specific node and resume +butterflow resume -i workflow-run-id -t task-id + +# Trigger all manual nodes and resume +butterflow resume -i workflow-run-id --trigger-all +``` + +When resumed, the workflow will continue execution from exactly where it left off, preserving all state and completed work. + +## Container Execution + +Each task runs in its own container. Butterflow supports multiple container runtimes: + +- Docker: Uses the Docker daemon to run containers +- Podman: Uses Podman to run containers +- Direct: Runs commands directly on the host + +## State Management + +Butterflow implements a comprehensive backend-agnostic state management system: + +- **Diff-based updates**: Only changes to the state are transmitted, reducing bandwidth and improving performance +- **Multiple backend support**: Store state in local files, databases, or remote APIs +- **API integration**: Send state diffs to external systems via API calls with retry mechanisms +- **Conflict resolution**: Smart merging of concurrent state updates +- **Transactional updates**: Ensure state consistency even during failures +- **State versioning**: Track changes to workflow state over time with version numbers +- **Schema validation**: Validate state against defined schemas +- **Local fallback**: API backends automatically fall back to local storage if remote is unavailable + +The state management system tracks: + +- Current status of each node/task +- Outputs from completed steps +- Matrix configurations and generated tasks +- Manual trigger status +- Execution history and logs + +### State Persistence + +Workflow state is persisted automatically during execution, allowing for: + +- **Resume after interruptions**: If a workflow is stopped or crashes, it can resume from the last saved state +- **Periodic checkpoints**: State is saved at regular intervals during execution +- **Failure recovery**: If a node fails, the workflow can be retried from the failure point +- **Manual triggers**: State is saved when manual nodes are triggered +- **Efficient storage**: Only changes (diffs) are stored between saves, reducing storage requirements + +### Transactional State Updates + +Butterflow uses a diff-based approach for state updates, where the runner sends only the changes (diffs) to the state persistence adapter rather than the entire new state. This approach provides several key benefits: + +- **Concurrent workflow execution**: Multiple users can run the same workflow for different tasks simultaneously without state updates interfering with each other +- **Atomic updates**: Each state change is applied as an atomic transaction, ensuring consistency +- **Reduced network traffic**: Only sending diffs significantly reduces the amount of data transferred +- **Conflict prevention**: Changes to different parts of the state can be applied independently +- **Audit trail**: The sequence of diffs provides a complete history of state changes + +For example, if two different users are running the same workflow but working on different tasks: +1. User A updates the state of Task 1 from "Pending" to "Running" +2. User B updates the state of Task 2 from "Pending" to "Running" +3. The state persistence adapter receives these diffs separately and applies them sequentially +4. 
Both updates are preserved without overwriting each other
+
+This transactional approach ensures that concurrent workflow executions remain isolated and don't interfere with each other's state, even when sharing the same workflow definition and state backend.
+
+### State Backend Configuration
+
+Create a `butterflow.config.yaml` file and pass it to the CLI with the `-c` flag:
+
+```yaml
+stateManagement:
+  backend: "api" # Options: "local", "database", "api"
+  apiConfig:
+    endpoint: "https://api.example.com/workflow-state"
+    authToken: "${ENV_AUTH_TOKEN}"
+    retryConfig:
+      maxRetries: 3
+      backoffFactor: 1.5
+```
+
+## Usage Examples
+
+### Running a Simple Workflow
+
+```bash
+$ butterflow run -w workflow.yaml --param repo_url=https://github.com/example/repo --param branch=main
+Executing workflow...
+[2023-06-15T10:30:45Z] Workflow started with ID: 123e4567-e89b-12d3-a456-426614174000
+[2023-06-15T10:30:45Z] Task checkout-repo (123e4567-e89b-12d3-a456-426614174001): Running
+[2023-06-15T10:31:15Z] Task checkout-repo (123e4567-e89b-12d3-a456-426614174001): Completed
+[2023-06-15T10:31:15Z] Task evaluate-codeowners (123e4567-e89b-12d3-a456-426614174002): Running
+[2023-06-15T10:32:30Z] Task evaluate-codeowners (123e4567-e89b-12d3-a456-426614174002): Completed
+[2023-06-15T10:32:30Z] Created matrix tasks for run-codemod-ts:
+  - Master task: 123e4567-e89b-12d3-a456-426614174003
+  - Team frontend: 123e4567-e89b-12d3-a456-426614174004 (AwaitingTrigger)
+  - Team backend: 123e4567-e89b-12d3-a456-426614174005 (AwaitingTrigger)
+  - Team shared: 123e4567-e89b-12d3-a456-426614174006 (AwaitingTrigger)
+[2023-06-15T10:32:30Z] Workflow paused: Manual triggers required
+
+Workflow is awaiting manual triggers for the following tasks:
+- 123e4567-e89b-12d3-a456-426614174004 (run-codemod-ts, team: frontend)
+- 123e4567-e89b-12d3-a456-426614174005 (run-codemod-ts, team: backend)
+- 123e4567-e89b-12d3-a456-426614174006 (run-codemod-ts, team: shared)
+
+Use 'butterflow status -i 123e4567-e89b-12d3-a456-426614174000' to check status
+Use 'butterflow resume -i 123e4567-e89b-12d3-a456-426614174000 --trigger-all' to trigger all tasks
+```
+
+The workflow run ID (123e4567-e89b-12d3-a456-426614174000) is generated automatically and will be needed for subsequent commands. Note that the command doesn't return until all automatically runnable tasks are completed or the workflow is paused due to manual triggers.
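+
+Because `run` blocks until the workflow either finishes or pauses, it composes well with `status` in scripts. A rough sketch (the `grep` pattern used to pull the run ID out of the logs is illustrative, not a stable output contract):
+
+```bash
+#!/bin/bash
+set -euo pipefail
+
+# Start the workflow and capture the log output
+output=$(butterflow run -w workflow.yaml --param repo_url=https://github.com/example/repo)
+
+# Extract the run ID from the "Workflow started with ID: ..." line (illustrative parsing)
+run_id=$(echo "$output" | grep -oE '[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}' | head -n 1)
+
+# See whether the run completed, failed, or is awaiting triggers
+butterflow status -i "$run_id"
+```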
+ +### Checking Workflow Status + +```bash +$ butterflow status -i 123e4567-e89b-12d3-a456-426614174000 +Workflow: simple-workflow (ID: 123e4567-e89b-12d3-a456-426614174000) +Status: AwaitingTrigger +Started: 2023-06-15T10:30:45Z +Duration: 00:05:23 + +Tasks: +- checkout-repo (123e4567-e89b-12d3-a456-426614174001): Completed +- evaluate-codeowners (123e4567-e89b-12d3-a456-426614174002): Completed +- run-codemod-ts (master) (123e4567-e89b-12d3-a456-426614174003): AwaitingTrigger + - run-codemod-ts (team: frontend) (123e4567-e89b-12d3-a456-426614174004): AwaitingTrigger + - run-codemod-ts (team: backend) (123e4567-e89b-12d3-a456-426614174005): AwaitingTrigger + - run-codemod-ts (team: shared) (123e4567-e89b-12d3-a456-426614174006): AwaitingTrigger + +Manual triggers required: +- 123e4567-e89b-12d3-a456-426614174004 (run-codemod-ts, team: frontend) +- 123e4567-e89b-12d3-a456-426614174005 (run-codemod-ts, team: backend) +- 123e4567-e89b-12d3-a456-426614174006 (run-codemod-ts, team: shared) +``` + +### Resuming a Workflow + +When a workflow has tasks awaiting manual triggers, you can resume it by triggering specific tasks: + +```bash +$ butterflow resume -i 123e4567-e89b-12d3-a456-426614174000 -t 123e4567-e89b-12d3-a456-426614174004 +Resuming workflow 123e4567-e89b-12d3-a456-426614174000... +[2023-06-15T10:40:15Z] Task 123e4567-e89b-12d3-a456-426614174004 (run-codemod-ts, team: frontend) triggered +[2023-06-15T10:40:15Z] Task 123e4567-e89b-12d3-a456-426614174004 (run-codemod-ts, team: frontend): Running +[2023-06-15T10:42:30Z] Task 123e4567-e89b-12d3-a456-426614174004 (run-codemod-ts, team: frontend): Completed +[2023-06-15T10:42:30Z] Workflow paused: Manual triggers still required + +Workflow is still awaiting manual triggers for the following tasks: +- 123e4567-e89b-12d3-a456-426614174005 (run-codemod-ts, team: backend) +- 123e4567-e89b-12d3-a456-426614174006 (run-codemod-ts, team: shared) +``` + +Or trigger all awaiting tasks at once: + +```bash +$ butterflow resume -i 123e4567-e89b-12d3-a456-426614174000 --trigger-all +Resuming workflow 123e4567-e89b-12d3-a456-426614174000... 
+[2023-06-15T10:45:00Z] Triggering all awaiting tasks: + - 123e4567-e89b-12d3-a456-426614174005 (run-codemod-ts, team: backend) + - 123e4567-e89b-12d3-a456-426614174006 (run-codemod-ts, team: shared) +[2023-06-15T10:45:00Z] Task 123e4567-e89b-12d3-a456-426614174005 (run-codemod-ts, team: backend): Running +[2023-06-15T10:47:15Z] Task 123e4567-e89b-12d3-a456-426614174005 (run-codemod-ts, team: backend): Completed +[2023-06-15T10:47:15Z] Task 123e4567-e89b-12d3-a456-426614174006 (run-codemod-ts, team: shared): Running +[2023-06-15T10:49:30Z] Task 123e4567-e89b-12d3-a456-426614174006 (run-codemod-ts, team: shared): Completed +[2023-06-15T10:49:30Z] Task 123e4567-e89b-12d3-a456-426614174003 (run-codemod-ts master): Completed +[2023-06-15T10:49:30Z] All tasks completed successfully +[2023-06-15T10:49:30Z] Workflow completed +``` + +After all tasks are completed, you can check the final status: + +```bash +$ butterflow status -i 123e4567-e89b-12d3-a456-426614174000 +Workflow: simple-workflow (ID: 123e4567-e89b-12d3-a456-426614174000) +Status: Completed +Started: 2023-06-15T10:30:45Z +Completed: 2023-06-15T10:49:30Z +Duration: 00:18:45 + +Tasks: +- checkout-repo (123e4567-e89b-12d3-a456-426614174001): Completed +- evaluate-codeowners (123e4567-e89b-12d3-a456-426614174002): Completed +- run-codemod-ts (master) (123e4567-e89b-12d3-a456-426614174003): Completed + - run-codemod-ts (team: frontend) (123e4567-e89b-12d3-a456-426614174004): Completed + - run-codemod-ts (team: backend) (123e4567-e89b-12d3-a456-426614174005): Completed + - run-codemod-ts (team: shared) (123e4567-e89b-12d3-a456-426614174006): Completed + +Manual triggers required: None +``` + +### Validating a Workflow Definition + +Before running a workflow, you can validate its definition: + +```bash +$ butterflow validate -w workflow.yaml +✓ Workflow definition is valid +Schema validation: Passed +Node dependencies: Valid +Template references: Valid +``` + +If there are issues with the workflow definition: + +```bash +$ butterflow validate -w invalid-workflow.yaml +✗ Workflow definition is invalid +Error at nodes[2].strategy: Matrix strategy requires 'values' or 'from_state' property +Error at nodes[1].depends_on[0]: Referenced node 'non-existent-node' does not exist +``` + +### Listing Workflow Runs + +You can list all workflow runs: + +```bash +$ butterflow list +Recent workflow runs: +- ID: 123e4567-e89b-12d3-a456-426614174000 + Name: simple-workflow + Status: Completed + Started: 2023-06-15T10:30:45Z + Completed: 2023-06-15T10:49:30Z + Duration: 00:18:45 + +- ID: 123e4567-e89b-12d3-a456-426614174010 + Name: matrix-workflow + Status: Completed + Started: 2023-06-14T15:20:30Z + Completed: 2023-06-14T17:05:52Z + Duration: 01:45:22 + +- ID: 123e4567-e89b-12d3-a456-426614174020 + Name: manual-workflow + Status: Failed + Started: 2023-06-13T09:10:15Z + Failed: 2023-06-13T09:16:00Z + Duration: 00:05:45 +``` + +### Canceling a Workflow Run + +If you need to stop a workflow run: + +```bash +$ butterflow cancel -i 123e4567-e89b-12d3-a456-426614174000 +Canceling workflow run 123e4567-e89b-12d3-a456-426614174000... 
+[2023-06-15T10:35:00Z] Sending termination signal to running tasks +[2023-06-15T10:35:02Z] Task 123e4567-e89b-12d3-a456-426614174004 (run-codemod-ts, team: frontend): Terminated +[2023-06-15T10:35:02Z] Task 123e4567-e89b-12d3-a456-426614174005 (run-codemod-ts, team: backend): Terminated +[2023-06-15T10:35:03Z] Workflow run canceled successfully +``` + +### Running with Custom Configuration + +You can specify a custom configuration file: + +```bash +$ butterflow run -w workflow.yaml -c butterflow.config.yaml --param repo_url=https://github.com/example/repo +Using configuration from butterflow.config.yaml +State backend: api (https://api.example.com/workflow-state) + +Executing workflow... +[2023-06-16T09:15:30Z] Workflow started with ID: 123e4567-e89b-12d3-a456-426614174030 +[2023-06-16T09:15:30Z] Task checkout-repo (123e4567-e89b-12d3-a456-426614174031): Running +[2023-06-16T09:16:00Z] Task checkout-repo (123e4567-e89b-12d3-a456-426614174031): Completed +[2023-06-16T09:16:00Z] Task evaluate-codeowners (123e4567-e89b-12d3-a456-426614174032): Running +[2023-06-16T09:17:15Z] Task evaluate-codeowners (123e4567-e89b-12d3-a456-426614174032): Completed +[2023-06-16T09:17:15Z] Created matrix tasks for run-codemod-ts: + - Master task: 123e4567-e89b-12d3-a456-426614174033 + - Team frontend: 123e4567-e89b-12d3-a456-426614174034 (AwaitingTrigger) + - Team backend: 123e4567-e89b-12d3-a456-426614174035 (AwaitingTrigger) +[2023-06-16T09:17:15Z] Workflow paused: Manual triggers required + +Use 'butterflow status -i 123e4567-e89b-12d3-a456-426614174030' to check status +``` + +## Implementation Details + +Butterflow is implemented in Rust with the following components: + +- Core engine using async Rust with Tokio +- Backend-agnostic state management with diff-based synchronization +- API client for remote state management +- Persistent state storage using a local database +- CLI interface for user interaction +- Support for both JSON and YAML workflow definitions +- Template management for reusable workflow components +- State schema validation + +## Roadmap + +- [ ] Support for parsing both JSON and YAML formats +- [ ] Basic workflow execution engine +- [ ] Runner runtimes: + - [ ] Simple child processes + - [ ] Docker daemon: Rely on the system's docker daemon to run the steps in a container + - [ ] Podman: Use Podman for container execution +- [ ] Matrix execution support +- [ ] Manual triggers and resumable workflows +- [ ] Backend-agnostic state management +- [ ] Diff-based state synchronization +- [ ] API integration for state updates +- [ ] State persistence +- [ ] State-based workflow resumability +- [ ] Template management +- [ ] State schema validation +- [ ] CLI interface with resume capability +- [ ] Web UI for monitoring and manual intervention +- [ ] Distributed execution + +## Advanced Features + +### Multi-Step Nodes + +Nodes can contain multiple steps that are executed sequentially. This allows for organizing complex tasks into logical units: + +```yaml +nodes: + - id: process-data + name: Process Data + type: automatic + steps: + - id: validate + name: Validate Data + commands: + - echo "Validating data" + + - id: transform + name: Transform Data + commands: + - echo "Transforming data" + + - id: save + name: Save Results + commands: + - echo "Saving results" +``` + +Each step is executed in order, and if any step fails, the entire node is marked as failed. 
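+
+Conceptually, this is plain fail-fast iteration over the node's steps. A minimal sketch of the idea in Rust (hypothetical simplified types; the real implementation lives in `crates/core`):
+
+```rust
+/// Hypothetical, simplified step type for illustration only.
+struct Step {
+    id: String,
+    commands: Vec<String>,
+}
+
+#[derive(Debug)]
+struct StepError(String);
+
+fn run_step(step: &Step) -> Result<(), StepError> {
+    // Execute step.commands here, returning Err on a non-zero exit code.
+    println!("running step {}", step.id);
+    Ok(())
+}
+
+/// Runs steps in order; `?` aborts on the first failing step,
+/// which is what marks the whole node as failed.
+fn run_node(steps: &[Step]) -> Result<(), StepError> {
+    for step in steps {
+        run_step(step)?;
+    }
+    Ok(())
+}
+```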
+ +### Script-Based Commands + +Commands can include multi-line scripts with shebang lines to specify the interpreter: + +```yaml +steps: + - id: run-script + name: Run Script + commands: + - | + #!/bin/bash + echo "Running bash script" + echo "Current directory: $(pwd)" + echo "Files in directory:" + ls -la +``` + +For Python scripts: + +```yaml +steps: + - id: run-python + name: Run Python Script + commands: + - | + #!/usr/bin/env python + import os + import sys + + print("Running Python script") + print(f"Python version: {sys.version}") + print(f"Environment variables:") + print(f" REPO_URL: {os.environ.get('REPO_URL')}") +``` + +The engine automatically detects the script type based on the shebang line and executes it with the appropriate interpreter. + +### Step-Level Environment Variables + +In addition to node-level environment variables, you can define environment variables at the step level: + +```yaml +nodes: + - id: generate-report + name: Generate Report + type: automatic + steps: + - id: collect-results + name: Collect Results + commands: + - echo "Collecting results" + env: + SCRIPT_LANGUAGE: "bash" + + - id: create-report + name: Create Report + commands: + - echo "Creating report" + env: + SCRIPT_LANGUAGE: "python" + env: + REPORT_FORMAT: "html" +``` + +Step-level environment variables override node-level variables with the same name. + +### Template Usage with Inputs + +Templates can be used with specific inputs to customize their behavior: + +```yaml +nodes: + - id: checkout-code + name: Checkout Code + type: automatic + steps: + - id: checkout + name: Checkout Repository + uses: + - template: checkout-repo + inputs: + repo_url: ${params.repo_url} + branch: "feature/new-feature" + depth: 1 +``` + +This allows for reusing common functionality with different parameters across multiple nodes. + +### Matrix Strategy with Values + +Matrix strategies can use explicit values instead of state: + +```yaml +nodes: + - id: process-regions + name: Process Regions + type: automatic + strategy: + type: matrix + values: + - region: us-east + - region: us-west + - region: eu-central + steps: + - id: process + name: Process Region + commands: + - echo "Processing region $region" +``` + +This creates a separate task for each value in the matrix. + +### Matrix Strategy with State + +Matrix strategies can also use values from the workflow state: + +```yaml +nodes: + - id: process-files + name: Process Files + type: automatic + strategy: + type: matrix + from_state: files + steps: + - id: process + name: Process File + commands: + - echo "Processing file $file" +``` + +This creates a task for each item in the `files` state array. + +### Automatic vs Manual Nodes + +Butterflow supports both automatic and manual nodes: + +```yaml +nodes: + - id: automatic-node + name: Automatic Node + type: automatic + steps: + - id: step1 + commands: + - echo "Running automatically" + + - id: manual-node + name: Manual Node + type: manual + depends_on: + - automatic-node + steps: + - id: step1 + commands: + - echo "Running after manual approval" +``` + +Automatic nodes run as soon as their dependencies are satisfied, while manual nodes require explicit triggering even if all dependencies are met. 
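+
+With the two-node workflow above, a run executes `automatic-node` and then parks `manual-node` in `AwaitingTrigger` until someone approves it (IDs below are placeholders):
+
+```bash
+# Start the workflow; automatic-node runs, manual-node waits
+butterflow run -w workflow.yaml
+
+# Later, approve the manual node and let the run finish
+butterflow resume -i <workflow-run-id> --trigger-all
+```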
+ +### Manual Triggers for Automatic Nodes + +Automatic nodes can be configured to require manual triggering: + +```yaml +nodes: + - id: approval-gate + name: Approval Gate + type: automatic + trigger: + type: manual + steps: + - id: step1 + commands: + - echo "Running after manual approval" +``` + +This is useful for creating approval gates in otherwise automatic workflows. + +## Complete Workflow Examples + +### Simple Workflow + +A basic workflow with sequential nodes: + +```yaml +version: "1" +nodes: + - id: hello-world + name: Hello World + type: automatic + steps: + - id: hello + commands: + - echo "Hello, World!" + + - id: current-time + name: Current Time + type: automatic + depends_on: + - hello-world + steps: + - id: time + commands: + - date +``` + +### Matrix Workflow + +A workflow with matrix-based parallel execution: + +```yaml +version: "1" +nodes: + - id: setup + name: Setup + type: automatic + steps: + - id: step1 + commands: + - echo "Setting up environment" + + - id: process-regions + name: Process Regions + type: automatic + depends_on: + - setup + strategy: + type: matrix + values: + - region: us-east + - region: us-west + - region: eu-central + steps: + - id: process + commands: + - echo "Processing region $region" + + - id: finalize + name: Finalize + type: automatic + depends_on: + - process-regions + steps: + - id: step1 + commands: + - echo "Finalizing processing" +``` + +### Complex Workflow with Templates and Manual Approval + +A more complex workflow with templates, matrix execution, and manual approval: + +```yaml +version: "1" +state: + schema: + - name: i18nShardsTs + type: array + items: + type: object + properties: + team: + type: string + shardId: + type: string + +templates: + - id: checkout-repo + name: Checkout Repository + inputs: + - name: repo_url + type: string + required: true + - name: branch + type: string + default: "main" + steps: + - id: clone + commands: + - git clone ${inputs.repo_url} repo + - cd repo && git checkout ${inputs.branch} + +nodes: + - id: evaluate-codeowners + name: Evaluate codeowners + type: automatic + steps: + - id: checkout-repo + uses: + - template: checkout-repo + inputs: + repo_url: ${params.repo_url} + branch: ${params.branch} + - id: evaluate + commands: + - echo "Evaluating codeowners" + + - id: run-codemod-ts + name: I18n Codemod (TS) + type: automatic + trigger: + type: manual + depends_on: + - evaluate-codeowners + strategy: + type: matrix + from_state: i18nShardsTs + steps: + - id: run-codemod + commands: + - echo "Running TS codemod for team ${team}" +``` + +These examples demonstrate the flexibility and power of Butterflow for defining complex workflow processes. + +## Workflow Validation + +Butterflow performs comprehensive validation of workflow definitions before execution to ensure they are well-formed and can be executed correctly. This validation happens automatically when a workflow is loaded, but can also be performed explicitly using the `validate` command. + +### Validation Checks + +The following validation checks are performed: + +1. **Schema Validation**: Ensures the workflow definition adheres to the expected schema +2. **Node ID Uniqueness**: Verifies that all node IDs are unique +3. **Step ID Uniqueness**: Checks that step IDs within a node are unique +4. **Template ID Uniqueness**: Ensures all template IDs are unique +5. **Dependency Validation**: Verifies that all referenced dependencies exist +6. **Cyclic Dependency Detection**: Prevents circular dependencies between nodes +7. 
**Template Reference Validation**: Ensures all template references are valid +8. **Matrix Strategy Validation**: Verifies that matrix strategies have valid configurations +9. **State Schema Validation**: Ensures state schema definitions are valid +10. **Variable Reference Validation**: Checks that variable references follow the correct syntax + +### Cyclic Dependency Prevention + +One of the most important validation checks is the detection of cyclic dependencies. A cyclic dependency occurs when a node depends on itself, either directly or indirectly through other nodes. For example: + +```yaml +nodes: + - id: node-a + depends_on: + - node-b + + - id: node-b + depends_on: + - node-c + + - id: node-c + depends_on: + - node-a # Creates a cycle: node-a → node-b → node-c → node-a +``` + +Butterflow detects such cycles using a topological sorting algorithm and reports them as validation errors: + +```bash +$ butterflow validate -w cyclic-workflow.yaml +✗ Workflow definition is invalid +Error: Cyclic dependency detected: node-a → node-b → node-c → node-a +``` + +### Dependency Graph Visualization + +Butterflow can generate a visual representation of the workflow's dependency graph to help identify potential issues: + +```bash +$ butterflow graph -w workflow.yaml -o workflow-graph.png +Generating dependency graph... +Graph saved to workflow-graph.png +``` + +This visualization can be helpful for understanding complex workflows and verifying that dependencies are correctly defined. + +### Validation Examples + +#### Valid Workflow + +```bash +$ butterflow validate -w valid-workflow.yaml +✓ Workflow definition is valid +Schema validation: Passed +Node dependencies: Valid (3 nodes, 2 dependency relationships) +Template references: Valid (2 templates, 3 references) +Matrix strategies: Valid (1 matrix node) +State schema: Valid (2 schema definitions) +``` + +#### Invalid Workflow with Multiple Issues + +```bash +$ butterflow validate -w invalid-workflow.yaml +✗ Workflow definition is invalid +Error at nodes[2].strategy: Matrix strategy requires 'values' or 'from_state' property +Error at nodes[1].depends_on[0]: Referenced node 'non-existent-node' does not exist +Error: Cyclic dependency detected: node-a → node-b → node-a +Error at nodes[0].steps[1].uses[0]: Referenced template 'non-existent-template' does not exist +``` + +### Automatic Validation + +Validation is performed automatically when running a workflow: + +```bash +$ butterflow run -w invalid-workflow.yaml +✗ Workflow definition is invalid +Error: Cyclic dependency detected: node-a → node-b → node-a +Workflow execution aborted +``` + +This ensures that only valid workflows are executed, preventing potential runtime issues. 
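+
+For reference, cycle detection over `depends_on` edges can be implemented with Kahn's algorithm. A minimal standalone sketch (illustrative only, not the engine's actual implementation; it assumes every node appears as a key, which holds once dependency references have been validated):
+
+```rust
+use std::collections::{HashMap, VecDeque};
+
+/// Returns the nodes caught on (or behind) a dependency cycle; empty means acyclic.
+fn cyclic_nodes(deps: &HashMap<&str, Vec<&str>>) -> Vec<String> {
+    // in-degree = number of unfinished dependencies per node
+    let mut in_degree: HashMap<&str, usize> =
+        deps.iter().map(|(&n, ds)| (n, ds.len())).collect();
+
+    // Reverse edges: dependency -> nodes that depend on it
+    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
+    for (&node, ds) in deps {
+        for &d in ds {
+            dependents.entry(d).or_default().push(node);
+        }
+    }
+
+    // Repeatedly "complete" nodes whose dependencies are all satisfied
+    let mut ready: VecDeque<&str> = in_degree
+        .iter()
+        .filter(|&(_, &d)| d == 0)
+        .map(|(&n, _)| n)
+        .collect();
+    while let Some(done) = ready.pop_front() {
+        if let Some(ds) = dependents.get(done) {
+            for &m in ds {
+                let e = in_degree.get_mut(m).unwrap();
+                *e -= 1;
+                if *e == 0 {
+                    ready.push_back(m);
+                }
+            }
+        }
+    }
+
+    // Anything that never reached in-degree 0 is part of (or blocked by) a cycle
+    in_degree
+        .into_iter()
+        .filter(|&(_, d)| d > 0)
+        .map(|(n, _)| n.to_string())
+        .collect()
+}
+```
+
+For the `node-a → node-b → node-c → node-a` example above, this reports all three nodes, matching the error the validator prints.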
diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml
new file mode 100644
index 000000000..bf9519cba
--- /dev/null
+++ b/crates/cli/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "butterflow-cli"
+version = "0.1.0"
+edition = "2021"
+authors.workspace = true
+description.workspace = true
+documentation.workspace = true
+repository.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+butterflow-models = { path = "../models" }
+butterflow-core = { path = "../core" }
+butterflow-state = { path = "../state" }
+butterflow-runners = { path = "../runners" }
+tokio = { workspace = true }
+clap = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+serde_yaml = { workspace = true }
+anyhow = { workspace = true }
+uuid = { workspace = true }
+chrono = { workspace = true }
+log = { workspace = true }
+env_logger = { workspace = true }
+dirs = "5.0"
diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
new file mode 100644
index 000000000..c5c141ff1
--- /dev/null
+++ b/crates/cli/src/main.rs
@@ -0,0 +1,614 @@
+use anyhow::{Context, Result};
+use clap::{Parser, Subcommand};
+use log::{error, info};
+use std::collections::HashMap;
+use std::fs;
+use std::path::{Path, PathBuf};
+use uuid::Uuid;
+
+use butterflow_core::engine::Engine;
+use butterflow_core::utils;
+use butterflow_models::{Task, TaskStatus, WorkflowStatus};
+use butterflow_state::ApiStateAdapter;
+
+#[derive(Parser)]
+#[command(name = "butterflow")]
+#[command(about = "A self-hostable workflow engine for code transformations", long_about = None)]
+struct Cli {
+    #[command(subcommand)]
+    command: Commands,
+
+    /// Path to configuration file
+    #[arg(short, long, value_name = "FILE")]
+    config: Option<PathBuf>,
+
+    /// Verbose output
+    #[arg(short, long)]
+    verbose: bool,
+}
+
+#[derive(Subcommand)]
+enum Commands {
+    /// Run a workflow
+    Run {
+        /// Path to workflow file
+        #[arg(short, long, value_name = "FILE")]
+        workflow: PathBuf,
+
+        /// Workflow parameters (format: key=value)
+        #[arg(long = "param", value_name = "KEY=VALUE")]
+        params: Vec<String>,
+    },
+
+    /// Resume a paused workflow
+    Resume {
+        /// Workflow run ID
+        #[arg(short, long)]
+        id: Uuid,
+
+        /// Task ID to trigger (can be specified multiple times)
+        #[arg(short, long)]
+        task: Vec<Uuid>,
+
+        /// Trigger all awaiting tasks
+        #[arg(long)]
+        trigger_all: bool,
+    },
+
+    /// Validate a workflow file
+    Validate {
+        /// Path to workflow file
+        #[arg(short, long, value_name = "FILE")]
+        workflow: PathBuf,
+    },
+
+    /// Show workflow run status
+    Status {
+        /// Workflow run ID
+        #[arg(short, long)]
+        id: Uuid,
+    },
+
+    /// List workflow runs
+    List {
+        /// Number of workflow runs to show
+        #[arg(short, long, default_value = "10")]
+        limit: usize,
+    },
+
+    /// Cancel a workflow run
+    Cancel {
+        /// Workflow run ID
+        #[arg(short, long)]
+        id: Uuid,
+    },
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Parse command line arguments
+    let cli = Cli::parse();
+
+    // Initialize the logger after parsing so the verbose flag can take effect
+    // (setting RUST_LOG after env_logger::init would be a no-op)
+    let log_level = if cli.verbose { "debug" } else { "info" };
+    env_logger::init_from_env(env_logger::Env::default().default_filter_or(log_level));
+
+    // Create engine
+    let engine = create_engine(&cli.config)?;
+
+    // Handle command
+    match &cli.command {
+        Commands::Run { workflow, params } => {
+            run_workflow(&engine, workflow, params).await?;
+        }
+        Commands::Resume {
+            id,
+            task,
+            trigger_all,
+        } => {
+            resume_workflow(&engine, *id, task, *trigger_all).await?;
+        }
+        Commands::Validate { workflow } => {
+            validate_workflow(workflow)?;
+        }
+        Commands::Status { id } => {
+            show_status(&engine, *id).await?;
+        }
+        Commands::List { limit } => {
+            list_workflows(&engine, *limit).await?;
+        }
+        Commands::Cancel { id } => {
+            cancel_workflow(&engine, *id).await?;
+        }
+    }
+
+    Ok(())
+}
+
+/// Create an engine based on configuration
+fn create_engine(config_path: &Option<PathBuf>) -> Result<Engine> {
+    // If no config file is provided, use default local state adapter
+    if config_path.is_none() {
+        return Ok(Engine::new());
+    }
+
+    // Read config file
+    let config_path = config_path.as_ref().unwrap();
+    let config_content = fs::read_to_string(config_path).context(format!(
+        "Failed to read config file: {}",
+        config_path.display()
+    ))?;
+
+    // Parse config file
+    let config: serde_yaml::Value = serde_yaml::from_str(&config_content).context(format!(
+        "Failed to parse config file: {}",
+        config_path.display()
+    ))?;
+
+    // Get state management configuration
+    let state_management = config.get("stateManagement").and_then(|v| v.as_mapping());
+    if let Some(state_management) = state_management {
+        // Get backend type
+        let backend = state_management
+            .get(&serde_yaml::Value::String("backend".to_string()))
+            .and_then(|v| v.as_str());
+
+        match backend {
+            Some("api") => {
+                // Get API configuration
+                let api_config = state_management
+                    .get(&serde_yaml::Value::String("apiConfig".to_string()))
+                    .and_then(|v| v.as_mapping());
+
+                if let Some(api_config) = api_config {
+                    // Get endpoint
+                    let endpoint = api_config
+                        .get(&serde_yaml::Value::String("endpoint".to_string()))
+                        .and_then(|v| v.as_str())
+                        .unwrap_or("http://localhost:8080");
+
+                    // Get auth token
+                    let auth_token = api_config
+                        .get(&serde_yaml::Value::String("authToken".to_string()))
+                        .and_then(|v| v.as_str())
+                        .unwrap_or("");
+
+                    // Create API state adapter
+                    let state_adapter = Box::new(ApiStateAdapter::new(
+                        endpoint.to_string(),
+                        auth_token.to_string(),
+                    ));
+
+                    return Ok(Engine::with_state_adapter(state_adapter));
+                }
+            }
+            _ => {
+                // Use local state adapter
+                return Ok(Engine::new());
+            }
+        }
+    }
+
+    // Default to local state adapter
+    Ok(Engine::new())
+}
+
+/// Run a workflow
+async fn run_workflow(engine: &Engine, workflow_path: &Path, params: &[String]) -> Result<()> {
+    // Parse workflow file
+    let workflow = utils::parse_workflow_file(workflow_path).context(format!(
+        "Failed to parse workflow file: {}",
+        workflow_path.display()
+    ))?;
+
+    // Parse parameters
+    let params = utils::parse_params(params).context("Failed to parse parameters")?;
+
+    // Run workflow
+    let workflow_run_id = engine
+        .run_workflow(workflow, params)
+        .await
+        .context("Failed to run workflow")?;
+
+    info!("Workflow started with ID: {}", workflow_run_id);
+
+    // Wait for workflow to complete or pause
+    loop {
+        // Get workflow status
+        let status = engine
+            .get_workflow_status(workflow_run_id)
+            .await
+            .context("Failed to get workflow status")?;
+
+        match status {
+            WorkflowStatus::Completed => {
+                info!("Workflow completed successfully");
+                break;
+            }
+            WorkflowStatus::Failed => {
+                error!("Workflow failed");
+                break;
+            }
+            WorkflowStatus::AwaitingTrigger => {
+                // Get tasks awaiting trigger
+                let tasks = engine
+                    .get_tasks(workflow_run_id)
+                    .await
+                    .context("Failed to get tasks")?;
+
+                let awaiting_tasks: Vec<&Task> = tasks
+                    .iter()
+                    .filter(|t| t.status == TaskStatus::AwaitingTrigger)
+                    .collect();
+
+                info!("Workflow paused: Manual triggers required");
+ info!(""); + info!("Workflow is awaiting manual triggers for the following tasks:"); + for task in awaiting_tasks { + info!("- {} ({})", task.id, task.node_id); + } + info!(""); + info!( + "Use 'butterflow status -i {}' to check status", + workflow_run_id + ); + info!( + "Use 'butterflow resume -i {} --trigger-all' to trigger all tasks", + workflow_run_id + ); + break; + } + WorkflowStatus::Canceled => { + info!("Workflow was canceled"); + break; + } + _ => { + // Wait a bit before checking again + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + } + + Ok(()) +} + +/// Resume a workflow +async fn resume_workflow( + engine: &Engine, + workflow_run_id: Uuid, + task_ids: &[Uuid], + trigger_all: bool, +) -> Result<()> { + info!("Resuming workflow {}...", workflow_run_id); + + if trigger_all { + // Trigger all awaiting tasks + engine + .trigger_all(workflow_run_id) + .await + .context("Failed to trigger all tasks")?; + + info!("Triggered all awaiting tasks"); + } else if !task_ids.is_empty() { + // Trigger specific tasks + engine + .resume_workflow(workflow_run_id, task_ids.to_vec()) + .await + .context("Failed to resume workflow")?; + + info!("Triggered {} tasks", task_ids.len()); + } else { + error!("No tasks specified to trigger. Use --task or --trigger-all"); + return Ok(()); + } + + // Wait for workflow to complete or pause again + loop { + // Get workflow status + let status = engine + .get_workflow_status(workflow_run_id) + .await + .context("Failed to get workflow status")?; + + match status { + WorkflowStatus::Completed => { + info!("Workflow completed successfully"); + break; + } + WorkflowStatus::Failed => { + error!("Workflow failed"); + break; + } + WorkflowStatus::AwaitingTrigger => { + // Get tasks awaiting trigger + let tasks = engine + .get_tasks(workflow_run_id) + .await + .context("Failed to get tasks")?; + + let awaiting_tasks: Vec<&Task> = tasks + .iter() + .filter(|t| t.status == TaskStatus::AwaitingTrigger) + .collect(); + + info!("Workflow paused: Manual triggers still required"); + info!(""); + info!("Workflow is still awaiting manual triggers for the following tasks:"); + for task in awaiting_tasks { + info!("- {} ({})", task.id, task.node_id); + } + break; + } + WorkflowStatus::Canceled => { + info!("Workflow was canceled"); + break; + } + _ => { + // Wait a bit before checking again + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + } + + Ok(()) +} + +/// Validate a workflow file +fn validate_workflow(workflow_path: &Path) -> Result<()> { + // Parse workflow file + let workflow = utils::parse_workflow_file(workflow_path).context(format!( + "Failed to parse workflow file: {}", + workflow_path.display() + ))?; + + // Validate workflow + utils::validate_workflow(&workflow).context("Workflow validation failed")?; + + info!("✓ Workflow definition is valid"); + info!("Schema validation: Passed"); + info!( + "Node dependencies: Valid ({} nodes, {} dependency relationships)", + workflow.nodes.len(), + workflow + .nodes + .iter() + .map(|n| n.depends_on.len()) + .sum::() + ); + info!( + "Template references: Valid ({} templates, {} references)", + workflow.templates.len(), + workflow + .nodes + .iter() + .flat_map(|n| n.steps.iter()) + .filter_map(|s| s.uses.as_ref()) + .flat_map(|u| u.iter()) + .count() + ); + + // Count matrix nodes + let matrix_nodes = workflow + .nodes + .iter() + .filter(|n| n.strategy.is_some()) + .count(); + info!("Matrix strategies: Valid ({} matrix nodes)", matrix_nodes); + + // Count state schema 
definitions + let state_schema_count = workflow.state.as_ref().map(|s| s.schema.len()).unwrap_or(0); + info!( + "State schema: Valid ({} schema definitions)", + state_schema_count + ); + + Ok(()) +} + +/// Show workflow status +async fn show_status(engine: &Engine, workflow_run_id: Uuid) -> Result<()> { + // Get workflow run + let workflow_run = engine + .get_workflow_run(workflow_run_id) + .await + .context("Failed to get workflow run")?; + + // Get tasks + let tasks = engine + .get_tasks(workflow_run_id) + .await + .context("Failed to get tasks")?; + + // Print workflow info + info!( + "Workflow: {} (ID: {})", + workflow_run + .workflow + .nodes + .first() + .map(|n| n.name.as_str()) + .unwrap_or("unknown"), + workflow_run_id + ); + info!("Status: {:?}", workflow_run.status); + info!("Started: {}", workflow_run.started_at); + + if let Some(ended_at) = workflow_run.ended_at { + info!("Completed: {}", ended_at); + let duration = ended_at.signed_duration_since(workflow_run.started_at); + info!( + "Duration: {}", + utils::format_duration(duration.num_seconds() as u64) + ); + } else { + let duration = chrono::Utc::now().signed_duration_since(workflow_run.started_at); + info!( + "Duration: {} (running)", + utils::format_duration(duration.num_seconds() as u64) + ); + } + + info!(""); + info!("Tasks:"); + + // Group tasks by node + let mut tasks_by_node: HashMap> = HashMap::new(); + for task in &tasks { + tasks_by_node + .entry(task.node_id.clone()) + .or_insert_with(Vec::new) + .push(task); + } + + // Print tasks + for node in &workflow_run.workflow.nodes { + let empty_tasks: Vec<&Task> = Vec::new(); + let node_tasks = tasks_by_node.get(&node.id).unwrap_or(&empty_tasks); + + // Find master task for matrix nodes + let master_task = node_tasks + .iter() + .find(|t| t.master_task_id.is_none() && t.matrix_values.is_none()); + + if let Some(master_task) = master_task { + info!( + "- {} (master) ({}): {:?}", + node.id, master_task.id, master_task.status + ); + + // Print matrix tasks + for task in node_tasks.iter().filter(|t| t.master_task_id.is_some()) { + let matrix_info = task + .matrix_values + .as_ref() + .map(|m| { + m.iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", ") + }) + .unwrap_or_else(|| "unknown".to_string()); + info!( + " - {} ({}, {}): {:?}", + node.id, task.id, matrix_info, task.status + ); + } + } else if !node_tasks.is_empty() { + // Print regular task + let task = node_tasks[0]; + info!("- {} ({}): {:?}", node.id, task.id, task.status); + } else { + info!("- {}: No tasks", node.id); + } + } + + // Print manual triggers + let awaiting_tasks: Vec<&Task> = tasks + .iter() + .filter(|t| t.status == TaskStatus::AwaitingTrigger) + .collect(); + + if !awaiting_tasks.is_empty() { + info!(""); + info!("Manual triggers required:"); + for task in awaiting_tasks { + let node = workflow_run + .workflow + .nodes + .iter() + .find(|n| n.id == task.node_id) + .unwrap(); + let matrix_info = task + .matrix_values + .as_ref() + .map(|m| { + m.iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", ") + }) + .unwrap_or_else(|| "".to_string()); + info!("- {} ({}, {})", task.id, node.id, matrix_info); + } + } else { + info!(""); + info!("Manual triggers required: None"); + } + + Ok(()) +} + +/// List workflow runs +async fn list_workflows(engine: &Engine, limit: usize) -> Result<()> { + // Get workflow runs + let workflow_runs = engine + .list_workflow_runs(limit) + .await + .context("Failed to list workflow runs")?; + + if workflow_runs.is_empty() { + 
info!("No workflow runs found"); + return Ok(()); + } + + info!("Recent workflow runs:"); + for workflow_run in workflow_runs { + info!("- ID: {}", workflow_run.id); + info!( + " Name: {}", + workflow_run + .workflow + .nodes + .first() + .map(|n| n.name.as_str()) + .unwrap_or("unknown") + ); + info!(" Status: {:?}", workflow_run.status); + info!(" Started: {}", workflow_run.started_at); + + if let Some(ended_at) = workflow_run.ended_at { + match workflow_run.status { + WorkflowStatus::Completed => info!(" Completed: {}", ended_at), + WorkflowStatus::Failed => info!(" Failed: {}", ended_at), + WorkflowStatus::Canceled => info!(" Canceled: {}", ended_at), + _ => {} + } + let duration = ended_at.signed_duration_since(workflow_run.started_at); + info!( + " Duration: {}", + utils::format_duration(duration.num_seconds() as u64) + ); + } else { + let duration = chrono::Utc::now().signed_duration_since(workflow_run.started_at); + info!( + " Duration: {} (running)", + utils::format_duration(duration.num_seconds() as u64) + ); + } + + info!(""); + } + + Ok(()) +} + +/// Cancel a workflow +async fn cancel_workflow(engine: &Engine, workflow_run_id: Uuid) -> Result<()> { + info!("Canceling workflow run {}...", workflow_run_id); + + // Cancel workflow + engine + .cancel_workflow(workflow_run_id) + .await + .context("Failed to cancel workflow")?; + + info!("Workflow run canceled successfully"); + + Ok(()) +} diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml new file mode 100644 index 000000000..c01899922 --- /dev/null +++ b/crates/core/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "butterflow-core" +version = "0.1.0" +edition = "2021" +authors.workspace = true +description.workspace = true +documentation.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true + +[dependencies] +butterflow-models = { path = "../models" } +butterflow-runners = { path = "../runners" } +butterflow-state = { path = "../state" } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } +anyhow = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +log = { workspace = true } +async-trait = { workspace = true } +regex = { workspace = true } +futures = { workspace = true } +reqwest = { workspace = true } + +[features] +default = [] diff --git a/crates/core/src/engine.rs b/crates/core/src/engine.rs new file mode 100644 index 000000000..c17034b98 --- /dev/null +++ b/crates/core/src/engine.rs @@ -0,0 +1,1098 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use chrono::Utc; +use log::{debug, error, info, warn}; +use tokio::sync::Mutex; +use tokio::time; +use uuid::Uuid; + +use crate::utils; +use butterflow_models::node::NodeType; +use butterflow_models::runtime::RuntimeType; +use butterflow_models::step::Step; +use butterflow_models::trigger::TriggerType; +use butterflow_models::{ + resolve_variables, DiffOperation, Error, FieldDiff, Node, Result, Task, TaskDiff, TaskStatus, + Workflow, WorkflowRun, WorkflowRunDiff, WorkflowStatus, +}; +use butterflow_runners::{DirectRunner, DockerRunner, PodmanRunner, Runner}; +use butterflow_state::{LocalStateAdapter, StateAdapter}; + +/// Workflow engine +pub struct Engine { + /// State adapter for persisting workflow state + state_adapter: Arc>>, +} + +impl Engine { + /// Create a new engine with a local state adapter + pub fn new() -> Self { + Self { + state_adapter: 
+impl Engine {
+    /// Create a new engine with a local state adapter
+    pub fn new() -> Self {
+        Self {
+            state_adapter: Arc::new(Mutex::new(Box::new(LocalStateAdapter::new()))),
+        }
+    }
+
+    /// Create a new engine with a custom state adapter
+    pub fn with_state_adapter(state_adapter: Box<dyn StateAdapter>) -> Self {
+        Self {
+            state_adapter: Arc::new(Mutex::new(state_adapter)),
+        }
+    }
+
+    /// Run a workflow
+    pub async fn run_workflow(
+        &self,
+        workflow: Workflow,
+        params: HashMap<String, String>,
+    ) -> Result<Uuid> {
+        // Validate the workflow
+        utils::validate_workflow(&workflow)?;
+
+        // Create a new workflow run
+        let workflow_run_id = Uuid::new_v4();
+        let workflow_run = WorkflowRun {
+            id: workflow_run_id,
+            workflow: workflow.clone(),
+            status: WorkflowStatus::Pending,
+            params: params.clone(),
+            tasks: Vec::new(),
+            started_at: Utc::now(),
+            ended_at: None,
+            state: HashMap::new(),
+        };
+
+        // Save the initial workflow run state
+        self.state_adapter
+            .lock()
+            .await
+            .save_workflow_run(&workflow_run)
+            .await?;
+
+        // Start the workflow execution
+        let engine = self.clone();
+        tokio::spawn(async move {
+            if let Err(e) = engine.execute_workflow(workflow_run_id).await {
+                error!("Workflow execution failed: {}", e);
+            }
+        });
+
+        Ok(workflow_run_id)
+    }
+
+    /// Resume a workflow run
+    pub async fn resume_workflow(&self, workflow_run_id: Uuid, task_ids: Vec<Uuid>) -> Result<()> {
+        // Get the workflow run
+        let mut workflow_run = self
+            .state_adapter
+            .lock()
+            .await
+            .get_workflow_run(workflow_run_id)
+            .await?;
+
+        // Trigger the specified tasks
+        let mut triggered = false;
+        for task_id in task_ids {
+            // Get the task directly from the state adapter
+            let task = self.state_adapter.lock().await.get_task(task_id).await?;
+
+            if task.status == TaskStatus::AwaitingTrigger {
+                // Create a task diff to update the status
+                let mut fields = HashMap::new();
+                fields.insert(
+                    "status".to_string(),
+                    FieldDiff {
+                        operation: DiffOperation::Update,
+                        value: Some(serde_json::to_value(TaskStatus::Pending)?),
+                    },
+                );
+                let task_diff = TaskDiff {
+                    task_id,
+                    fields,
+                };
+
+                // Apply the diff
+                self.state_adapter
+                    .lock()
+                    .await
+                    .apply_task_diff(&task_diff)
+                    .await?;
+
+                // Execute the task immediately
+                let engine = self.clone();
+                tokio::spawn(async move {
+                    if let Err(e) = engine.execute_task(task_id).await {
+                        error!("Task execution failed: {}", e);
+                    }
+                });
+
+                triggered = true;
+                info!("Triggered task {} ({})", task_id, task.node_id);
+            } else {
+                warn!("Task {} is not awaiting trigger", task_id);
+            }
+        }
+
+        if !triggered {
+            return Err(Error::Other("No tasks were triggered".to_string()));
+        }
+
+        // Create a workflow run diff to update the status
+        let mut fields = HashMap::new();
+        fields.insert(
+            "status".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(WorkflowStatus::Running)?),
+            },
+        );
+        let workflow_run_diff = WorkflowRunDiff {
+            workflow_run_id,
+            fields,
+        };
+
+        // Apply the diff
+        self.state_adapter
+            .lock()
+            .await
+            .apply_workflow_run_diff(&workflow_run_diff)
+            .await?;
+
+        // Resume workflow execution
+        let engine = self.clone();
+        tokio::spawn(async move {
+            if let Err(e) = engine.execute_workflow(workflow_run_id).await {
+                error!("Workflow execution failed: {}", e);
+            }
+        });
+
+        Ok(())
+    }
+
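+    // Engine-side counterpart of `butterflow resume -i <run-id> --trigger-all`:
+    // flips every AwaitingTrigger task back to Pending and resumes execution.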
+    /// Trigger all awaiting tasks in a workflow run
+    pub async fn trigger_all(&self, workflow_run_id: Uuid) -> Result<()> {
+        // Get the workflow run
+        let mut workflow_run = self
+            .state_adapter
+            .lock()
+            .await
+            .get_workflow_run(workflow_run_id)
+            .await?;
+
+        // Get all tasks
+        let tasks = self
+            .state_adapter
+            .lock()
+            .await
+            .get_tasks(workflow_run_id)
+            .await?;
+
+        // Find all tasks that are awaiting trigger
+        let awaiting_tasks: Vec<&Task> = tasks
+            .iter()
+            .filter(|t| t.status == TaskStatus::AwaitingTrigger)
+            .collect();
+
+        if awaiting_tasks.is_empty() {
+            return Err(Error::Other(format!(
+                "No tasks in workflow run {} are awaiting triggers",
+                workflow_run_id
+            )));
+        }
+
+        // Trigger all awaiting tasks
+        let mut triggered = false;
+        for task in awaiting_tasks {
+            // Create a task diff to update the status
+            let mut fields = HashMap::new();
+            fields.insert(
+                "status".to_string(),
+                FieldDiff {
+                    operation: DiffOperation::Update,
+                    value: Some(serde_json::to_value(TaskStatus::Pending)?),
+                },
+            );
+            let task_diff = TaskDiff {
+                task_id: task.id,
+                fields,
+            };
+
+            // Apply the diff
+            self.state_adapter
+                .lock()
+                .await
+                .apply_task_diff(&task_diff)
+                .await?;
+
+            // Execute the task immediately
+            let engine = self.clone();
+            let task_id = task.id;
+            tokio::spawn(async move {
+                if let Err(e) = engine.execute_task(task_id).await {
+                    error!("Task execution failed: {}", e);
+                }
+            });
+
+            triggered = true;
+            info!("Triggered task {} ({})", task.id, task.node_id);
+        }
+
+        if !triggered {
+            return Err(Error::Other("No tasks were awaiting trigger".to_string()));
+        }
+
+        // Create a workflow run diff to update the status
+        let mut fields = HashMap::new();
+        fields.insert(
+            "status".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(WorkflowStatus::Running)?),
+            },
+        );
+        let workflow_run_diff = WorkflowRunDiff {
+            workflow_run_id,
+            fields,
+        };
+
+        // Apply the diff
+        self.state_adapter
+            .lock()
+            .await
+            .apply_workflow_run_diff(&workflow_run_diff)
+            .await?;
+
+        // Resume workflow execution
+        let engine = self.clone();
+        tokio::spawn(async move {
+            if let Err(e) = engine.execute_workflow(workflow_run_id).await {
+                error!("Workflow execution failed: {}", e);
+            }
+        });
+
+        Ok(())
+    }
+
+    /// Cancel a workflow run
+    pub async fn cancel_workflow(&self, workflow_run_id: Uuid) -> Result<()> {
+        // Get the workflow run
+        let mut workflow_run = self
+            .state_adapter
+            .lock()
+            .await
+            .get_workflow_run(workflow_run_id)
+            .await?;
+
+        // Check if the workflow is running or awaiting triggers
+        if workflow_run.status != WorkflowStatus::Running
+            && workflow_run.status != WorkflowStatus::AwaitingTrigger
+        {
+            return Err(Error::Other(format!(
+                "Workflow run {} is not running or awaiting triggers",
+                workflow_run_id
+            )));
+        }
+
+        // Get all tasks
+        let tasks = self
+            .state_adapter
+            .lock()
+            .await
+            .get_tasks(workflow_run_id)
+            .await?;
+
+        // Cancel all running tasks
+        for task in tasks.iter().filter(|t| t.status == TaskStatus::Running) {
+            // Create a task diff to update the status
+            let mut fields = HashMap::new();
+            fields.insert(
+                "status".to_string(),
+                FieldDiff {
+                    operation: DiffOperation::Update,
+                    value: Some(serde_json::to_value(TaskStatus::Failed)?),
+                },
+            );
+            fields.insert(
+                "error".to_string(),
+                FieldDiff {
+                    operation: DiffOperation::Add,
+                    value: Some(serde_json::to_value("Canceled by user")?),
+                },
+            );
+            let task_diff = TaskDiff {
+                task_id: task.id,
+                fields,
+            };
+
+            // Apply the diff
+            self.state_adapter
+                .lock()
+                .await
+                .apply_task_diff(&task_diff)
+                .await?;
+
+            info!("Canceled task {} ({})", task.id, task.node_id);
+        }
+
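+        // Note: state changes are expressed as diffs (field name -> FieldDiff
+        // with a DiffOperation) rather than whole-object saves, so adapters can
+        // apply them incrementally.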
+        // Create a workflow run diff to update the status
+        let mut fields = HashMap::new();
+        fields.insert(
+            "status".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(WorkflowStatus::Canceled)?),
+            },
+        );
+        fields.insert(
+            "ended_at".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(Utc::now())?),
+            },
+        );
+        let workflow_run_diff = WorkflowRunDiff {
+            workflow_run_id,
+            fields,
+        };
+
+        // Apply the diff
+        self.state_adapter
+            .lock()
+            .await
+            .apply_workflow_run_diff(&workflow_run_diff)
+            .await?;
+
+        Ok(())
+    }
+
+    /// Get workflow run status
+    pub async fn get_workflow_status(&self, workflow_run_id: Uuid) -> Result<WorkflowStatus> {
+        let workflow_run = self
+            .state_adapter
+            .lock()
+            .await
+            .get_workflow_run(workflow_run_id)
+            .await?;
+        Ok(workflow_run.status)
+    }
+
+    /// Get workflow run
+    pub async fn get_workflow_run(&self, workflow_run_id: Uuid) -> Result<WorkflowRun> {
+        self.state_adapter
+            .lock()
+            .await
+            .get_workflow_run(workflow_run_id)
+            .await
+    }
+
+    /// Get tasks for a workflow run
+    pub async fn get_tasks(&self, workflow_run_id: Uuid) -> Result<Vec<Task>> {
+        self.state_adapter
+            .lock()
+            .await
+            .get_tasks(workflow_run_id)
+            .await
+    }
+
+    /// List workflow runs
+    pub async fn list_workflow_runs(&self, limit: usize) -> Result<Vec<WorkflowRun>> {
+        self.state_adapter
+            .lock()
+            .await
+            .list_workflow_runs(limit)
+            .await
+    }
+
+    /// Execute a workflow
+    async fn execute_workflow(&self, workflow_run_id: Uuid) -> Result<()> {
+        // Get the workflow run
+        let mut workflow_run = self
+            .state_adapter
+            .lock()
+            .await
+            .get_workflow_run(workflow_run_id)
+            .await?;
+
+        // Create a workflow run diff to update the status
+        let mut fields = HashMap::new();
+        fields.insert(
+            "status".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(WorkflowStatus::Running)?),
+            },
+        );
+        let workflow_run_diff = WorkflowRunDiff {
+            workflow_run_id,
+            fields,
+        };
+
+        // Apply the diff
+        self.state_adapter
+            .lock()
+            .await
+            .apply_workflow_run_diff(&workflow_run_diff)
+            .await?;
+
+        info!("Starting workflow run {}", workflow_run_id);
+
+        // Create tasks for all nodes if they don't exist yet
+        let existing_tasks = self
+            .state_adapter
+            .lock()
+            .await
+            .get_tasks(workflow_run_id)
+            .await?;
+        if existing_tasks.is_empty() {
+            self.create_initial_tasks(&workflow_run).await?;
+        }
+
+        // Main execution loop
+        loop {
+            // Get the current workflow run state
+            let workflow_run = self
+                .state_adapter
+                .lock()
+                .await
+                .get_workflow_run(workflow_run_id)
+                .await?;
+
+            // Get all tasks
+            let tasks = self
+                .state_adapter
+                .lock()
+                .await
+                .get_tasks(workflow_run_id)
+                .await?;
+
+            // Check if all tasks are completed or failed
+            let all_done = tasks.iter().all(|t| {
+                t.status == TaskStatus::Completed
+                    || t.status == TaskStatus::Failed
+                    || t.status == TaskStatus::WontDo
+            });
+
+            if all_done {
+                // Check if any tasks failed
+                let any_failed = tasks.iter().any(|t| t.status == TaskStatus::Failed);
+
+                // Create a workflow run diff to update the status
+                let mut fields = HashMap::new();
+                fields.insert(
+                    "status".to_string(),
+                    FieldDiff {
+                        operation: DiffOperation::Update,
+                        value: Some(serde_json::to_value(if any_failed {
+                            WorkflowStatus::Failed
+                        } else {
+                            WorkflowStatus::Completed
+                        })?),
+                    },
+                );
+                fields.insert(
+                    "ended_at".to_string(),
+                    FieldDiff {
+                        operation: DiffOperation::Update,
+                        value: Some(serde_json::to_value(Utc::now())?),
+                    },
+                );
+                let workflow_run_diff = WorkflowRunDiff {
+                    workflow_run_id,
+                    fields,
+                };
+
+                // Apply the diff
+                self.state_adapter
+                    .lock()
+                    .await
+                    .apply_workflow_run_diff(&workflow_run_diff)
+                    .await?;
+
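+                // A run is terminal once every task is Completed, Failed, or
+                // WontDo; a single failed task marks the whole run as failed.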
{} {}", + workflow_run_id, + if any_failed { "failed" } else { "completed" } + ); + + break; + } + + // Find runnable tasks + let runnable_tasks = self.find_runnable_tasks(&workflow_run, &tasks).await?; + + // Check if any tasks are awaiting trigger + let awaiting_trigger = tasks + .iter() + .any(|t| t.status == TaskStatus::AwaitingTrigger); + let any_running = tasks.iter().any(|t| t.status == TaskStatus::Running); + + // If there are tasks awaiting trigger and no runnable tasks and no running tasks, + // then we need to pause the workflow and wait for manual triggers + if awaiting_trigger && runnable_tasks.is_empty() && !any_running { + // Create a workflow run diff to update the status + let mut fields = HashMap::new(); + fields.insert( + "status".to_string(), + FieldDiff { + operation: DiffOperation::Update, + value: Some(serde_json::to_value(WorkflowStatus::AwaitingTrigger)?), + }, + ); + let workflow_run_diff = WorkflowRunDiff { + workflow_run_id, + fields, + }; + + // Apply the diff + self.state_adapter + .lock() + .await + .apply_workflow_run_diff(&workflow_run_diff) + .await?; + + info!("Workflow run {} is awaiting triggers", workflow_run_id); + + // Exit the execution loop, will be resumed when triggers are received + break; + } + + // Execute runnable tasks + for task_id in runnable_tasks { + let task = tasks.iter().find(|t| t.id == task_id).unwrap(); + let node = workflow_run + .workflow + .nodes + .iter() + .find(|n| n.id == task.node_id) + .unwrap(); + + // Start task execution + let engine = self.clone(); + let task_id = task.id; + tokio::spawn(async move { + if let Err(e) = engine.execute_task(task_id).await { + error!("Task execution failed: {}", e); + } + }); + } + + // Wait a bit before checking again + time::sleep(Duration::from_secs(1)).await; + } + + Ok(()) + } + + /// Create initial tasks for all nodes + async fn create_initial_tasks(&self, workflow_run: &WorkflowRun) -> Result<()> { + for node in &workflow_run.workflow.nodes { + // Check if the node has a matrix strategy + if let Some(strategy) = &node.strategy { + // Create a master task for the matrix + let master_task = Task::new_matrix_master(workflow_run.id, node.id.clone()); + self.state_adapter + .lock() + .await + .save_task(&master_task) + .await?; + + // If the matrix uses values, create tasks for each value + if let Some(values) = &strategy.values { + for value in values { + let task = Task::new_matrix( + workflow_run.id, + node.id.clone(), + master_task.id, + value.clone(), + ); + self.state_adapter.lock().await.save_task(&task).await?; + } + } + // If the matrix uses state, we'll create tasks when the state is available + } else { + // Create a single task for the node + let task = Task::new(workflow_run.id, node.id.clone()); + self.state_adapter.lock().await.save_task(&task).await?; + } + } + + Ok(()) + } + + /// Find tasks that can be executed + async fn find_runnable_tasks( + &self, + workflow_run: &WorkflowRun, + tasks: &[Task], + ) -> Result> { + let mut runnable_tasks = Vec::new(); + + for task in tasks { + // Only consider pending tasks + if task.status != TaskStatus::Pending { + continue; + } + + // Get the node for this task + let node = workflow_run + .workflow + .nodes + .iter() + .find(|n| n.id == task.node_id) + .ok_or_else(|| Error::NodeNotFound(task.node_id.clone()))?; + + // Check if the node has a manual trigger + if node.r#type == NodeType::Manual + || node + .trigger + .as_ref() + .map(|t| t.r#type == TriggerType::Manual) + .unwrap_or(false) + { + // Create a task diff to update the 
+                // Create a task diff to update the status
+                let mut fields = HashMap::new();
+                fields.insert(
+                    "status".to_string(),
+                    FieldDiff {
+                        operation: DiffOperation::Update,
+                        value: Some(serde_json::to_value(TaskStatus::AwaitingTrigger)?),
+                    },
+                );
+                let task_diff = TaskDiff {
+                    task_id: task.id,
+                    fields,
+                };
+
+                // Apply the diff
+                self.state_adapter
+                    .lock()
+                    .await
+                    .apply_task_diff(&task_diff)
+                    .await?;
+                continue;
+            }
+
+            // Check if all dependencies are satisfied
+            let mut dependencies_satisfied = true;
+            for dep_id in &node.depends_on {
+                // Find all tasks for this dependency
+                let dep_tasks: Vec<&Task> = tasks.iter().filter(|t| t.node_id == *dep_id).collect();
+
+                // If there are no tasks for this dependency, it's not satisfied
+                if dep_tasks.is_empty() {
+                    dependencies_satisfied = false;
+                    break;
+                }
+
+                // Check if all tasks for this dependency are completed
+                let all_completed = dep_tasks.iter().all(|t| t.status == TaskStatus::Completed);
+
+                if !all_completed {
+                    dependencies_satisfied = false;
+                    break;
+                }
+            }
+
+            if dependencies_satisfied {
+                runnable_tasks.push(task.id);
+            }
+        }
+
+        Ok(runnable_tasks)
+    }
+
+    /// Execute a task
+    async fn execute_task(&self, task_id: Uuid) -> Result<()> {
+        // Get the task
+        let mut task = self.state_adapter.lock().await.get_task(task_id).await?;
+
+        // Get the workflow run
+        let workflow_run = self
+            .state_adapter
+            .lock()
+            .await
+            .get_workflow_run(task.workflow_run_id)
+            .await?;
+
+        // Get the node for this task
+        let node = workflow_run
+            .workflow
+            .nodes
+            .iter()
+            .find(|n| n.id == task.node_id)
+            .ok_or_else(|| Error::NodeNotFound(task.node_id.clone()))?;
+
+        // Create a task diff to update the status
+        let mut fields = HashMap::new();
+        fields.insert(
+            "status".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(TaskStatus::Running)?),
+            },
+        );
+        fields.insert(
+            "started_at".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(Utc::now())?),
+            },
+        );
+        let task_diff = TaskDiff {
+            task_id,
+            fields,
+        };
+
+        // Apply the diff
+        self.state_adapter
+            .lock()
+            .await
+            .apply_task_diff(&task_diff)
+            .await?;
+
+        info!("Executing task {} ({})", task_id, node.id);
+
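+        // Runtime selection: a node with no `runtime` block defaults to direct
+        // execution on the host; docker/podman run the step commands in containers.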
+        // Create a runner for this task
+        let runner: Box<dyn Runner> = match node
+            .runtime
+            .as_ref()
+            .map(|r| r.r#type)
+            .unwrap_or(RuntimeType::Direct)
+        {
+            RuntimeType::Direct => Box::new(DirectRunner::new()),
+            RuntimeType::Docker => Box::new(DockerRunner::new()),
+            RuntimeType::Podman => Box::new(PodmanRunner::new()),
+        };
+
+        // Execute each step in the node
+        for step in &node.steps {
+            // Check if the step uses a template
+            if let Some(uses) = &step.uses {
+                for template_use in uses {
+                    // Find the template
+                    let template = workflow_run
+                        .workflow
+                        .templates
+                        .iter()
+                        .find(|t| t.id == template_use.template)
+                        .ok_or_else(|| {
+                            Error::Template(format!(
+                                "Template not found: {}",
+                                template_use.template
+                            ))
+                        })?;
+
+                    // Execute the template steps
+                    for template_step in &template.steps {
+                        // Execute the step
+                        let result = self
+                            .execute_step(
+                                runner.as_ref(),
+                                template_step,
+                                &node,
+                                &task,
+                                &workflow_run,
+                            )
+                            .await;
+
+                        if let Err(e) = result {
+                            // Create a task diff to update the status
+                            let mut fields = HashMap::new();
+                            fields.insert(
+                                "status".to_string(),
+                                FieldDiff {
+                                    operation: DiffOperation::Update,
+                                    value: Some(serde_json::to_value(TaskStatus::Failed)?),
+                                },
+                            );
+                            fields.insert(
+                                "ended_at".to_string(),
+                                FieldDiff {
+                                    operation: DiffOperation::Update,
+                                    value: Some(serde_json::to_value(Utc::now())?),
+                                },
+                            );
+                            fields.insert(
+                                "error".to_string(),
+                                FieldDiff {
+                                    operation: DiffOperation::Add,
+                                    value: Some(serde_json::to_value(format!(
+                                        "Step {} failed: {}",
+                                        template_step.id, e
+                                    ))?),
+                                },
+                            );
+                            let task_diff = TaskDiff {
+                                task_id,
+                                fields,
+                            };
+
+                            // Apply the diff
+                            self.state_adapter
+                                .lock()
+                                .await
+                                .apply_task_diff(&task_diff)
+                                .await?;
+
+                            error!(
+                                "Task {} ({}) step {} failed: {}",
+                                task_id, node.id, template_step.id, e
+                            );
+
+                            return Err(e);
+                        }
+                    }
+                }
+            } else if let Some(commands) = &step.commands {
+                // Execute the step
+                let result = self
+                    .execute_step(runner.as_ref(), step, &node, &task, &workflow_run)
+                    .await;
+
+                if let Err(e) = result {
+                    // Create a task diff to update the status
+                    let mut fields = HashMap::new();
+                    fields.insert(
+                        "status".to_string(),
+                        FieldDiff {
+                            operation: DiffOperation::Update,
+                            value: Some(serde_json::to_value(TaskStatus::Failed)?),
+                        },
+                    );
+                    fields.insert(
+                        "ended_at".to_string(),
+                        FieldDiff {
+                            operation: DiffOperation::Update,
+                            value: Some(serde_json::to_value(Utc::now())?),
+                        },
+                    );
+                    fields.insert(
+                        "error".to_string(),
+                        FieldDiff {
+                            operation: DiffOperation::Add,
+                            value: Some(serde_json::to_value(format!(
+                                "Step {} failed: {}",
+                                step.id, e
+                            ))?),
+                        },
+                    );
+                    let task_diff = TaskDiff {
+                        task_id,
+                        fields,
+                    };
+
+                    // Apply the diff
+                    self.state_adapter
+                        .lock()
+                        .await
+                        .apply_task_diff(&task_diff)
+                        .await?;
+
+                    error!(
+                        "Task {} ({}) step {} failed: {}",
+                        task_id, node.id, step.id, e
+                    );
+
+                    return Err(e);
+                }
+            }
+        }
+
+        // Create a task diff to update the status
+        let mut fields = HashMap::new();
+        fields.insert(
+            "status".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(TaskStatus::Completed)?),
+            },
+        );
+        fields.insert(
+            "ended_at".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(Utc::now())?),
+            },
+        );
+        let task_diff = TaskDiff {
+            task_id,
+            fields,
+        };
+
+        // Apply the diff
+        self.state_adapter
+            .lock()
+            .await
+            .apply_task_diff(&task_diff)
+            .await?;
+
+        info!("Task {} ({}) completed", task_id, node.id);
+
+        // If this is a matrix task, update the master task status
+        if let Some(master_task_id) = task.master_task_id {
+            self.update_matrix_master_status(master_task_id).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Execute a step
+    async fn execute_step(
+        &self,
+        runner: &dyn Runner,
+        step: &Step,
+        node: &Node,
+        task: &Task,
+        workflow_run: &WorkflowRun,
+    ) -> Result<()> {
+        // Get the commands for this step
+        let commands = match &step.commands {
+            Some(commands) => commands,
+            None => return Ok(()), // No commands to execute
+        };
+
+        // Prepare environment variables
+        let mut env = HashMap::new();
+
+        // Add workflow parameters
+        for (key, value) in &workflow_run.params {
+            env.insert(format!("PARAM_{}", key.to_uppercase()), value.clone());
+        }
+
+        // Add node environment variables
+        for (key, value) in &node.env {
+            env.insert(key.clone(), value.clone());
+        }
+
+        // Add step environment variables
+        if let Some(step_env) = &step.env {
+            for (key, value) in step_env {
+                env.insert(key.clone(), value.clone());
+            }
+        }
+
+        // Add matrix values
+        if let Some(matrix_values) = &task.matrix_values {
+            for (key, value) in matrix_values {
+                env.insert(key.clone(), value.clone());
+            }
+        }
+
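+        // Commands may reference ${params.*}, ${env.*}, ${state.*},
+        // ${tasks.<node_id>.outputs.<name>}, or bare matrix keys; see
+        // resolve_variables for the resolution rules.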
+        // Execute each command
+        for command in commands {
+            // Resolve variables in the command
+            let resolved_command = resolve_variables(
+                command,
+                &workflow_run.params,
+                &env,
+                &workflow_run.state,
+                &HashMap::new(), // TODO: Implement task outputs
+                task.matrix_values.as_ref(),
+            )?;
+
+            // Execute the command
+            let output = runner.run_command(&resolved_command, &env).await?;
+
+            // Get the current task
+            let mut current_task = self.state_adapter.lock().await.get_task(task.id).await?;
+
+            // Append to the logs
+            current_task.logs.push(output.clone());
+
+            // Save the updated task
+            self.state_adapter.lock().await.save_task(&current_task).await?;
+
+            debug!("Command output: {}", output);
+        }
+
+        Ok(())
+    }
+
+    /// Update the status of a matrix master task
+    async fn update_matrix_master_status(&self, master_task_id: Uuid) -> Result<()> {
+        // Get the master task
+        let mut master_task = self
+            .state_adapter
+            .lock()
+            .await
+            .get_task(master_task_id)
+            .await?;
+
+        // Get all child tasks
+        let tasks = self
+            .state_adapter
+            .lock()
+            .await
+            .get_tasks(master_task.workflow_run_id)
+            .await?;
+        let child_tasks: Vec<&Task> = tasks
+            .iter()
+            .filter(|t| t.master_task_id == Some(master_task_id))
+            .collect();
+
+        // Check if all child tasks are completed or failed
+        let all_completed = child_tasks
+            .iter()
+            .all(|t| t.status == TaskStatus::Completed);
+
+        let any_failed = child_tasks.iter().any(|t| t.status == TaskStatus::Failed);
+
+        let any_running = child_tasks.iter().any(|t| t.status == TaskStatus::Running);
+
+        let any_awaiting = child_tasks
+            .iter()
+            .any(|t| t.status == TaskStatus::AwaitingTrigger);
+
+        // Create a task diff to update the status
+        let mut fields = HashMap::new();
+
+        // Determine the new status
+        let new_status = if any_failed {
+            TaskStatus::Failed
+        } else if all_completed {
+            TaskStatus::Completed
+        } else if any_awaiting {
+            TaskStatus::AwaitingTrigger
+        } else if any_running {
+            TaskStatus::Running
+        } else {
+            master_task.status // Keep the current status if none of the above
+        };
+
+        fields.insert(
+            "status".to_string(),
+            FieldDiff {
+                operation: DiffOperation::Update,
+                value: Some(serde_json::to_value(new_status)?),
+            },
+        );
+
+        // Add ended_at if the task is completed or failed
+        if any_failed || all_completed {
+            fields.insert(
+                "ended_at".to_string(),
+                FieldDiff {
+                    operation: DiffOperation::Update,
+                    value: Some(serde_json::to_value(Utc::now())?),
+                },
+            );
+        }
+
+        let task_diff = TaskDiff {
+            task_id: master_task_id,
+            fields,
+        };
+
+        // Apply the diff
+        self.state_adapter
+            .lock()
+            .await
+            .apply_task_diff(&task_diff)
+            .await?;
+
+        Ok(())
+    }
+}
+
+impl Clone for Engine {
+    fn clone(&self) -> Self {
+        Self {
+            state_adapter: Arc::clone(&self.state_adapter),
+        }
+    }
+}
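+
+// Note: Clone is intentionally shallow; clones share the same Arc'd state
+// adapter, which is what allows execute_workflow/execute_task to be spawned
+// onto background tokio tasks while observing one consistent run state.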
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
new file mode 100644
index 000000000..8ab427af1
--- /dev/null
+++ b/crates/core/src/error.rs
@@ -0,0 +1,49 @@
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum Error {
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+
+    #[error("YAML parsing error: {0}")]
+    YamlParsing(#[from] serde_yaml::Error),
+
+    #[error("JSON parsing error: {0}")]
+    JsonParsing(#[from] serde_json::Error),
+
+    #[error("Workflow validation error: {0}")]
+    WorkflowValidation(String),
+
+    #[error("Node not found: {0}")]
+    NodeNotFound(String),
+
+    #[error("Task not found: {0}")]
+    TaskNotFound(String),
+
+    #[error("Cyclic dependency detected: {0}")]
+    CyclicDependency(String),
+
+    #[error("Variable resolution error: {0}")]
+    VariableResolution(String),
+
+    #[error("Runtime error: {0}")]
+    Runtime(String),
+
+    #[error("State error: {0}")]
+    State(String),
+
+    #[error("Template error: {0}")]
+    Template(String),
+
+    #[error("Matrix error: {0}")]
+    Matrix(String),
+
+    #[error("Docker error: {0}")]
+    Docker(String),
+
+    #[error("HTTP error: {0}")]
+    Http(#[from] reqwest::Error),
+
+    #[error("Other error: {0}")]
+    Other(String),
+}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
new file mode 100644
index 000000000..55c13b40a
--- /dev/null
+++ b/crates/core/src/lib.rs
@@ -0,0 +1,17 @@
+pub mod engine;
+pub mod utils;
+
+pub use butterflow_models::{
+    Error, Node, Result, StateSchema, StateSchemaItems, StateSchemaProperty, StateSchemaType, Task,
+    TaskStatus, VariableReference, Workflow, WorkflowRun, WorkflowStatus,
+};
+
+pub use butterflow_models::node::NodeType;
+pub use butterflow_models::runtime::Runtime;
+pub use butterflow_models::runtime::RuntimeType;
+pub use butterflow_models::step::Step;
+pub use butterflow_models::strategy::Strategy;
+pub use butterflow_models::strategy::StrategyType;
+pub use butterflow_models::template::Template;
+pub use butterflow_models::trigger::Trigger;
+pub use butterflow_models::trigger::TriggerType;
diff --git a/crates/core/src/models/mod.rs b/crates/core/src/models/mod.rs
new file mode 100644
index 000000000..925fa16f7
--- /dev/null
+++ b/crates/core/src/models/mod.rs
@@ -0,0 +1,21 @@
+mod node;
+mod step;
+mod task;
+mod template;
+mod workflow;
+mod runtime;
+mod strategy;
+mod trigger;
+mod state;
+mod variable;
+
+pub use node::*;
+pub use step::*;
+pub use task::*;
+pub use template::*;
+pub use workflow::*;
+pub use runtime::*;
+pub use strategy::*;
+pub use trigger::*;
+pub use state::*;
+pub use variable::*;
diff --git a/crates/core/src/models/node.rs b/crates/core/src/models/node.rs
new file mode 100644
index 000000000..e62367f31
--- /dev/null
+++ b/crates/core/src/models/node.rs
@@ -0,0 +1,60 @@
+use std::collections::HashMap;
+use serde::{Deserialize, Serialize};
+
+use super::{Step, Runtime, Strategy, Trigger};
+
+/// Type of node
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum NodeType {
+    /// Automatic node (runs when dependencies are satisfied)
+    Automatic,
+
+    /// Manual node (requires explicit triggering)
+    Manual,
+}
+
+/// Represents a node in a workflow
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Node {
+    /// Unique identifier for the node
+    pub id: String,
+
+    /// Human-readable name
+    pub name: String,
+
+    /// Detailed description of what the node does
+    #[serde(default)]
+    pub description: Option<String>,
+
+    /// Type of node (automatic or manual)
+    #[serde(default = "default_node_type")]
+    pub r#type: NodeType,
+
+    /// IDs of nodes that must complete before this node can run
+    #[serde(default)]
+    pub depends_on: Vec<String>,
+
+    /// Configuration for how the node is triggered
+    #[serde(default)]
+    pub trigger: Option<Trigger>,
+
+    /// Configuration for running multiple instances of this node
+    #[serde(default)]
+    pub strategy: Option<Strategy>,
+
+    /// Container runtime configuration
+    #[serde(default)]
+    pub runtime: Option<Runtime>,
+
+    /// Steps to execute within the node
+    pub steps: Vec<Step>,
+
+    /// Environment variables to inject into the container
+    #[serde(default)]
+    pub env: HashMap<String, String>,
+}
+
+fn default_node_type() -> NodeType {
+    NodeType::Automatic
+}
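+
+// Example node as it would appear in workflow YAML (sketch; the values are
+// illustrative, the field names match the structs above):
+//
+//   - id: build
+//     name: Build
+//     type: automatic
+//     depends_on: [checkout]
+//     runtime:
+//       type: docker
+//       image: rust:1.70
+//     steps:
+//       - id: compile
+//         name: Compile
+//         commands:
+//           - cargo build --release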
diff --git a/crates/core/src/models/runtime.rs b/crates/core/src/models/runtime.rs
new file mode 100644
index 000000000..d5cdd90b3
--- /dev/null
+++ b/crates/core/src/models/runtime.rs
@@ -0,0 +1,42 @@
+use serde::{Deserialize, Serialize};
+
+/// Type of runtime
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum RuntimeType {
+    /// Direct execution on the host
+    Direct,
+
+    /// Docker container execution
+    Docker,
+
+    /// Podman container execution
+    Podman,
+}
+
+/// Represents a runtime configuration
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Runtime {
+    /// Type of runtime
+    pub r#type: RuntimeType,
+
+    /// Container image (for Docker and Podman)
+    #[serde(default)]
+    pub image: Option<String>,
+
+    /// Working directory inside the container
+    #[serde(default)]
+    pub working_dir: Option<String>,
+
+    /// User to run as inside the container
+    #[serde(default)]
+    pub user: Option<String>,
+
+    /// Network mode for the container
+    #[serde(default)]
+    pub network: Option<String>,
+
+    /// Additional container options
+    #[serde(default)]
+    pub options: Option<Vec<String>>,
+}
diff --git a/crates/core/src/models/state.rs b/crates/core/src/models/state.rs
new file mode 100644
index 000000000..b6b8c6127
--- /dev/null
+++ b/crates/core/src/models/state.rs
@@ -0,0 +1,62 @@
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// Type of state schema property
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum StateSchemaType {
+    /// Array type
+    Array,
+
+    /// Object type
+    Object,
+
+    /// String type
+    String,
+
+    /// Number type
+    Number,
+
+    /// Boolean type
+    Boolean,
+}
+
+/// Represents a state schema property
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateSchemaProperty {
+    /// Type of the property
+    pub r#type: StateSchemaType,
+
+    /// Description of the property
+    #[serde(default)]
+    pub description: Option<String>,
+}
+
+/// Represents a state schema definition
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateSchema {
+    /// Name of the state schema
+    pub name: String,
+
+    /// Type of the state schema
+    pub r#type: StateSchemaType,
+
+    /// For array types, the schema of the items
+    #[serde(default)]
+    pub items: Option<Box<StateSchemaItems>>,
+
+    /// Description of the state schema
+    #[serde(default)]
+    pub description: Option<String>,
+}
+
+/// Represents the schema for items in an array
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateSchemaItems {
+    /// Type of the items
+    pub r#type: StateSchemaType,
+
+    /// For object types, the properties of the object
+    #[serde(default)]
+    pub properties: Option<HashMap<String, StateSchemaProperty>>,
+}
diff --git a/crates/core/src/models/step.rs b/crates/core/src/models/step.rs
new file mode 100644
index 000000000..cc0832dd7
--- /dev/null
+++ b/crates/core/src/models/step.rs
@@ -0,0 +1,39 @@
+use std::collections::HashMap;
+use serde::{Deserialize, Serialize};
+
+/// Represents a step in a node
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Step {
+    /// Unique identifier for the step
+    pub id: String,
+
+    /// Human-readable name
+    pub name: String,
+
+    /// Detailed description of what the step does
+    #[serde(default)]
+    pub description: Option<String>,
+
+    /// Template to use for this step
+    #[serde(default)]
+    pub uses: Option<Vec<TemplateUse>>,
+
+    /// Commands to run
+    #[serde(default)]
+    pub commands: Option<Vec<String>>,
+
+    /// Environment variables specific to this step
+    #[serde(default)]
+    pub env: Option<HashMap<String, String>>,
+}
+
+/// Represents a template use in a step
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TemplateUse {
+    /// Template ID to use
+    pub template: String,
+
+    /// Inputs to pass to the template
+    #[serde(default)]
+    pub inputs: HashMap<String, String>,
+}
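+
+// Example step using a template (sketch; `checkout-repo` is a hypothetical
+// template id):
+//
+//   steps:
+//     - id: checkout
+//       name: Checkout
+//       uses:
+//         - template: checkout-repo
+//           inputs:
+//             repo_url: ${params.repo_url}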
diff --git a/crates/core/src/models/strategy.rs b/crates/core/src/models/strategy.rs
new file mode 100644
index 000000000..0cf7b9a99
--- /dev/null
+++ b/crates/core/src/models/strategy.rs
@@ -0,0 +1,25 @@
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// Type of strategy
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum StrategyType {
+    /// Matrix strategy (run multiple instances with different inputs)
+    Matrix,
+}
+
+/// Represents a strategy configuration
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Strategy {
+    /// Type of strategy
+    pub r#type: StrategyType,
+
+    /// Matrix values (for matrix strategy)
+    #[serde(default)]
+    pub values: Option<Vec<HashMap<String, String>>>,
+
+    /// State key to get matrix values from (for matrix strategy)
+    #[serde(default)]
+    pub from_state: Option<String>,
+}
diff --git a/crates/core/src/models/task.rs b/crates/core/src/models/task.rs
new file mode 100644
index 000000000..396875a47
--- /dev/null
+++ b/crates/core/src/models/task.rs
@@ -0,0 +1,131 @@
+use std::collections::HashMap;
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+use chrono::{DateTime, Utc};
+
+/// Status of a task
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub enum TaskStatus {
+    /// Task hasn't started execution yet
+    Pending,
+
+    /// Task is currently being executed
+    Running,
+
+    /// Task has completed successfully
+    Completed,
+
+    /// Task execution failed with an error
+    Failed,
+
+    /// Task is waiting for a manual trigger
+    AwaitingTrigger,
+
+    /// Task is blocked by dependencies
+    Blocked,
+
+    /// Task will not be executed
+    WontDo,
+}
+
+/// Represents a task (runtime instance of a node)
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Task {
+    /// Unique identifier for the task
+    pub id: Uuid,
+
+    /// ID of the workflow run this task belongs to
+    pub workflow_run_id: Uuid,
+
+    /// ID of the node this task is an instance of
+    pub node_id: String,
+
+    /// Current status of the task
+    pub status: TaskStatus,
+
+    /// For matrix tasks, the master task ID
+    #[serde(default)]
+    pub master_task_id: Option<Uuid>,
+
+    /// For matrix tasks, the matrix values
+    #[serde(default)]
+    pub matrix_values: Option<HashMap<String, String>>,
+
+    /// Start time of the task
+    #[serde(default)]
+    pub started_at: Option<DateTime<Utc>>,
+
+    /// End time of the task (if completed or failed)
+    #[serde(default)]
+    pub ended_at: Option<DateTime<Utc>>,
+
+    /// Error message (if failed)
+    #[serde(default)]
+    pub error: Option<String>,
+
+    /// Outputs from the task
+    #[serde(default)]
+    pub outputs: HashMap<String, String>,
+
+    /// Logs from the task
+    #[serde(default)]
+    pub logs: Vec<String>,
+}
+
+impl Task {
+    /// Create a new task
+    pub fn new(workflow_run_id: Uuid, node_id: String) -> Self {
+        Self {
+            id: Uuid::new_v4(),
+            workflow_run_id,
+            node_id,
+            status: TaskStatus::Pending,
+            master_task_id: None,
+            matrix_values: None,
+            started_at: None,
+            ended_at: None,
+            error: None,
+            outputs: HashMap::new(),
+            logs: Vec::new(),
+        }
+    }
+
+    /// Create a new matrix task
+    pub fn new_matrix(
+        workflow_run_id: Uuid,
+        node_id: String,
+        master_task_id: Uuid,
+        matrix_values: HashMap<String, String>,
+    ) -> Self {
+        Self {
+            id: Uuid::new_v4(),
+            workflow_run_id,
+            node_id,
+            status: TaskStatus::Pending,
+            master_task_id: Some(master_task_id),
+            matrix_values: Some(matrix_values),
+            started_at: None,
+            ended_at: None,
+            error: None,
+            outputs: HashMap::new(),
+            logs: Vec::new(),
+        }
+    }
+
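+    // Matrix expansion: a matrix node gets one master task (no matrix_values)
+    // plus one child task per matrix entry pointing back via master_task_id;
+    // the master's status is rolled up from its children by the engine.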
+    /// Create a new master task for a matrix
+    pub fn new_matrix_master(workflow_run_id: Uuid, node_id: String) -> Self {
+        Self {
+            id: Uuid::new_v4(),
+            workflow_run_id,
+            node_id,
+            status: TaskStatus::Pending,
+            master_task_id: None,
+            matrix_values: None,
+            started_at: None,
+            ended_at: None,
+            error: None,
+            outputs: HashMap::new(),
+            logs: Vec::new(),
+        }
+    }
+}
diff --git a/crates/core/src/models/template.rs b/crates/core/src/models/template.rs
new file mode 100644
index 000000000..fb557487b
--- /dev/null
+++ b/crates/core/src/models/template.rs
@@ -0,0 +1,78 @@
+use std::collections::HashMap;
+use serde::{Deserialize, Serialize};
+
+use super::{Step, Runtime};
+
+/// Represents a template input
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TemplateInput {
+    /// Name of the input
+    pub name: String,
+
+    /// Type of the input (string, number, boolean)
+    #[serde(default = "default_input_type")]
+    pub r#type: String,
+
+    /// Whether the input is required
+    #[serde(default)]
+    pub required: bool,
+
+    /// Description of the input
+    #[serde(default)]
+    pub description: Option<String>,
+
+    /// Default value for the input
+    #[serde(default)]
+    pub default: Option<String>,
+}
+
+/// Represents a template output
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TemplateOutput {
+    /// Name of the output
+    pub name: String,
+
+    /// Value of the output
+    pub value: String,
+
+    /// Description of the output
+    #[serde(default)]
+    pub description: Option<String>,
+}
+
+/// Represents a reusable template
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Template {
+    /// Unique identifier for the template
+    pub id: String,
+
+    /// Human-readable name
+    pub name: String,
+
+    /// Detailed description of what the template does
+    #[serde(default)]
+    pub description: Option<String>,
+
+    /// Container runtime configuration
+    #[serde(default)]
+    pub runtime: Option<Runtime>,
+
+    /// Inputs for the template
+    #[serde(default)]
+    pub inputs: Vec<TemplateInput>,
+
+    /// Steps to execute within the template
+    pub steps: Vec<Step>,
+
+    /// Outputs from the template
+    #[serde(default)]
+    pub outputs: Vec<TemplateOutput>,
+
+    /// Environment variables to inject into the container
+    #[serde(default)]
+    pub env: HashMap<String, String>,
+}
+
+fn default_input_type() -> String {
+    "string".to_string()
+}
diff --git a/crates/core/src/models/trigger.rs b/crates/core/src/models/trigger.rs
new file mode 100644
index 000000000..12cd20a7b
--- /dev/null
+++ b/crates/core/src/models/trigger.rs
@@ -0,0 +1,27 @@
+use serde::{Deserialize, Serialize};
+
+/// Type of trigger
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum TriggerType {
+    /// Automatic trigger (runs when dependencies are satisfied)
+    Automatic,
+
+    /// Manual trigger (requires explicit triggering)
+    Manual,
+}
+
+/// Represents a trigger configuration
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Trigger {
+    /// Type of trigger
+    pub r#type: TriggerType,
+}
+
+impl Default for Trigger {
+    fn default() -> Self {
+        Self {
+            r#type: TriggerType::Automatic,
+        }
+    }
+}
diff --git a/crates/core/src/models/variable.rs b/crates/core/src/models/variable.rs
new file mode 100644
index 000000000..57a39b3dc
--- /dev/null
+++ b/crates/core/src/models/variable.rs
@@ -0,0 +1,209 @@
+use regex::Regex;
+use std::collections::HashMap;
+use serde_json::Value;
+
+use crate::Result;
+use crate::error::Error;
+
+/// Represents a variable reference
+#[derive(Debug, Clone)]
+pub struct VariableReference {
+    /// Full variable reference (e.g., "${params.repo_url}")
+    pub full_reference: String,
+
+    /// Variable type (params, env, state, tasks)
+    pub var_type: String,
+
+    /// Variable name
+    pub name: String,
+
+    /// For task outputs, the task ID
+    pub task_id: Option<String>,
+
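+    // Recognized reference forms: ${params.name}, ${env.NAME}, ${state.key},
+    // and ${tasks.<node_id>.outputs.<output_name>} (see parse below).
+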
+    /// For task outputs, the output name
+    pub output_name: Option<String>,
+}
+
+impl VariableReference {
+    /// Parse a variable reference from a string
+    pub fn parse(reference: &str) -> Result<Self> {
+        // Match ${...} pattern
+        let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
+        let captures = re.captures(reference).ok_or_else(|| {
+            Error::VariableResolution(format!("Invalid variable reference: {}", reference))
+        })?;
+
+        let inner = captures.get(1).unwrap().as_str();
+
+        // Parse the variable type and name
+        if inner.starts_with("params.") {
+            Ok(Self {
+                full_reference: reference.to_string(),
+                var_type: "params".to_string(),
+                name: inner["params.".len()..].to_string(),
+                task_id: None,
+                output_name: None,
+            })
+        } else if inner.starts_with("env.") {
+            Ok(Self {
+                full_reference: reference.to_string(),
+                var_type: "env".to_string(),
+                name: inner["env.".len()..].to_string(),
+                task_id: None,
+                output_name: None,
+            })
+        } else if inner.starts_with("state.") {
+            Ok(Self {
+                full_reference: reference.to_string(),
+                var_type: "state".to_string(),
+                name: inner["state.".len()..].to_string(),
+                task_id: None,
+                output_name: None,
+            })
+        } else if inner.starts_with("tasks.") {
+            // Parse tasks.node_id.outputs.name
+            let parts: Vec<&str> = inner.split('.').collect();
+            if parts.len() != 4 || parts[2] != "outputs" {
+                return Err(Error::VariableResolution(format!(
+                    "Invalid task output reference: {}",
+                    reference
+                )));
+            }
+
+            Ok(Self {
+                full_reference: reference.to_string(),
+                var_type: "tasks".to_string(),
+                name: parts[3].to_string(),
+                task_id: Some(parts[1].to_string()),
+                output_name: Some(parts[3].to_string()),
+            })
+        } else {
+            Err(Error::VariableResolution(format!(
+                "Unknown variable type: {}",
+                reference
+            )))
+        }
+    }
+}
+
+/// Resolve variables in a string
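+///
+/// Example (illustrative sketch, not part of the original API docs):
+///
+/// ```ignore
+/// let mut params = HashMap::new();
+/// params.insert("repo_url".to_string(), "https://example.com/repo.git".to_string());
+/// let out = resolve_variables(
+///     "git clone ${params.repo_url}",
+///     &params,
+///     &HashMap::new(),
+///     &HashMap::new(),
+///     &HashMap::new(),
+///     None,
+/// )?;
+/// assert_eq!(out, "git clone https://example.com/repo.git");
+/// ```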
+pub fn resolve_variables(
+    input: &str,
+    params: &HashMap<String, String>,
+    env: &HashMap<String, String>,
+    state: &HashMap<String, Value>,
+    task_outputs: &HashMap<String, HashMap<String, String>>,
+    matrix_values: Option<&HashMap<String, String>>,
+) -> Result<String> {
+    let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
+    let mut result = input.to_string();
+
+    for captures in re.captures_iter(input) {
+        let full_match = captures.get(0).unwrap().as_str();
+        let inner = captures.get(1).unwrap().as_str();
+
+        let replacement = if let Some(matrix_values) = matrix_values {
+            // First check if it's a direct matrix value
+            if matrix_values.contains_key(inner) {
+                matrix_values.get(inner).unwrap().clone()
+            } else if inner.starts_with("params.") {
+                let name = &inner["params.".len()..];
+                params.get(name).cloned().ok_or_else(|| {
+                    Error::VariableResolution(format!("Parameter not found: {}", name))
+                })?
+            } else if inner.starts_with("env.") {
+                let name = &inner["env.".len()..];
+                env.get(name).cloned().ok_or_else(|| {
+                    Error::VariableResolution(format!("Environment variable not found: {}", name))
+                })?
+            } else if inner.starts_with("state.") {
+                let name = &inner["state.".len()..];
+                let value = state.get(name).ok_or_else(|| {
+                    Error::VariableResolution(format!("State value not found: {}", name))
+                })?;
+                value.to_string()
+            } else if inner.starts_with("tasks.") {
+                let parts: Vec<&str> = inner.split('.').collect();
+                if parts.len() != 4 || parts[2] != "outputs" {
+                    return Err(Error::VariableResolution(format!(
+                        "Invalid task output reference: {}",
+                        full_match
+                    )));
+                }
+
+                let task_id = parts[1];
+                let output_name = parts[3];
+
+                let task_output = task_outputs.get(task_id).ok_or_else(|| {
+                    Error::VariableResolution(format!("Task not found: {}", task_id))
+                })?;
+
+                let output = task_output.get(output_name).ok_or_else(|| {
+                    Error::VariableResolution(format!(
+                        "Output not found: {} in task {}",
+                        output_name, task_id
+                    ))
+                })?;
+
+                output.to_string()
+            } else {
+                return Err(Error::VariableResolution(format!(
+                    "Unknown variable type: {}",
+                    inner
+                )));
+            }
+        } else {
+            // No matrix values, resolve normally
+            if inner.starts_with("params.") {
+                let name = &inner["params.".len()..];
+                params.get(name).cloned().ok_or_else(|| {
+                    Error::VariableResolution(format!("Parameter not found: {}", name))
+                })?
+            } else if inner.starts_with("env.") {
+                let name = &inner["env.".len()..];
+                env.get(name).cloned().ok_or_else(|| {
+                    Error::VariableResolution(format!("Environment variable not found: {}", name))
+                })?
+            } else if inner.starts_with("state.") {
+                let name = &inner["state.".len()..];
+                let value = state.get(name).ok_or_else(|| {
+                    Error::VariableResolution(format!("State value not found: {}", name))
+                })?;
+                value.to_string()
+            } else if inner.starts_with("tasks.") {
+                let parts: Vec<&str> = inner.split('.').collect();
+                if parts.len() != 4 || parts[2] != "outputs" {
+                    return Err(Error::VariableResolution(format!(
+                        "Invalid task output reference: {}",
+                        full_match
+                    )));
+                }
+
+                let task_id = parts[1];
+                let output_name = parts[3];
+
+                let task_output = task_outputs.get(task_id).ok_or_else(|| {
+                    Error::VariableResolution(format!("Task not found: {}", task_id))
+                })?;
+
+                let output = task_output.get(output_name).ok_or_else(|| {
+                    Error::VariableResolution(format!(
+                        "Output not found: {} in task {}",
+                        output_name, task_id
+                    ))
+                })?;
+
+                output.to_string()
+            } else {
+                return Err(Error::VariableResolution(format!(
+                    "Unknown variable type: {}",
+                    inner
+                )));
+            }
+        };
+
+        result = result.replace(full_match, &replacement);
+    }
+
+    Ok(result)
+}
diff --git a/crates/core/src/models/workflow.rs b/crates/core/src/models/workflow.rs
new file mode 100644
index 000000000..fc3b0b35c
--- /dev/null
+++ b/crates/core/src/models/workflow.rs
@@ -0,0 +1,90 @@
+use std::collections::HashMap;
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+use chrono::{DateTime, Utc};
+
+use super::{Node, Template, StateSchema};
+
+/// Represents a workflow definition
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Workflow {
+    /// Version of the workflow format
+    pub version: String,
+
+    /// State schema definition
+    #[serde(default)]
+    pub state: Option<WorkflowState>,
+
+    /// Templates for reusable components
+    #[serde(default)]
+    pub templates: Vec