A Swift port of the minimalist LLM workflow framework PocketFlow.
Lightweight: Just ~165 lines. Zero bloat, zero dependencies, zero vendor lock-in.
Expressive: Everything you love—Agents, Workflows, branching, and looping.
Agentic Coding: Let AI Agents build Agents—10x productivity boost!
PocketFlow-Swift is a Swift implementation inspired by the original PocketFlow Python framework. While the original focuses on LLM applications, this Swift port provides a general-purpose workflow orchestration framework that can be used for any asynchronous task coordination.
Add PocketFlow-Swift to your `Package.swift`:
dependencies: [
.package(url: "https://github.com/phucledien/PocketFlow-Swift.git", from: "1.0.1")
]
Simply copy the `Sources/PocketFlow/PocketFlow.swift` file into your project. It's only ~165 lines!
Create custom nodes by inheriting from `BaseNode`:
import PocketFlow
/// Example node that produces a fixed string, splits it on underscores,
/// and stores the resulting pieces in the shared dictionary.
final class ProcessData: BaseNode<String, [String]> {
    /// Preparation phase: supplies the input that `exec` will work on.
    override func prep(shared: inout Shared) async throws -> String {
        "input_data"
    }

    /// Execution phase: breaks the prepared string apart at every "_".
    override func exec(prep: String) async throws -> [String] {
        let pieces = prep.components(separatedBy: "_")
        return pieces
    }

    /// Post-processing phase: records the pieces under "processed_data"
    /// and returns the "success" action.
    override func post(shared: inout Shared, prep: String, exec: [String]) async throws -> Action? {
        shared["processed_data"] = exec
        return "success"
    }
}
Connect nodes using operators:
// Instantiate the nodes that take part in the workflow.
let processData = ProcessData()
let saveResults = SaveResults()
let handleError = HandleError()

// Sequential connection (default path): processData -> saveResults
processData >>> saveResults

// Conditional connections out of processData:
//   "success" -> saveResults
//   "error"   -> handleError
processData <> "success" >>> saveResults
processData <> "error" >>> handleError

// Build a flow that starts at processData and run it with an initially empty shared dictionary.
let flow = Flow(start: processData)
var shared: [String: Sendable] = [:]
let result = try await flow.run(shared: &shared)
The building block of workflows. Every node has three phases:
- `prep`: Prepare input data from shared state
- `exec`: Execute the main logic (with automatic retry support)
- `post`: Process results and determine the next action
Orchestrates the execution of connected nodes, maintaining shared state throughout the workflow.
- `>>>`: Sequential connection (default path)
- `<>`: Conditional connection (specific action path)
/// First step of the linear example: produces a value and saves it for the next node.
final class StepA: BaseNode<String, String> {
    /// Supplies the fixed string "input" to `exec`.
    override func prep(shared: inout Shared) async throws -> String {
        "input"
    }

    /// Prefixes the prepared value with "processed_".
    override func exec(prep: String) async throws -> String {
        "processed_\(prep)"
    }

    /// Saves the result under "step_a_result" and returns the "default" action.
    override func post(shared: inout Shared, prep: String, exec: String) async throws -> Action? {
        shared["step_a_result"] = exec
        return "default"
    }
}
/// Second step of the linear example: reads StepA's stored result and finalizes it.
final class StepB: BaseNode<String, String> {
    /// Reads "step_a_result" from shared state; yields an empty string when the
    /// entry is missing or is not a `String`.
    override func prep(shared: inout Shared) async throws -> String {
        guard let previous = shared["step_a_result"] as? String else {
            return ""
        }
        return previous
    }

    /// Prefixes the prepared value with "final_".
    override func exec(prep: String) async throws -> String {
        "final_\(prep)"
    }

    /// Saves the result under "final_result" and returns no action, which ends the workflow.
    override func post(shared: inout Shared, prep: String, exec: String) async throws -> Action? {
        shared["final_result"] = exec
        return nil
    }
}
// Create both steps and connect them sequentially: stepA -> stepB.
let stepA = StepA()
let stepB = StepB()
stepA >>> stepB

// Run the flow from stepA with an initially empty shared dictionary;
// the value returned by `run` is intentionally discarded.
let flow = Flow(start: stepA)
var shared: [String: Sendable] = [:]
_ = try await flow.run(shared: &shared)
/// Example node that routes the workflow based on the content of its input.
final class DecisionNode: BaseNode<String, String> {
    /// Passes the prepared value through unchanged.
    override func exec(prep: String) async throws -> String {
        prep
    }

    /// Returns "approve" when the executed value contains "success", otherwise "reject".
    override func post(shared: inout Shared, prep: String, exec: String) async throws -> Action? {
        if exec.contains("success") {
            return "approve"
        }
        return "reject"
    }
}
// Instantiate the decision node and the two nodes it can route to.
let decision = DecisionNode()
let approveNode = ApproveNode()
let rejectNode = RejectNode()
// Conditional connections: the action returned by DecisionNode.post
// ("approve" or "reject") selects which node follows.
decision <> "approve" >>> approveNode
decision <> "reject" >>> rejectNode
/// Example node that increments a counter kept in shared state until it reaches 5.
final class LoopNode: BaseNode<Int, Int> {
    /// Reads the current "counter" value; starts from 0 when the entry is
    /// missing or is not an `Int`.
    override func prep(shared: inout Shared) async throws -> Int {
        if let current = shared["counter"] as? Int {
            return current
        }
        return 0
    }

    /// Advances the counter by one.
    override func exec(prep: Int) async throws -> Int {
        prep + 1
    }

    /// Stores the new counter value and returns "done" once it is at least 5,
    /// otherwise "continue".
    override func post(shared: inout Shared, prep: Int, exec: Int) async throws -> Action? {
        shared["counter"] = exec
        let reachedLimit = exec >= 5
        return reachedLimit ? "done" : "continue"
    }
}
// Instantiate the looping node and the node that follows once looping ends.
let loop = LoopNode()
let done = DoneNode()
// "continue" routes back into the same node; LoopNode.post returns it while the counter is below 5.
loop <> "continue" >>> loop // Self-loop
// "done" leaves the loop and hands control to `done`.
loop <> "done" >>> done
Configure automatic retries and wait periods:
// Create the node, then set its retry configuration before it is used in a flow.
let unreliableNode = UnreliableNode()
// NOTE(review): whether maxRetries counts total attempts or retries after the first failure is defined by BaseNode — confirm.
unreliableNode.maxRetries = 3
// Presumably the wait between attempts, in seconds (per the property name) — confirm against BaseNode.
unreliableNode.waitInSeconds = 1
/// Example node that overrides `execFallback` for custom error handling.
///
/// Declared `final`, like the other example nodes in this document, because it
/// is not designed to be subclassed.
final class UnreliableNode: BaseNode<String, String> {
    /// Supplies a substitute result in place of a normal `exec` result.
    ///
    /// NOTE(review): presumably BaseNode calls this after the configured retries
    /// are exhausted — confirm against the framework source.
    ///
    /// - Parameters:
    ///   - prep: The value produced by the preparation phase.
    ///   - error: The error that triggered the fallback.
    /// - Returns: The fixed string "fallback_result".
    override func execFallback(prep: String, error: any Error) async throws -> String {
        return "fallback_result"
    }
}
Pass data between nodes using the shared dictionary:
// Post-processing phase of a node: writes entries into the shared dictionary
// so that other nodes can read them, then returns the next action.
// NOTE(review): `PrepType`, `ExecType`, and `computedResults` appear to be
// placeholders for the node's own types and data — substitute real ones.
override func post(shared: inout Shared, prep: PrepType, exec: ExecType) async throws -> Action? {
// Each assignment stores one value under a string key.
shared["key"] = "value"
shared["results"] = computedResults
// The returned action string selects the outgoing connection to follow.
return "next_action"
}
Check out the test files for complete examples:
- Linear Flow: Sequential processing with state sharing
- Branching Flow: Conditional routing based on node results
- Looping Flow: Iterative processing with termination conditions
Feature | Original PocketFlow (Python) | PocketFlow-Swift |
---|---|---|
Purpose | LLM workflow orchestration | General async workflow orchestration |
Lines of Code | ~100 | ~165 |
Dependencies | Zero | Zero |
Type Safety | Runtime | Compile-time |
Concurrency | asyncio | Swift async/await |
Operators | `>>`, `~>` | `>>>`, `<>` |
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Original PocketFlow by The Pocket
- Inspired by the minimalist philosophy of the original framework
- Built for the Swift ecosystem with modern async/await support
- PocketFlow - Original Python implementation
- PocketFlow TypeScript - TypeScript port
- PocketFlow Java - Java port
- PocketFlow C++ - C++ port
- PocketFlow Go - Go port
Happy Flowing! 🚀