A lightweight and flexible JavaScript library for creating and managing workflows. It adopts the Pocket Flow design.
qflow allows you to define complex workflows as a series of connected nodes. Each node represents a step in the process, and you can control the flow of execution based on the outcome of each step.
- `Node`: The fundamental building block of a qflow workflow. A `Node` is a single step in your workflow. It has three main lifecycle methods:
  - `prep(shared)`: Prepare data for the `exec` method. The `shared` object is passed through the entire flow.
  - `exec(prepRes)`: The main logic of the node. It receives the result from the `prep` method.
  - `post(shared, prepRes, execRes)`: Clean up or process the results after the `exec` method. The return value of this method determines the next action in the flow.
- `Flow`: A `Flow` manages the execution of a series of connected `Node`s. You define the starting node and the transitions between nodes.
- Synchronous and Asynchronous Execution: qflow provides both synchronous and asynchronous versions of `Node`s and `Flow`s, allowing you to handle both CPU-bound and I/O-bound tasks efficiently.
- Batch Processing: The `BatchFlow` and `AsyncBatchFlow` classes allow you to process arrays of items in a single workflow.
- Parallel Execution: The `AsyncParallelBatchNode` and `AsyncParallelBatchFlow` classes enable you to execute asynchronous operations in parallel for maximum efficiency.
- Retries and Error Handling: The `Node` class has built-in support for retries with a configurable delay. You can also define an `execFallback` method to handle cases where all retries fail.
- Conditional Transitions: You can define multiple successors for a node and choose which one to transition to based on the return value of the `exec` or `post` method.
Each `Node` in qflow represents a distinct step in your workflow. Nodes are designed with a clear lifecycle to manage their execution (a minimal sketch follows this list):

- `prep(shared)`: This method is called before `exec`. It's ideal for preparing any data or resources needed for the node's main logic. The `shared` object, which is a mutable object passed throughout the entire flow, can be used here to access or store global state. The return value of `prep` is passed as `prepRes` to the `exec` method.
- `exec(prepRes)`: This is the core logic of your node. It receives the result from the `prep` method (`prepRes`) and should perform the primary task of the node. The return value of `exec` is crucial for controlling the flow's path: it can be a string representing an "action" (e.g., "success", "failure", "long", "short") that the `Flow` uses to determine the next node.
- `post(shared, prepRes, execRes)`: This method is called after `exec` completes. It's suitable for cleanup, logging, or further processing of the `exec` result. It receives the `shared` object, the `prepRes`, and the `execRes`. The return value of `post` also dictates the flow's path, similar to `exec`. If both `exec` and `post` return a value, the `post` method's return value takes precedence for determining the next transition.
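Here is a minimal sketch of a node that uses all three lifecycle methods. The `DoubleNode` class and the `value`/`doubled` keys are hypothetical and only for illustration:

```javascript
import { Node } from './qflow.js';

// A hypothetical node that doubles a number taken from the shared object.
class DoubleNode extends Node {
  // Pull the input value out of the shared state.
  prep(shared) {
    return shared.value;
  }

  // Do the main work; prepRes is whatever prep returned.
  exec(prepRes) {
    return prepRes * 2;
  }

  // Store the result back into shared and choose the next action.
  post(shared, prepRes, execRes) {
    shared.doubled = execRes;
    return 'default'; // follow the "default" transition
  }
}

const shared = { value: 21 };
new DoubleNode().run(shared); // after running, shared.doubled should be 42
```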
A `Flow` acts as the orchestrator for your nodes. It defines the sequence of execution and manages transitions between nodes.

- Starting a Flow: You initialize a `Flow` with a `startNode`. This is the first node that will be executed when the flow runs.
- Transitions: Nodes are connected using the `node.next(nextNode, action)` method. When a node's `exec` or `post` method returns an `action` string, the `Flow` looks for a successor node registered with that specific action. If no action is specified (or the method returns `undefined`), the "default" action is used. This allows for powerful conditional branching in your workflows, as sketched below.
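A minimal sketch of conditional branching; the node classes and the `big`/`small` action names are hypothetical:

```javascript
import { Node, Flow } from './qflow.js';

// A hypothetical node that picks a branch based on a value in the shared object.
class CheckNode extends Node {
  prep(shared) {
    return shared.count;
  }
  exec(count) {
    return count > 10 ? 'big' : 'small'; // the returned string is the action
  }
}

class BigNode extends Node {
  exec() { console.log('Handling the big case'); }
}

class SmallNode extends Node {
  exec() { console.log('Handling the small case'); }
}

const check = new CheckNode();
// Register one successor per action string.
check.next(new BigNode(), 'big');
check.next(new SmallNode(), 'small');

new Flow(check).run({ count: 42 }); // prints "Handling the big case"
```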
qflow provides two primary mechanisms for data management within a flow:

- Node-Specific Parameters (`this.params`): Each node instance has a `this.params` object. This object is populated by the `Flow` when it calls `node.setParams(params)`. This is the recommended way to pass data into a specific node for its execution. For `BatchFlow` and `AsyncBatchFlow`, the `prep` (or `prepAsync`) method should return an array of objects, where each object contains the `params` for a single batch item.

  ```javascript
  // Example of setting params for a node
  class MyNode extends Node {
    exec() {
      console.log(this.params.myValue);
    }
  }

  const myNode = new MyNode();
  // When running a flow, the flow will call:
  myNode.setParams({ myValue: 'hello' });
  ```
- Shared State (the `shared` object): The `shared` object is a mutable object that is passed to the `prep` and `post` methods of every node in the flow (and whatever `prep` returns is handed to `exec` as `prepRes`). It is ideal for maintaining global state or data that needs to be accessible and modifiable across multiple nodes in the workflow. Any changes made to the `shared` object by one node will be visible to subsequent nodes, as sketched below.
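A minimal sketch of two nodes communicating through the `shared` object; the `ProducerNode`/`ConsumerNode` classes and the `greeting` key are hypothetical:

```javascript
import { Node, Flow } from './qflow.js';

// A hypothetical node that produces a value and stores it in shared state.
class ProducerNode extends Node {
  exec() {
    return 'hello from ProducerNode';
  }
  post(shared, prepRes, execRes) {
    shared.greeting = execRes; // stash the result for later nodes
    return 'default';
  }
}

// A hypothetical node that reads what the producer stored.
class ConsumerNode extends Node {
  prep(shared) {
    return shared.greeting;
  }
  exec(greeting) {
    console.log(greeting); // "hello from ProducerNode"
  }
}

const producer = new ProducerNode();
producer.next(new ConsumerNode());
new Flow(producer).run({});
```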
- Return Values for Transitions: The string returned by a node's `exec` or `post` method is critical for defining the flow's path. If a node returns "success", the flow will look for a successor registered with the "success" action. If no action is returned, the "default" action is assumed.
- Error Handling: Nodes can throw errors. The `Node` class provides `maxRetries` and `wait` parameters for automatic retries. If all retries fail, the `execFallback` method is called (see the sketch after this list).
- Asynchronous Operations: For asynchronous operations, use `AsyncNode` and `AsyncFlow`. Their lifecycle methods (`prepAsync`, `execAsync`, `postAsync`) are `async` functions, allowing you to use `await` for non-blocking operations.
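A minimal sketch of retries with a fallback, assuming a transiently failing operation; the `FlakyNode` class is hypothetical:

```javascript
import { Node, Flow } from './qflow.js';

// A hypothetical node that fails most of the time.
class FlakyNode extends Node {
  constructor() {
    super(3, 1); // maxRetries = 3, wait = 1 second between retries
  }

  exec() {
    if (Math.random() < 0.7) {
      throw new Error('Transient failure');
    }
    return 'default';
  }

  // Called only once all retries have failed.
  execFallback(prepRes, error) {
    console.error('FlakyNode gave up:', error.message);
  }
}

new Flow(new FlakyNode()).run({});
```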
`BaseNode` is the base class for all nodes.

- `constructor()`
- `setParams(params)`: Sets the parameters for the node.
- `next(node, action = "default")`: Defines the next node in the flow.
- `prep(shared)`: Pre-execution logic.
- `exec(prepRes)`: Execution logic.
- `post(shared, prepRes, execRes)`: Post-execution logic. The return value of this method is used to determine the next node in the flow.
- `run(shared)`: Runs the node.
`Node` extends `BaseNode` with retry logic.

- `constructor(maxRetries = 1, wait = 0)`: `maxRetries` is the number of times to retry on failure, and `wait` is the delay in seconds between retries.
- `execFallback(prepRes, error)`: Logic to execute if all retries fail.
`Flow` manages a workflow of nodes.

- `constructor(start = null)`: `start` is the starting node of the flow.
- `start(startNode)`: Sets the starting node (see the sketch below).
- `run(shared)`: Runs the entire flow.
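A tiny sketch of wiring a flow via `start()` instead of the constructor; the `HelloNode` class is hypothetical:

```javascript
import { Node, Flow } from './qflow.js';

class HelloNode extends Node {
  exec() {
    console.log('hello');
  }
}

const flow = new Flow();      // no start node passed to the constructor
flow.start(new HelloNode());  // set the starting node afterwards
flow.run({});
```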
`BatchFlow` extends `Flow` for batch processing. The `prep` method should return an array of objects, where each object contains the parameters for a single execution of the flow.
qflow provides asynchronous versions of the core classes:

- `AsyncNode`
- `AsyncFlow`
- `AsyncBatchFlow`
- `AsyncParallelBatchFlow`

These classes have `...Async` versions of the lifecycle methods (e.g., `prepAsync`, `execAsync`, `postAsync`) and support `async`/`await`. The `AsyncFlow` class has a `runAsync` method to execute the flow.
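A minimal async sketch; the `DelayNode` class and the simulated delay are hypothetical:

```javascript
import { AsyncNode, AsyncFlow } from './qflow.js';

// A hypothetical async node that simulates an I/O-bound task.
class DelayNode extends AsyncNode {
  async execAsync() {
    await new Promise(resolve => setTimeout(resolve, 100)); // pretend to wait on I/O
    console.log('DelayNode finished');
    return 'default';
  }
}

await new AsyncFlow(new DelayNode()).runAsync({});
```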
```javascript
import { Node, Flow } from './qflow.js';
// A node that prints a message
class MessageNode extends Node {
exec() {
console.log("Hello from MessageNode!");
return 'default'; // Transition to the default successor
}
}
// A node that prints the current time
class TimeNode extends Node {
exec() {
console.log(`Current time: ${Date.now()}`);
return 'default';
}
}
// Create the nodes
const messageNode = new MessageNode();
const timeNode = new TimeNode();
// Define the workflow
messageNode.next(timeNode); // After messageNode, go to timeNode
// Create and run the flow
const flow = new Flow(messageNode);
flow.run({});
```
```javascript
import { Node, BatchFlow } from './qflow.js';
class MyBatchNode extends Node {
exec() {
console.log(`BatchNode: Processing item ${this.params.item}`);
return 'default';
}
}
const batchNode = new MyBatchNode();
const batchFlow = new BatchFlow(batchNode);
// The prep method returns an array of parameter objects for the batch.
batchFlow.prep = () => [ { item: 1 }, { item: 2 }, { item: 3 } ];
batchFlow.run({});
```
```javascript
import { AsyncNode, AsyncParallelBatchFlow } from './qflow.js';
class MyAsyncParallelBatchNode extends AsyncNode {
async execAsync() {
console.log(`AsyncParallelBatchNode: Starting item ${this.params.item}`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 100));
console.log(`AsyncParallelBatchNode: Finished item ${this.params.item}`);
return 'default';
}
}
const asyncParallelBatchNode = new MyAsyncParallelBatchNode();
const asyncParallelBatchFlow = new AsyncParallelBatchFlow(asyncParallelBatchNode);
// The prepAsync method returns an array of parameter objects for the batch.
asyncParallelBatchFlow.prepAsync = async () => [ { item: 1 }, { item: 2 }, { item: 3 }, { item: 4 }, { item: 5 } ];
await asyncParallelBatchFlow.runAsync({});
```
qflow's flexible node structure makes it straightforward to integrate with external APIs, including Large Language Models (LLMs). By creating custom `AsyncNode` implementations, you can easily add capabilities like text generation, summarization, and more to your workflows.

Below are examples of how you might create nodes for popular LLM providers. Remember to handle API keys securely (e.g., using environment variables) and to include a `fetch` polyfill or library if you are running in an environment that doesn't support `fetch` natively (such as older versions of Node.js).
This example demonstrates an `AsyncNode` that interacts with the OpenAI Chat Completions API.
```javascript
import { AsyncNode } from './qflow.js';
class OpenAILLMNode extends AsyncNode {
async execAsync() {
const { prompt, apiKey } = this.params; // prompt and apiKey passed via node params
if (!prompt) {
throw new Error('Prompt is required for OpenAILLMNode.');
}
if (!apiKey) {
throw new Error('OpenAI API Key is required.');
}
console.log(`OpenAILLMNode: Sending prompt to OpenAI: "${prompt.substring(0, 50)}..."`);
try {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`
},
body: JSON.stringify({
model: 'gpt-3.5-turbo',
messages: [{ role: 'user', content: prompt }],
max_tokens: 150
})
});
if (!response.ok) {
const errorData = await response.json();
throw new Error(`OpenAI API error: ${response.status} - ${errorData.error.message}`);
}
const data = await response.json();
const llmResponse = data.choices[0].message.content.trim();
console.log(`OpenAILLMNode: Received response: "${llmResponse.substring(0, 50)}..."`);
return llmResponse; // Return the LLM's response
} catch (error) {
console.error('OpenAILLMNode: Error during API call:', error);
throw error; // Re-throw to trigger qflow's retry/fallback
}
}
}
// Example Usage in a Flow:
import { AsyncFlow } from './qflow.js';
const openAILLMNode = new OpenAILLMNode();
// Pass prompt and API key via params
openAILLMNode.setParams({
prompt: 'Write a short, creative slogan for a new coffee shop.',
apiKey: process.env.OPENAI_API_KEY // Load from environment variable
});
const llmFlow = new AsyncFlow(openAILLMNode);
const llmResult = await llmFlow.runAsync({});
console.log('LLM Flow Result:', llmResult);
```
This example demonstrates an `AsyncNode` that interacts with the Google Gemini API.
```javascript
import { AsyncNode } from './qflow.js';
class GeminiLLMNode extends AsyncNode {
async execAsync() {
const { prompt, apiKey } = this.params; // prompt and apiKey passed via node params
if (!prompt) {
throw new Error('Prompt is required for GeminiLLMNode.');
}
if (!apiKey) {
throw new Error('Google Gemini API Key is required.');
}
console.log(`GeminiLLMNode: Sending prompt to Gemini: "${prompt.substring(0, 50)}..."`);
try {
const response = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key=${apiKey}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
contents: [{ parts: [{ text: prompt }] }]
})
});
if (!response.ok) {
const errorData = await response.json();
throw new Error(`Gemini API error: ${response.status} - ${errorData.error.message}`);
}
const data = await response.json();
const llmResponse = data.candidates[0].content.parts[0].text.trim();
console.log(`GeminiLLMNode: Received response: "${llmResponse.substring(0, 50)}..."`);
return llmResponse; // Return the LLM's response
} catch (error) {
console.error('GeminiLLMNode: Error during API call:', error);
throw error; // Re-throw to trigger qflow's retry/fallback
}
}
}
// Example Usage in a Flow:
import { AsyncFlow } from './qflow.js';
const geminiLLMNode = new GeminiLLMNode();
// Pass prompt and API key via params
geminiLLMNode.setParams({
prompt: 'Generate a short, catchy headline for a tech blog post about AI in healthcare.',
apiKey: process.env.GEMINI_API_KEY // Load from environment variable
});
const llmFlow = new AsyncFlow(geminiLLMNode);
const llmResult = await llmFlow.runAsync({});
console.log('LLM Flow Result:', llmResult);
```
One of the powerful features of qflow is the ability to chain nodes, allowing the output of one node to become the input for the next. This is particularly useful for multi-step LLM interactions, such as generating an initial response and then refining or expanding upon it.
This example demonstrates a flow where an OpenAI LLM generates a slogan and a second OpenAI call expands on that slogan. The first node stores its result in the `shared` object, and the second node reads it from there.
```javascript
import { AsyncNode, AsyncFlow } from './qflow.js';
// Re-using the OpenAILLMNode defined above
// class OpenAILLMNode extends AsyncNode { ... }
class SloganExpansionNode extends AsyncNode {
  // Read the slogan produced by the previous node from the shared object.
  async prepAsync(shared) {
    return shared.slogan;
  }
  async execAsync(slogan) {
    const { apiKey } = this.params; // apiKey passed via node params
if (!slogan) {
throw new Error('Slogan is required for SloganExpansionNode.');
}
if (!apiKey) {
throw new Error('OpenAI API Key is required.');
}
const prompt = `Expand on the following coffee shop slogan: "${slogan}"`;
console.log(`SloganExpansionNode: Sending prompt to OpenAI: "${prompt.substring(0, 50)}..."`);
try {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`
},
body: JSON.stringify({
model: 'gpt-3.5-turbo',
messages: [{ role: 'user', content: prompt }],
max_tokens: 200
})
});
if (!response.ok) {
const errorData = await response.json();
throw new Error(`OpenAI API error: ${response.status} - ${errorData.error.message}`);
}
const data = await response.json();
const expandedSlogan = data.choices[0].message.content.trim();
console.log(`SloganExpansionNode: Received response: "${expandedSlogan.substring(0, 50)}..."`);
return expandedSlogan; // Return the expanded slogan
} catch (error) {
console.error('SloganExpansionNode: Error during API call:', error);
throw error; // Re-throw to trigger qflow's retry/fallback
}
}
}
// Example Usage in a Chained Flow:
import { AsyncFlow } from './qflow.js';
const apiKey = process.env.OPENAI_API_KEY; // Load from environment variable
const sloganGenerator = new OpenAILLMNode();
sloganGenerator.setParams({
prompt: 'Generate a short, creative slogan for a new coffee shop.',
apiKey: apiKey
});
// Store the generated slogan in the shared object so the next node can read it
// (overriding a lifecycle method on the instance, as the batch examples above do).
sloganGenerator.postAsync = async (shared, prepRes, execRes) => {
  shared.slogan = execRes;
  return 'default'; // follow the "default" transition to the expansion node
};
const sloganExpander = new SloganExpansionNode();
sloganExpander.setParams({
apiKey: apiKey
});
// Chain the nodes: sloganGenerator stores its result in shared, and sloganExpander reads it from there
sloganGenerator.next(sloganExpander);
const chainedLLMFlow = new AsyncFlow(sloganGenerator);
const finalResult = await chainedLLMFlow.runAsync({});
console.log('Chained LLM Flow Final Result:', finalResult);
```
For more detailed usage examples and advanced workflows, refer to the following test files:
- test.js - Basic flow and node usage.
- test2.js - Demonstrates async flows, conditional transitions, retries, and batch processing.
- test3.js - Advanced workflow integrating with a public API and chaining LLM-like responses.
- test4.js - An automated, topic-specific article-writing workflow pipeline.
Since qflow is a single file, you can simply import it into your project.
```javascript
import { Node, Flow } from './qflow.js';
```
Contributions are welcome! Please feel free to submit a pull request.