ai-agent-flow is a TypeScript-based Node.js framework designed for building intelligent, modular, and observable workflows for AI agents. It helps you compose systems using simple, pluggable components with built-in AI capabilities.
- **Composable Node-based Flows**: Build AI workflows using LLM, decision, batch, and custom logic nodes
- **AI-first Architecture**: Native OpenAI support with persistent prompt history
- **Multi-agent Messaging**: Event-driven agent communication via `MessageBus`
- **Built-in Observability**: Winston logging and Prometheus-compatible metrics
- **Extensible Plugin System**: Add new nodes, providers, and context stores
- **Typed and Robust**: Full TypeScript support with retries, error handling, and shared context
```shell
npm install ai-agent-flow
```
- **Nodes**: The smallest executable units in your workflow
  - `ActionNode`: Simple function-based nodes for quick tasks
  - `LLMNode`: AI model interactions (OpenAI, etc.)
  - Custom nodes: Extend the `Node` class for specific needs
- **Flows**: Connect nodes with action-based transitions
- **Context**: Shared memory between nodes
- **Runner**: Executes flows with retry capabilities
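To build intuition for how these pieces fit together, here is a deliberately simplified, self-contained sketch of a node/transition/context loop. This is *not* the library's actual implementation; every name in it is illustrative only.

```typescript
// Illustrative sketch only: a toy flow engine showing how nodes,
// action-based transitions, and a shared context interact.
type ToyContext = { data: Record<string, unknown> };
type ToyResult = { output: string; action: string };
type ToyNode = (ctx: ToyContext) => ToyResult;

// Nodes are keyed by id; transitions map "nodeId:action" to the next node id.
const toyNodes: Record<string, ToyNode> = {
  greet: () => ({ output: 'Hello, World!', action: 'default' }),
  time: () => ({ output: new Date().toISOString(), action: 'end' }),
};
const toyTransitions: Record<string, string> = { 'greet:default': 'time' };

function runToyFlow(start: string, ctx: ToyContext): string {
  let current: string | undefined = start;
  let lastOutput = '';
  while (current) {
    const result = toyNodes[current](ctx);
    lastOutput = result.output;
    ctx.data[current] = result.output; // shared context accumulates outputs
    current = toyTransitions[`${current}:${result.action}`];
  }
  return lastOutput;
}

const toyFinal = runToyFlow('greet', { data: {} });
```

The real `Flow`/`Runner` API below follows the same shape: nodes produce a result and an action, and transitions decide what runs next.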
```typescript
import { Flow, Runner, InMemoryContextStore } from 'ai-agent-flow';
import { ActionNode } from 'ai-agent-flow/nodes/action';
import type { ChatCompletionMessageParam } from 'openai/resources/chat/completions';

// Create nodes
const greetNode = new ActionNode('greet', async () => 'Hello, World!');
const timeNode = new ActionNode('time', async () => new Date().toISOString());

// Create flow
const flow = new Flow('demo')
  .addNode(greetNode)
  .addNode(timeNode)
  .setStartNode('greet')
  .addTransition('greet', { action: 'default', to: 'time' });

// Run flow
const context = {
  conversationHistory: [] as ChatCompletionMessageParam[],
  data: {},
  metadata: {},
};

const store = new InMemoryContextStore();
const result = await new Runner(3, 1000, store).runFlow(flow, context, 'demo');
console.log(result); // { type: 'success', output: '2024-03-20T...' }
```
```mermaid
flowchart TD
    A[greetNode] -->|default| B[timeNode]
    B -->|default| C[End]
```
Use a `ContextStore` to save and resume flows. Here we reuse the in-memory store:
```typescript
const store = new InMemoryContextStore();
const runner = new Runner(3, 1000, store);

// First run
await runner.runFlow(flow, context, 'demo');

// Later, resume using the same id
await runner.runFlow(flow, {}, 'demo');
```
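The save/resume mechanics can be pictured as a small keyed snapshot store. The sketch below is hypothetical (the shipped `InMemoryContextStore` may differ internally); it only illustrates why saving under a flow id makes resuming possible.

```typescript
// Hypothetical sketch of a context store's contract; names are illustrative.
interface SimpleContext {
  data: { step?: number };
}

class SketchContextStore {
  private contexts = new Map<string, SimpleContext>();

  save(id: string, ctx: SimpleContext): void {
    // Store a snapshot, not a live reference, so later mutations don't leak in.
    this.contexts.set(id, structuredClone(ctx));
  }

  load(id: string): SimpleContext | undefined {
    return this.contexts.get(id);
  }
}

const sketchStore = new SketchContextStore();
sketchStore.save('demo', { data: { step: 1 } });
const resumed = sketchStore.load('demo');
```

Passing the same id to a second run lets the runner pick up the saved snapshot instead of starting from an empty context.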
Execute several flows at once and receive a map of results:
```typescript
const runner = new Runner();
const results = await runner.runAgentFlows(
  [flowA, flowB],
  { [flowA.getId()]: ctxA, [flowB.getId()]: ctxB },
  true,
);
```
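Conceptually, running several flows at once amounts to mapping flow ids to promises and awaiting them together. A rough, self-contained sketch of that pattern (not the library's actual code; `runOne` is a stand-in for real flow execution):

```typescript
// Sketch of the "run many flows, collect results by id" pattern.
async function runOne(id: string): Promise<string> {
  // Stand-in for executing a real flow.
  return `result-of-${id}`;
}

async function runMany(ids: string[]): Promise<Record<string, string>> {
  const entries = await Promise.all(
    ids.map(async (id) => [id, await runOne(id)] as const),
  );
  return Object.fromEntries(entries);
}

const manyResults = await runMany(['flowA', 'flowB']);
```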
Extend `Runner` by providing plugin objects or paths when it is constructed. A plugin is a module exporting `{ name, setup(runner) }`.
```javascript
// logger-plugin.js
module.exports = {
  default: {
    name: 'logger',
    setup(runner) {
      runner.onUpdate((u) => console.log(u));
    },
  },
};
```
Load plugins using a file path or directory:
```typescript
import path from 'node:path';
import { Runner } from 'ai-agent-flow';

const runner = new Runner(3, 1000, undefined, [
  path.join(__dirname, 'logger-plugin.js'),
  path.join(__dirname, 'plugins'), // directory of plugins
]);
```
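At its core, a plugin mechanism like this boils down to iterating over `{ name, setup }` objects and handing each one the host instance. A simplified, self-contained sketch (illustrative only, not the Runner's real internals):

```typescript
// Illustrative plugin mechanism: each plugin's setup() receives the host object.
interface SketchPlugin {
  name: string;
  setup(host: SketchHost): void;
}

class SketchHost {
  readonly loadedPlugins: string[] = [];

  use(plugin: SketchPlugin): void {
    plugin.setup(this); // plugin wires itself into the host
    this.loadedPlugins.push(plugin.name);
  }
}

const host = new SketchHost();
host.use({ name: 'logger', setup: () => {} });
```

Loading from a path or directory adds a dynamic-import step in front of the same loop.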
The `ActionNode` class provides a simple way to create nodes from async functions:
```typescript
import { ActionNode } from 'ai-agent-flow/nodes/action';

// Simple action
const simpleNode = new ActionNode('simple', async () => 'result');

// With context
const contextNode = new ActionNode('withContext', async (context) => {
  const { data } = context;
  return `Processed ${data.item}`;
});

// With error handling
const safeNode = new ActionNode('safe', async () => {
  try {
    return await someOperation();
  } catch (error) {
    // Preserve the original error as the cause instead of discarding it
    throw new Error('Operation failed', { cause: error });
  }
});
```
The `LLMNode` class provides AI model interactions. Construct it with `{ model?: string; messages: (ctx: Context) => ChatCompletionMessageParam[] }`. The `model` field defaults to `"gpt-3.5-turbo"`:
```typescript
import { LLMNode } from 'ai-agent-flow/nodes/llm';

const chatNode = new LLMNode('chat', {
  model: 'gpt-3.5-turbo',
  messages: (context) => [
    { role: 'system', content: 'You are a helpful assistant.' },
    { role: 'user', content: context.data.userInput },
  ],
});
```
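Because `messages` is a pure function of the context, prompt construction can be unit-tested without calling any model. The context shape below is simplified for illustration:

```typescript
// Prompt construction as a pure function of context (simplified shape).
type MiniContext = { data: { userInput: string } };
type ChatMessage = { role: 'system' | 'user'; content: string };

const buildMessages = (context: MiniContext): ChatMessage[] => [
  { role: 'system', content: 'You are a helpful assistant.' },
  { role: 'user', content: context.data.userInput },
];

const builtMessages = buildMessages({ data: { userInput: 'What time is it?' } });
```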
The `BatchNode` class processes multiple items in parallel:
```typescript
import { BatchNode } from 'ai-agent-flow/nodes/batch';

const processItemsNode = new BatchNode('process-items', async (items, context) => {
  const results = await Promise.all(
    items.map(async (item) => {
      return { type: 'success', output: `Processed ${item}` };
    }),
  );
  return { type: 'success', output: results };
});
```
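One practical wrinkle with parallel batches: a single rejected promise fails the whole `Promise.all`. A common pattern, sketched here outside the library, is to capture each item's error as a result value so one bad item does not sink the batch:

```typescript
// Sketch: per-item error capture so one failure doesn't reject the whole batch.
type ItemResult =
  | { type: 'success'; output: string }
  | { type: 'error'; error: string };

async function processItemsSafely(items: string[]): Promise<ItemResult[]> {
  return Promise.all(
    items.map(async (item): Promise<ItemResult> => {
      try {
        if (item === 'bad') throw new Error(`cannot process ${item}`);
        return { type: 'success', output: `Processed ${item}` };
      } catch (err) {
        return {
          type: 'error',
          error: err instanceof Error ? err.message : String(err),
        };
      }
    }),
  );
}

const batchResults = await processItemsSafely(['a', 'bad', 'c']);
```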
The `DecisionNode` class makes decisions based on context:
```typescript
import { DecisionNode } from 'ai-agent-flow/nodes/decision';

const decideNode = new DecisionNode('decide', async (context) => {
  if (context.data.shouldContinue) {
    return { type: 'success', action: 'continue' };
  }
  return { type: 'success', action: 'stop' };
});
```
`HttpNode` performs HTTP requests during a flow. Provide the request `url`, plus an optional `method`, headers, and body. Values can be static or derived from the current context.
```typescript
import { HttpNode } from 'ai-agent-flow/nodes/http';

const fetchData = new HttpNode('fetch-data', {
  url: 'https://api.example.com/data',
});
```
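"Static or derived from context" usually means an option can be either a value or a function of the context. The resolver below is a hypothetical sketch of that idea (the library's exact option types live in its typings and may differ):

```typescript
// Sketch: resolving a request option that may be static or context-derived.
type Ctx = { data: { userId: string } };
type MaybeDynamic<T> = T | ((ctx: Ctx) => T);

function resolveOption<T>(value: MaybeDynamic<T>, ctx: Ctx): T {
  // A function-valued option is called with the current context.
  return typeof value === 'function' ? (value as (c: Ctx) => T)(ctx) : value;
}

const dynamicUrl: MaybeDynamic<string> = (ctx) =>
  `https://api.example.com/users/${ctx.data.userId}`;
const resolvedUrl = resolveOption(dynamicUrl, { data: { userId: '42' } });
```

This is what lets each run of the same node target a different endpoint.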
The `MessageBus` enables event-driven communication between agents. It allows agents to send and subscribe to messages asynchronously.
```typescript
import { MessageBus } from 'ai-agent-flow/utils/message-bus';

// Create a new MessageBus instance
const bus = new MessageBus();

// Subscribe agentB to receive messages
bus.subscribe('agentB', (senderId, message) => {
  console.log(`${senderId} says: ${message}`);
});

// Send a message from agentA to agentB
bus.send('agentA', 'agentB', 'Hello');
// Output: agentA says: Hello
```
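The publish/subscribe behavior shown above can be pictured as a map from agent ids to handler lists. A minimal, self-contained sketch (not the shipped implementation):

```typescript
// Minimal pub/sub sketch mirroring the MessageBus behavior above.
type Handler = (senderId: string, message: string) => void;

class SketchBus {
  private handlers = new Map<string, Handler[]>();

  subscribe(agentId: string, handler: Handler): void {
    const list = this.handlers.get(agentId) ?? [];
    list.push(handler);
    this.handlers.set(agentId, list);
  }

  send(senderId: string, recipientId: string, message: string): void {
    // Deliver to every handler registered for the recipient.
    for (const handler of this.handlers.get(recipientId) ?? []) {
      handler(senderId, message);
    }
  }
}

const sketchBus = new SketchBus();
const received: string[] = [];
sketchBus.subscribe('agentB', (from, msg) => received.push(`${from} says: ${msg}`));
sketchBus.send('agentA', 'agentB', 'Hello');
```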
This is particularly useful for multi-agent systems where agents need to communicate asynchronously.
For single-process apps you can rely on the in-memory `MessageBus`. To enable cross-process messaging, switch to the `RedisMessageBus`:
```typescript
import { RedisMessageBus } from 'ai-agent-flow/utils/redis-message-bus';

const bus = new RedisMessageBus({ url: 'redis://localhost:6379' });
```
Both buses expose the same API so you can pick one based on your environment.
For detailed documentation, visit our API Documentation.
The framework uses a modular architecture with subpath exports for better code organization and tree-shaking:
```typescript
// Core components
import { Flow, Runner } from 'ai-agent-flow';

// Node types
import { ActionNode } from 'ai-agent-flow/nodes/action';
import { BatchNode } from 'ai-agent-flow/nodes/batch';
import { DecisionNode } from 'ai-agent-flow/nodes/decision';
import { LLMNode } from 'ai-agent-flow/nodes/llm';

// Types
import { Context, NodeResult, Transition } from 'ai-agent-flow/types';
```
To use the subpath imports, make sure your `tsconfig.json` includes:
```json
{
  "compilerOptions": {
    "module": "NodeNext",
    "moduleResolution": "NodeNext"
  }
}
```
Contributions are welcome! Please read our Contributing Guide for details on our code of conduct and the process for submitting pull requests.
This project is licensed under the MIT License - see the LICENSE file for details.
```shell
npm test          # Run all tests
npm run coverage  # Generate coverage report
```
- Nodes API - Learn about the Node system and ActionNode
- Flow API - Understand Flow creation and management
- Runner API - Explore Flow execution and monitoring
- Complete API Reference - Full API documentation
- Plugin Guide - Write and load plugins
Check out our examples directory for complete working examples:
- Basic flows
- Error handling
- Data processing
- API integration
- Multi-step workflows
Generate docs locally:

```shell
npm run docs
```
Run a flow module directly from the command line:
```shell
npx aaflow run path/to/flow.ts
```
The module should export a `flow` instance (and optionally a `context` object):
```typescript
// flow.ts
import { Flow, ActionNode } from 'ai-agent-flow';
import type { ChatCompletionMessageParam } from 'openai/resources/chat/completions';

export const flow = new Flow('hello')
  .addNode(new ActionNode('hello', async () => 'Hi'))
  .setStartNode('hello');

export const context = {
  conversationHistory: [] as ChatCompletionMessageParam[],
  data: {},
  metadata: {},
};
```
Then run:
```shell
npx aaflow run ./flow.ts
```
Inspect a flow without executing it:
```shell
npx aaflow inspect ./flow.ts
```
To create a custom node, extend the `Node` class and implement `execute`:

```typescript
import { Node, Context, NodeResult } from 'ai-agent-flow';

export class CustomNode extends Node {
  constructor(id: string) {
    super(id);
  }

  async execute(context: Context): Promise<NodeResult> {
    try {
      // Your custom logic here
      return {
        type: 'success',
        output: 'result',
      };
    } catch (error) {
      return {
        type: 'error',
        error: error instanceof Error ? error : new Error(String(error)),
      };
    }
  }
}
```
| Phase | Features |
|---|---|
| Now | Core engine, `ActionNode`, observability |
| Short-term | `LLMNode`, CLI tool, more examples |
| Mid-term | Visual editor, plugin system |
| Long-term | Distributed agents, auto-routing |
```shell
git clone https://github.com/EunixTech/ai-agent-flow
npm install
npm test
```
We welcome all contributions: bug fixes, new nodes, documentation, and examples.
MIT © 2025 Rajesh Dhiman
Open issues or reach out here:
https://www.rajeshdhiman.in/contact
"Build agent flows like LEGO blocks β simple, powerful, and easy to debug."