Status: Production Ready
Python: 3.12+
Architecture: LangGraph + gRPC Microservices + Adapter Pattern
A modern local LLM agent framework with intelligent tool orchestration, conversation persistence, and clean architecture. Features an adapter-based design that bridges gRPC services to LangGraph workflows with built-in tool execution and context management.
- Architecture: See `ARCHITECTURE.md` for system design details
- Setup: See `UI_SERVICE_SETUP.md` for frontend configuration
- AgentServiceAdapter: Clean separation between gRPC and LangGraph core
- LLMClientWrapper: Adapts any LLM backend to LangGraph interface
- Tool Registry: Unified tool management with circuit breakers
- Thread-based Context: Conversation persistence with SQLite checkpointing
- web_search: Real-time web search via Serper API
- math_solver: Mathematical expression evaluation
- load_web_page: Web content extraction and analysis
- cpp_llm_inference: Native C++ LLM service integration
- Circuit breakers for fault-tolerant tool execution
- Conversation checkpointing with SQLite + WAL mode
- Thread-safe operation with proper connection management
- Comprehensive logging for debugging and monitoring
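The checkpointing pattern works roughly as follows. This is a minimal sketch of the WAL-mode, lock-guarded connection idea only; the class and table names are hypothetical and the project's actual implementation lives in `core/checkpointing.py`:

```python
import sqlite3
import threading


class ThreadSafeCheckpointer:
    """Minimal illustration: one shared connection, guarded by a lock, WAL mode enabled."""

    def __init__(self, db_path: str = "checkpoints.db"):
        self._lock = threading.Lock()
        # check_same_thread=False lets gRPC worker threads share the connection
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._conn.execute("PRAGMA journal_mode=WAL")  # readers don't block the writer
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints (thread_id TEXT PRIMARY KEY, state BLOB)"
        )

    def put(self, thread_id: str, state: bytes) -> None:
        with self._lock:
            self._conn.execute(
                "INSERT OR REPLACE INTO checkpoints VALUES (?, ?)", (thread_id, state)
            )
            self._conn.commit()

    def get(self, thread_id: str) -> bytes | None:
        with self._lock:
            row = self._conn.execute(
                "SELECT state FROM checkpoints WHERE thread_id = ?", (thread_id,)
            ).fetchone()
            return row[0] if row else None
```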
```
┌──────────────┐
│  UI Service  │  Next.js 14 + TypeScript
│    (5000)    │  gRPC-js client with metadata
└──────┬───────┘
       │
       ▼
┌──────────────────┐
│  Agent Service   │  gRPC entry point
│     (50054)      │  • Thread-ID extraction from metadata
└────────┬─────────┘  • AgentServiceAdapter orchestration
         │
         ├──────────────────────────┐
         ▼                          ▼
┌───────────────────┐      ┌─────────────────┐
│  Core Framework   │      │  Tool Registry  │
│                   │      │                 │
│  • StateGraph     │      │  • web_search   │
│  • LangGraph      │      │  • math_solver  │
│  • Checkpointing  │      │  • web_loader   │
│  • LLMWrapper     │      │  • cpp_llm      │
└─────────┬─────────┘      └────────┬────────┘
          │                         │
          ▼                         ▼
┌────────────────────┐     ┌──────────────────┐
│    LLM Service     │     │  Chroma Service  │
│      (50051)       │     │     (50052)      │
│                    │     │                  │
│  • llama.cpp       │     │  • Vector DB     │
│  • Qwen 2.5 0.5B   │     │  • Embeddings    │
└────────────────────┘     └──────────────────┘
```
| Component | Purpose | Location |
|---|---|---|
| AgentServiceAdapter | Bridges gRPC ↔ LangGraph workflow | `agent_service/adapter.py` |
| LLMClientWrapper | Adapts gRPC LLM to LangGraph interface | `agent_service/llm_wrapper.py` |
| StateGraph | Workflow orchestration with tool routing | `core/graph.py` |
| SqliteSaver | Conversation checkpointing | `core/checkpointing.py` |
| LocalToolRegistry | Tool registration with circuit breakers | `tools/registry.py` |
| gRPC Clients | Type-safe service communication | `shared/clients/` |
1. User Query → UI sends message with optional `thread-id` metadata
2. gRPC Gateway → Agent service extracts thread-id from metadata
3. Adapter Layer → AgentServiceAdapter invokes StateGraph workflow
4. LLM Decision → LLMClientWrapper queries llama.cpp for tool calls
5. Tool Execution → Registry executes tools with circuit breaker protection
6. Response → Workflow returns final answer with sources and context
7. Persistence → SqliteSaver checkpoints conversation state (a servicer-side sketch follows below)
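Steps 1–3 of this flow can be sketched at the gRPC boundary. This is illustrative only: the response type and its `message`/`threadId` fields mirror the client example later in this README, and `process_query` follows the adapter description above; treat the exact proto shape as an assumption.

```python
import uuid

from shared.generated import agent_pb2, agent_pb2_grpc


class AgentServicer(agent_pb2_grpc.AgentServiceServicer):
    """Sketch: pull the thread-id from gRPC metadata, then delegate to the adapter."""

    def __init__(self, adapter):
        self.adapter = adapter  # AgentServiceAdapter instance

    def QueryAgent(self, request, context):
        # invocation_metadata() yields (key, value) pairs; fall back to a fresh thread id
        metadata = dict(context.invocation_metadata())
        thread_id = metadata.get("thread-id") or str(uuid.uuid4())

        # Steps 3-7 happen inside the adapter / StateGraph / checkpointer
        answer = self.adapter.process_query(request.message, thread_id=thread_id)
        return agent_pb2.QueryResponse(message=answer, threadId=thread_id)
```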
## Service Endpoints
| Service | Port | Health Check |
|---------|------|-------------|
| LLM Service | 50051 | `grpc_health_probe -addr=localhost:50051` |
| Chroma Service | 50052 | `grpc_health_probe -addr=localhost:50052` |
| Tool Service | 50053 | `grpc_health_probe -addr=localhost:50053` |
| Agent Service | 50054 | `grpc_health_probe -addr=localhost:50054` |
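If `grpc_health_probe` is not installed, a quick connectivity check from Python works too. Note this only confirms the channel becomes ready; it does not query the gRPC health service itself (ports are the ones listed above):

```python
import grpc

SERVICES = {
    "llm_service": "localhost:50051",
    "chroma_service": "localhost:50052",
    "tool_service": "localhost:50053",
    "agent_service": "localhost:50054",
}

for name, addr in SERVICES.items():
    channel = grpc.insecure_channel(addr)
    try:
        # Blocks until the channel is READY or the timeout expires
        grpc.channel_ready_future(channel).result(timeout=3)
        print(f"{name:15s} OK          ({addr})")
    except grpc.FutureTimeoutError:
        print(f"{name:15s} UNREACHABLE ({addr})")
    finally:
        channel.close()
```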
## Logical Flow Recipes
The orchestrator exposes predictable, typed tool interfaces so you can model flows visually or programmatically.
### Agent Decision Weights
1. **Prompt analysis** – the LLM response validator inspects JSON function calls first. If the payload includes `function_call`, the agent routes to the matching tool.
2. **Circuit breakers** – repeated failures trip the breaker and temporarily remove a tool from the candidate list (weights fall to zero).
3. **Context enrichment** – successful tool calls push documents into context, increasing subsequent LLM relevance scores.
4. **Native intents** – when the C++ service returns `intent_payload`, the agent biases towards native flows (e.g., scheduling) over external APIs.

These steps mirror an n8n/Node-RED graph: each tool is a node, the agent is the router, and output context is the equivalent of a data bucket for downstream nodes.
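A rough sketch of that weighting logic is shown below. It is illustrative only: the real routing lives in the StateGraph and tool registry, and the `intent_payload` shape is an assumption.

```python
def rank_candidates(tools, breakers, intent_payload=None):
    """Order candidate tools: open breakers drop to zero, native intents get a boost."""
    weights = {}
    for name in tools:
        if breakers[name].is_open():
            weights[name] = 0.0      # tripped breaker removes the tool from consideration
        elif intent_payload and intent_payload.get("tool") == name:
            weights[name] = 2.0      # native intent biases towards the matching flow
        else:
            weights[name] = 1.0      # default weight
    return sorted(weights.items(), key=lambda kv: kv[1], reverse=True)
```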
### Designing n8n-style Workflows
1. **Add an HTTP node** that POSTs to the Agent gRPC proxy (or the testing harness described below) with the `user_query`.
2. **Parse the agent response** (JSON) using a Function node. The `sources.tools_used` field indicates which microservices executed.
3. **Branch on `intent_payload`** to trigger follow-up nodes, e.g., send a confirmation email if the payload contains `schedule_event`.
4. **Persist context** by storing `context_used` entries in your knowledge base. Feed them back via the `context` field on subsequent calls for continuity.

Because the agent already enforces tool availability and cooldown windows, external flows do not need to duplicate those concerns; they simply react to the orchestrator's summarized outcome.
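For reference, the branching a Function/Switch node performs can be written out in Python against the sample response shown later in this README. The `intent_payload` structure here is an assumption:

```python
def route_agent_response(response: dict) -> str:
    """Decide the follow-up node, mirroring an n8n Switch node."""
    tools_used = response.get("sources", {}).get("tools_used", [])
    intent_payload = response.get("intent_payload")  # present only for native intents

    if intent_payload and intent_payload.get("type") == "schedule_event":
        return "send_confirmation_email"
    if "web_search" in tools_used:
        return "store_sources_in_kb"
    return "reply_to_user"
```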
### Mock Flow Harness
To experiment without Docker:
```bash
conda run -n llm python -m testing_tool.mock_agent_flow
```

The harness stubs every downstream service and prints a full interaction trace. You can adapt the script to validate additional scenarios, or import `run_mock_flow` inside notebooks/tests for smoke validation:
```python
from testing_tool import mock_agent_flow

summary = mock_agent_flow.run_mock_flow("Plan a retrospective with Alex")
print(summary["final_answer"])
```

A typical n8n flow built around the agent looks like this:

1. **HTTP Trigger** – collects the end-user request (`user_query`).
2. **gRPC / HTTP Request** – call an API gateway that forwards to `AgentService.QueryAgent`.
3. **Switch Node** – branch on `sources.tools_used` (e.g., `schedule_meeting`, `web_search`).
4. **Execute Native Actions** – for `schedule_meeting`, optionally call the C++ service directly via the shared Python client, or simply notify the user using the agent's final answer.
5. **Store Transcript** – append `final_answer`, context, and metrics to Notion/BigQuery for analytics.

This pattern keeps the agent as the single decision point: n8n orchestrates around the agent instead of duplicating decision logic.
```bash
# Query the agent service
curl -X POST http://localhost:50054/agent.v1.AgentService/QueryAgent \
  -H "Content-Type: application/json" \
  -d '{"user_query": "What is the square root of the current temperature in Paris?"}'
```

Sample response:

```json
{
  "final_answer": "The current temperature in Paris is 22°C. The square root is approximately 4.69.",
  "context_used": [
    {"source": "web_search", "content": "Paris weather: 22°C..."},
    {"source": "math_solver", "content": "√22 = 4.690..."}
  ],
  "sources": {
    "tools_used": ["web_search", "math_solver"],
    "errors": []
  }
}
```

### Sprint 2 Highlights (MVP focus)

- ✅ Added `schedule_meeting` tool wired through the C++ gRPC bridge and Swift App Intents package.
- ✅ Refactored protobuf imports to package-scoped modules for reliable packaging/testing.
- ✅ Added mock harness (`testing_tool/mock_agent_flow.py`) to exercise the orchestrator without launching the full stack.
- ✅ Cleaned out unused models/helpers and repaired the streaming LLM client.

With these changes, the agent is the single arbitration point. All flows (CLI, automated jobs, or low-code builders) call into the agent, which then fans out to services according to tool availability, circuit breakers, and intent payloads returned by the native C++ engine.

View real-time logs:

```bash
make logs
```

Check service health:

```bash
make health-check
```

- `pytest testing_tool/tests/test_services_modular.py` – fast modular coverage for clients and service handlers.
- `pytest testing_tool/tests/test_agent_mock_flow.py` – ensures the mock harness exercises the scheduling bridge and context propagation.
- `python -m testing_tool.mock_agent_flow` – interactive demo without infrastructure.
- Sprint 1 (current): Deliver shared Swift `AppIntentsPackage` with `ScheduleMeetingIntent` and unit tests (`swift test`).
- Sprint 2: Extend `cpp_llm.proto` with intent RPCs and wire Objective-C++ handlers to App Intents.
- Sprint 3: Register new agent tools, add OpenTelemetry interceptors, and expose observability dashboards.
- Sprint 4: Ship end-to-end tests (XCTest + Python harness) and prep beta rollout.
```python
# tool_service/tool_service.py
def CallTool(self, request, context):
    if request.tool_name == "new_tool":
        return self._handle_new_tool(request.params)

def _handle_new_tool(self, params):
    # Implement tool logic
    return tool_pb2.ToolResponse(...)
```

```python
# agent_service/agent_service.py
class WorkflowBuilder:
    def build(self):
        # Add custom workflow edges
        self.graph.add_node("custom_step", self._custom_node)
        self.graph.add_edge("agent", "custom_step")
```

```dockerfile
# llm_service/Dockerfile
COPY ./models/new-model.gguf /app/models/
```

**Missing Protobuf Definitions**

```bash
make proto-gen && make build
```

**Tool Service Failures**

- Verify `SERPER_API_KEY` in `.env`
- Check rate limits (50 free requests/day)

**LLM Loading Errors**

- Ensure the model file exists in `llm_service/models/`
- Verify model compatibility with llama.cpp
- Docker and Docker Compose
- Python 3.12+ (for local development)
- `SERPER_API_KEY` for web search (get a free key at serper.dev)
```bash
# Clone repository
git clone https://github.com/yourusername/grpc_llm.git
cd grpc_llm

# Set up environment variables
echo "SERPER_API_KEY=your_key_here" > .env

# Start all services with Docker
make up

# View logs
make logs
```

| Service | Port | Purpose |
|---|---|---|
| UI Service | 5000 | Next.js web interface |
| Agent Service | 50054 | Main orchestration endpoint |
| LLM Service | 50051 | Local language model (Qwen 2.5) |
| Chroma Service | 50052 | Vector database for RAG |
Via Web UI:
```bash
# Open browser
open http://localhost:5000

# Start chatting - context persists across messages
# Tools are automatically triggered for queries like:
# - "What is the weather in Paris?"     → web_search
# - "Calculate 234 * 567"               → math_solver
# - "Tell me about https://example.com" → load_web_page
```

Via gRPC Client:
```python
import grpc
from shared.generated import agent_pb2, agent_pb2_grpc

channel = grpc.insecure_channel('localhost:50054')
stub = agent_pb2_grpc.AgentServiceStub(channel)

# First message (creates new thread)
response = stub.QueryAgent(
    agent_pb2.QueryRequest(message="What is Çayyolu in Ankara?")
)
print(response.message)  # Uses web_search tool
thread_id = response.threadId

# Follow-up message (uses context)
metadata = [('thread-id', thread_id)]
response = stub.QueryAgent(
    agent_pb2.QueryRequest(message="Tell me more about it"),
    metadata=metadata
)
print(response.message)  # Remembers previous context
```

```bash
# Create Python environment
conda create -n llm python=3.12
conda activate llm

# Install dependencies
pip install -r agent_service/requirements.txt
pip install -r llm_service/requirements.txt
pip install -r chroma_service/requirements.txt
pip install -r requirements-test.txt

# Generate protobuf files
python -m grpc_tools.protoc -I ./shared/proto \
  --python_out=./shared/generated \
  --grpc_python_out=./shared/generated \
  shared/proto/*.proto

# Run tests
pytest tests/unit/ -v
pytest tests/integration/ -v
```

```bash
make build     # Build all containers
make up        # Start services in background
make down      # Stop services
make logs      # View logs (all services)
make clean     # Remove containers and volumes
make rebuild   # Clean rebuild (no cache)
```
```
grpc_llm/
├── agent_service/           # Main orchestration service
│   ├── agent_service.py     # gRPC entry point
│   ├── adapter.py           # AgentServiceAdapter (core logic)
│   ├── llm_wrapper.py       # LLM interface adapter
│   └── Dockerfile
├── core/                    # Framework core
│   ├── graph.py             # StateGraph workflow
│   ├── state.py             # Conversation state
│   ├── checkpointing.py     # SQLite persistence
│   └── config.py            # Configuration management
├── tools/                   # Tool system
│   ├── registry.py          # LocalToolRegistry
│   ├── circuit_breaker.py   # Fault tolerance
│   ├── decorators.py        # @tool decorator
│   └── builtin/             # Built-in tools
│       ├── web_search.py
│       ├── math_solver.py
│       └── web_loader.py
├── shared/                  # Shared code
│   ├── clients/             # gRPC clients
│   ├── generated/           # Protobuf generated code
│   └── proto/               # Protobuf definitions
├── llm_service/             # LLM backend
├── chroma_service/          # Vector database
├── ui_service/              # Next.js frontend
└── tests/                   # Test suite
    ├── unit/                # Unit tests
    └── integration/         # E2E tests
```
Tools are registered in `agent_service/adapter.py`. Here's how to add one:
```python
# In adapter.py __init__ method:
def my_custom_tool(param1: str, param2: int) -> dict:
    """
    Tool description for LLM.

    Args:
        param1: Description of first parameter
        param2: Description of second parameter

    Returns:
        dict: {"status": "success", "result": "..."}
    """
    try:
        # Your tool logic here
        result = do_something(param1, param2)
        return {"status": "success", "result": result}
    except Exception as e:
        return {"status": "error", "error": str(e)}

# Register it
self.registry.register(
    name="my_custom_tool",
    description="Clear description for LLM to understand when to use it"
)(my_custom_tool)
```

Edit `core/config.py`:
```python
@dataclass
class ToolConfig:
    circuit_breaker_threshold: int = 3   # Failures before circuit opens
    circuit_breaker_timeout: int = 60    # Seconds before retry
    max_retries: int = 2                 # Per-tool retry limit
```

Edit `llm_service/llm_service.py`:
```python
# In RunInference method:
result = subprocess.run([
    "./llama/llama-cli",
    "-m", "./models/qwen2.5-0.5b-instruct-q5_k_m.gguf",
    "-p", prompt,
    "-n", "512",        # Max tokens
    "-t", "4",          # Threads
    "--temp", "0.7",    # Temperature
    "--top-p", "0.9"    # Nucleus sampling
], ...)
```

Edit `core/graph.py` to customize the StateGraph:
```python
def _should_use_tools(self, state: ConversationState) -> bool:
    """Customize when tools are triggered"""
    query = state.messages[-1].content.lower()

    # Add custom trigger logic
    if "urgent" in query:
        return True

    # Default heuristics
    return self._detect_tool_intent(query)
```

```bash
# All services
make logs

# Specific service
docker compose logs -f agent_service
docker compose logs -f llm_service
```

```bash
# Service status
docker compose ps

# Port availability
lsof -i :50054   # Agent service
lsof -i :50051   # LLM service
lsof -i :5000    # UI service
```

Enable verbose logging in `agent_service/adapter.py`:

```python
logging.basicConfig(
    level=logging.DEBUG,  # Change from INFO
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
```

**"Cannot operate on a closed database"**
- Ensure the SQLite file has write permissions
- Check whether multiple processes are accessing the same DB file
- Solution: restart with `make down && make up`

**"Name resolution failed for target dns:agent_service"**

- Docker network issue
- Solution: `make down && make clean && make up`

**Web search returns no results**

- Missing `SERPER_API_KEY` in `.env`
- Rate limit exceeded (50 free requests/day)
- Check logs: `docker compose logs -f agent_service | grep SERPER`

**LLM service not responding**

- Model file missing or corrupted
- Check: `docker compose exec llm_service ls -lh /app/models/`
- Re-download the model if needed
```bash
# Run all unit tests
pytest tests/unit/ -v

# Test specific modules
pytest tests/unit/test_registry.py -v
pytest tests/unit/test_circuit_breaker.py -v
pytest tests/unit/test_builtin_tools.py -v
```

```bash
# Start services first
make up

# Run E2E tests
pytest tests/integration/ -v

# Specific test
pytest tests/integration/test_agent_service_e2e.py::test_query_with_tools -v
```

```bash
pytest tests/ --cov=agent_service --cov=core --cov=tools --cov-report=html
open htmlcov/index.html
```

The system uses the Adapter Pattern to bridge different architectural layers:
```
# User Query Flow
UI (gRPC-js)
  → AgentService.QueryAgent(request, metadata)
  → AgentServiceAdapter.process_query(message, thread_id)
  → StateGraph.invoke(initial_state, config)
  → LLMClientWrapper.run_inference(prompt, tools)
  → LLMClient.RunInference(grpc_request)
  → llama.cpp (local model)
```

Key Adapters:

- AgentServiceAdapter: Converts gRPC requests → LangGraph workflow
- LLMClientWrapper: Converts LangGraph interface → gRPC LLM calls
- Tool wrappers: Convert Python functions → LangChain tool schema
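A minimal sketch of the wrapper idea, assuming the underlying gRPC client exposes a plain `run_inference(prompt)` call; the method names here are illustrative, not the verified `llm_wrapper.py` code:

```python
class LLMClientWrapper:
    """Adapts the gRPC LLM client to the callable interface the StateGraph expects."""

    def __init__(self, grpc_client):
        self._client = grpc_client  # e.g. the LLM client from shared/clients

    def run_inference(self, prompt: str, tools: list[dict]) -> str:
        # Serialize tool schemas into the prompt so the local model can emit a
        # JSON function_call, then forward the combined prompt over gRPC.
        tool_block = "\n".join(f"- {t['name']}: {t['description']}" for t in tools)
        full_prompt = f"{prompt}\n\nAvailable tools:\n{tool_block}"
        return self._client.run_inference(full_prompt)
```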
- Query Analysis: LLM determines if tools are needed based on query
- Tool Selection: Workflow matches query intent to available tools
- Circuit Breaker Check: Verifies tool is healthy before execution
- Execution: Tool runs with parameters extracted by LLM
- Result Processing: Tool output is formatted and added to context
- Final Response: LLM synthesizes tool results into natural language
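Condensed into code, the six steps look roughly like the loop below. This is pseudostructure under assumed helper names such as `registry.schemas()` and `state.prompt()`, not the actual `core/graph.py` implementation:

```python
def agent_turn(llm, registry, state):
    # 1-2: query analysis and tool selection happen inside the LLM call,
    # which is assumed here to return a parsed dict
    decision = llm.run_inference(state.prompt(), registry.schemas())

    while decision.get("function_call"):
        call = decision["function_call"]
        # 3-4: the registry checks the circuit breaker, then executes the tool
        result = registry.execute(call["name"], **call["arguments"])
        # 5: tool output is appended to the shared conversation context
        state.context.append({"source": call["name"], "content": result})
        decision = llm.run_inference(state.prompt(), registry.schemas())

    # 6: the model's plain-text answer once no further tool calls are requested
    return decision["content"]
```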
```python
# Thread-based conversation tracking
thread_id = "user-123-session-456"

# First message creates checkpoint
state1 = {"messages": [HumanMessage("What is Paris?")]}
checkpointer.put(thread_id, state1)

# Second message loads context
state2 = checkpointer.get(thread_id)
# state2.messages = [HumanMessage("What is Paris?"), AIMessage("Paris is...")]

# Follow-up uses history
state2.messages.append(HumanMessage("Tell me more"))
```

```python
@dataclass
class ConversationState:
    messages: List[BaseMessage]      # Full conversation history
    iterations: int                  # Workflow iteration counter
    context: List[Dict[str, Any]]    # Retrieved documents/tool results
    metadata: Dict[str, Any]         # Thread ID, timestamps, etc.
```

```
      ┌─────────────┐
      │    START    │
      └──────┬──────┘
             │
             ▼
      ┌──────────────┐
      │  agent_node  │◄───────┐
      │ (LLM decides)│        │
      └──────┬───────┘        │
             │                │
      ┌──────┴──────┐         │
      │             │         │
      ▼             ▼         │
┌───────────┐  ┌─────────┐    │
│   tools   │  │   END   │    │
│ (execute) │  └─────────┘    │
└─────┬─────┘                 │
      │                       │
      └───────────────────────┘
        (loop until complete)
```
```python
class LocalToolRegistry:
    def __init__(self):
        self._tools: Dict[str, ToolWrapper] = {}
        self._circuit_breakers: Dict[str, CircuitBreaker] = {}

    def register(self, name: str):
        """Decorator to register tools"""
        def decorator(func):
            self._tools[name] = ToolWrapper(func)
            self._circuit_breakers[name] = CircuitBreaker(threshold=3)
            return func
        return decorator

    def execute(self, name: str, **kwargs):
        """Execute with circuit breaker protection"""
        if self._circuit_breakers[name].is_open():
            raise CircuitOpenError(f"Tool {name} circuit is open")
        try:
            result = self._tools[name].run(**kwargs)
            self._circuit_breakers[name].record_success()
            return result
        except Exception as e:
            self._circuit_breakers[name].record_failure()
            raise
```

- ✅ Adapter-based architecture
- ✅ SQLite conversation persistence
- ✅ 4 built-in tools with circuit breakers
- ✅ Thread-based context management
- ✅ Next.js web UI with real-time updates
- v1.1: Streaming responses for better UX
- v1.2: Multi-modal support (images, audio)
- v1.3: Plugin system for external tools
- v1.4: Observability dashboard (OpenTelemetry)
- v2.0: Multi-tenant support with authentication
We welcome contributions! Here's how to get started:
1. Fork the repository
2. Create a feature branch: `git checkout -b feature/amazing-feature`
3. Make your changes and add tests
4. Run tests: `pytest tests/ -v`
5. Commit: `git commit -m 'Add amazing feature'`
6. Push: `git push origin feature/amazing-feature`
7. Open a Pull Request
- Code Style: Follow PEP 8 for Python code
- Documentation: Update README and docstrings
- Tests: Add tests for new features
- Commits: Use conventional commit messages
- Architecture: Maintain adapter pattern separation
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
- LangGraph for workflow orchestration
- llama.cpp for efficient local LLM inference
- Serper for web search API
- Qwen Team for the open-source model
- Documentation: See `ARCHITECTURE.md` and `UI_SERVICE_SETUP.md`
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Built with ❤️ using LangGraph, gRPC, and modern Python