A hybrid database query orchestration system built with LangGraph, featuring query classification, multi-agent execution, SQL/NoSQL integration, and a Data Engineer agent for unclear or technical queries.
https://www.youtube.com/watch?v=aKE-0c00JQE
LocoForge is an AI-powered database orchestration system that routes and executes queries across SQL and NoSQL databases using graph-based workflows. It uses LangGraph for state management and GPT-4o-mini for query classification and decomposition.
- Intelligent Query Classification: AI-powered domain and intent recognition with complexity assessment
- Multi-Agent Orchestration: Seamless SQL and NoSQL agent coordination
- Hybrid Query Processing: Complex queries spanning multiple database types
- Graph-Based Workflow: Stateful execution with conditional routing
- Result Aggregation: Intelligent combination of multi-source results
- Context Management: Persistent conversation history and state tracking
- LangGraph Studio Integration: Real-time workflow visualization and debugging
- Data Engineer Agent: Professional handling of unclear, technical, and non-domain queries
- Web Interface: RESTful API with health checks and database statistics
- Production Ready: Docker support and Render deployment configuration
- Enhanced Error Handling: Graceful degradation and comprehensive error recovery
The system implements a sophisticated state machine using LangGraph with the following nodes:
- classify_query - AI-powered query domain, intent, and complexity classification
- decompose_query - Complex query decomposition into sub-queries
- route_to_agents - Intelligent routing decision making
- sql_agent - SQL query execution (Employee Management)
- nosql_agent - NoSQL query execution (Supplies Database)
- data_engineer - Professional handling of unclear/technical queries
- aggregate_results - Multi-source result combination
- update_context - Conversation state management
- format_response - Final response formatting

from typing import Any, Dict, List, Optional, TypedDict

from langchain_core.messages import BaseMessage

# QueryDomain, QueryIntent, and QueryComplexity are enums defined elsewhere in the project.
class OrchestratorState(TypedDict):
    messages: List[BaseMessage]                      # Conversation history
    current_query: str                               # Current user query
    query_domain: QueryDomain                        # Classified domain (EMPLOYEE/SUPPLIES/HYBRID/UNCLEAR/TECHNICAL)
    query_intent: QueryIntent                        # Query intent (SELECT/ANALYZE/COMPARE/AGGREGATE/CLARIFY/EXPLAIN)
    query_complexity: QueryComplexity                # NEW: Complexity assessment (SIMPLE/MEDIUM/COMPLEX)
    sub_queries: Dict[str, str]                      # Decomposed sub-queries
    sql_results: Optional[Dict[str, Any]]            # SQL agent results
    nosql_results: Optional[Dict[str, Any]]          # NoSQL agent results
    combined_results: Optional[Dict[str, Any]]       # Aggregated results
    context_history: List[Dict[str, Any]]            # Execution context
    execution_path: List[str]                        # Workflow execution trace
    error_message: Optional[str]                     # Error handling
    clarification_suggestions: Optional[List[str]]   # NEW: Query refinement suggestions
    data_engineer_response: Optional[str]            # NEW: Data engineer agent responses
The system implements sophisticated routing decisions with new data engineer support:
def route_decision(state: OrchestratorState) -> str:
    """Intelligent routing based on query domain and complexity"""
    domain = state["query_domain"]
    if domain == QueryDomain.EMPLOYEE:
        return "sql_only"
    elif domain == QueryDomain.SUPPLIES:
        return "nosql_only"
    elif domain == QueryDomain.HYBRID:
        return "both_agents"
    elif domain in [QueryDomain.UNCLEAR, QueryDomain.TECHNICAL]:
        return "data_engineer"  # NEW: Route unclear queries to Data Engineer
    else:
        return "error_handling"
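To make the routing labels concrete, the sketch below maps each label to the node sequence it might trigger. The node names come from the list earlier in this README, but the ordering and the `AGENT_NODES` mapping are assumptions for illustration, not the project's actual graph edges; `decompose_query` and the `error_handling` branch are left out for brevity.

```python
from typing import Dict, List

# Assumption: which agent nodes run for each routing label. The labels come
# from route_decision; the mapping itself is illustrative.
AGENT_NODES: Dict[str, List[str]] = {
    "sql_only": ["sql_agent"],
    "nosql_only": ["nosql_agent"],
    "both_agents": ["sql_agent", "nosql_agent"],
    "data_engineer": ["data_engineer"],
}


def planned_path(route: str) -> List[str]:
    """Return the assumed node sequence for a routing label."""
    if route not in AGENT_NODES:
        raise ValueError(f"unknown route: {route}")
    path = ["classify_query", "route_to_agents", *AGENT_NODES[route]]
    if route == "both_agents":
        # Only multi-source runs need their results combined.
        path.append("aggregate_results")
    path += ["update_context", "format_response"]
    return path
```

A trace like this is roughly what one would expect to find in the `execution_path` field of the state after a run.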
The new Data Engineer Agent provides professional handling for:
- Ambiguous Queries: "Show me everything", "What's the data?"
- Non-Domain Queries: "What's the weather like?", "Tell me a joke"
- Technical Queries: "SELECT * FROM employees", "Show schema"
- Overly Complex Queries: Performance-impacting queries
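As a rough illustration of the categories above, the following rule-based sketch flags raw SQL and very short, open-ended queries and returns clarification prompts. It is not the project's implementation, which presumably relies on the LLM; the function names, the keyword list, and the four-word threshold are all hypothetical.

```python
import re
from typing import List

# Hypothetical pattern: a query that starts with a SQL keyword is treated as raw SQL.
_SQL_START = re.compile(
    r"^\s*(select|insert|update|delete|create|drop|alter)\b", re.IGNORECASE
)


def looks_like_raw_sql(query: str) -> bool:
    """Return True when the query begins with a common SQL keyword."""
    return bool(_SQL_START.match(query))


def suggest_clarifications(query: str) -> List[str]:
    """Return follow-up prompts for raw SQL or very short queries."""
    suggestions: List[str] = []
    if looks_like_raw_sql(query):
        suggestions.append(
            "Describe the result you want in plain language instead of raw SQL."
        )
    if len(query.split()) <= 4:  # assumed threshold for "too vague"
        suggestions.append(
            "Name the data you are interested in, for example employees or sales."
        )
    return suggestions
```

A query such as "How many employees are in Engineering?" yields no suggestions under these rules, while "Show me everything" yields one.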
class DataEngineerAgent:
    def analyze_query(self, query: str) -> Dict[str, Any]:
        """Analyze query and provide professional guidance"""

    def provide_clarification_suggestions(self, query: str, analysis: Dict[str, Any]) -> List[str]:
        """Generate specific clarification suggestions"""

    def handle_technical_query(self, query: str) -> Dict[str, Any]:
        """Handle SQL/NoSQL syntax and schema questions"""

    def handle_non_domain_query(self, query: str) -> Dict[str, Any]:
        """Handle queries outside system domain"""
def classify_intent(self, query: str) -> Tuple[QueryDomain, QueryIntent, QueryComplexity]:
    """Use GPT-4o-mini to classify query domain, intent, and complexity"""
    system_prompt = """
    You are an expert query classifier for a hybrid database system with:
    1. SQL Database: Employee management (employees, departments, projects, attendance)
    2. NoSQL Database: Sample Supplies (sales, customers, items, stores)
    Classify the query into:
    - DOMAIN: employee, supplies, hybrid, unclear, technical
    - INTENT: select, analyze, compare, aggregate, clarify, explain
    - COMPLEXITY: simple, medium, complex
    """
    # LLM-based classification logic
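The README does not show how the model's reply is turned into enum values. One possible approach, sketched below, assumes the model is asked to answer with a JSON object containing `domain`, `intent`, and `complexity` keys; that reply format, the enum string values, and the `parse_classification` helper are assumptions. Anything that cannot be parsed falls back to the unclear/clarify path.

```python
import json
from enum import Enum
from typing import Tuple


class QueryDomain(Enum):
    EMPLOYEE = "employee"
    SUPPLIES = "supplies"
    HYBRID = "hybrid"
    UNCLEAR = "unclear"
    TECHNICAL = "technical"


class QueryIntent(Enum):
    SELECT = "select"
    ANALYZE = "analyze"
    COMPARE = "compare"
    AGGREGATE = "aggregate"
    CLARIFY = "clarify"
    EXPLAIN = "explain"


class QueryComplexity(Enum):
    SIMPLE = "simple"
    MEDIUM = "medium"
    COMPLEX = "complex"


FALLBACK = (QueryDomain.UNCLEAR, QueryIntent.CLARIFY, QueryComplexity.SIMPLE)


def parse_classification(raw: str) -> Tuple[QueryDomain, QueryIntent, QueryComplexity]:
    """Parse an assumed JSON reply; return FALLBACK on any malformed input."""
    try:
        data = json.loads(raw)
        return (
            QueryDomain(str(data["domain"]).lower()),
            QueryIntent(str(data["intent"]).lower()),
            QueryComplexity(str(data["complexity"]).lower()),
        )
    except (ValueError, KeyError, TypeError):
        # Covers invalid JSON, missing keys, non-object replies, and unknown labels.
        return FALLBACK
```

Falling back to `UNCLEAR` means a malformed reply is routed to the Data Engineer agent rather than raising inside the workflow.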
The system now includes a complete web interface with the following endpoints:
# Health check
GET /health

# Database statistics
GET /api/database/stats

# Natural language query processing
POST /api/query
{
  "query": "Show me employee salaries and supplies data"
}

# Direct SQL query execution
POST /api/database/query
{
  "sql": "SELECT COUNT(*) FROM employees WHERE department = 'Engineering'"
}
# Health check
curl https://your-app-name.onrender.com/health
# Natural language query
curl -X POST https://your-app-name.onrender.com/api/query \
  -H "Content-Type: application/json" \
  -d '{"query": "How many employees are in Engineering?"}'
# Database statistics
curl https://your-app-name.onrender.com/api/database/stats
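The same natural language request can be sent from Python using only the standard library. This is a minimal sketch: the endpoint path and request body follow the examples above, while the helper names and the assumption that the server replies with a JSON object are illustrative.

```python
import json
import urllib.request


def build_query_request(base_url: str, query: str) -> urllib.request.Request:
    """Build a POST request for the /api/query endpoint."""
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(base_url: str, query: str, timeout: float = 30.0) -> dict:
    """Send the query and decode the reply (assumed to be a JSON object)."""
    request = build_query_request(base_url, query)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For a local run, `ask("http://localhost:5000", "How many employees are in Engineering?")` would target the Flask server described in the setup steps.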
- Python 3.8+
- MongoDB (for NoSQL operations)
- SQLite/PostgreSQL (for SQL operations)
- OpenAI API Key
# Clone the repository
git clone https://github.com/yourusername/LocoForge.git
cd LocoForge
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Set up environment variables
cp env_template.txt .env
# Edit .env with your OpenAI API key and database configurations
# .env file
OPENAI_API_KEY=your_openai_api_key_here
MONGO_DB=mongodb://localhost:27017/
SQL_DB=sqlite:///employee_management.db
ENVIRONMENT=development
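A small loader along the following lines could read these variables at startup. It is a sketch rather than the project's code: the `Settings` class and `load_settings` function are hypothetical, the defaults simply mirror the sample values above, and it assumes the `.env` file has already been loaded into the process environment by whatever mechanism the application uses.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    mongo_db: str
    sql_db: str
    environment: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read configuration from a mapping, defaulting to os.environ."""
    env = os.environ if env is None else env
    api_key = env.get("OPENAI_API_KEY", "")
    if not api_key:
        # Fail early: the classifier cannot run without an API key.
        raise RuntimeError("OPENAI_API_KEY is not set")
    return Settings(
        openai_api_key=api_key,
        mongo_db=env.get("MONGO_DB", "mongodb://localhost:27017/"),
        sql_db=env.get("SQL_DB", "sqlite:///employee_management.db"),
        environment=env.get("ENVIRONMENT", "development"),
    )
```

Passing a plain dictionary instead of `os.environ` keeps the loader easy to exercise in tests.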
# Start the Flask application
python app.py
# Access the API at http://localhost:5000
from langchain_core.messages import HumanMessage

from my_agent.agent import graph
from my_agent.utils.state import OrchestratorState

# Initialize the workflow
workflow = graph

# Create a query
state = OrchestratorState(
    messages=[HumanMessage(content="Show me employee salaries and supplies data")],
    current_query="Show me employee salaries and supplies data",
)

# Execute the workflow
result = workflow.invoke(state)
print(result["combined_results"])
-- Employees table
CREATE TABLE employees (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    department TEXT,
    salary REAL,
    hire_date DATE,
    manager_id INTEGER
);

-- Departments table
CREATE TABLE departments (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    budget REAL
);

-- Projects table
CREATE TABLE projects (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    department_id INTEGER,
    start_date DATE,
    end_date DATE
);
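To try the employees table without a database file, the sketch below creates it in an in-memory SQLite database and runs the kind of aggregate the SQL agent would need for "What's the average salary by department?". The sample rows and the helper name are made up for illustration.

```python
import sqlite3
from typing import List, Tuple

EMPLOYEES_DDL = """
CREATE TABLE employees (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    department TEXT,
    salary REAL,
    hire_date DATE,
    manager_id INTEGER
);
"""


def average_salary_by_department(conn: sqlite3.Connection) -> List[Tuple[str, float]]:
    """Return (department, average salary) pairs sorted by department."""
    cursor = conn.execute(
        "SELECT department, AVG(salary) FROM employees "
        "GROUP BY department ORDER BY department"
    )
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(EMPLOYEES_DDL)
# Illustrative rows only; they are not part of the project's data.
conn.executemany(
    "INSERT INTO employees (name, department, salary) VALUES (?, ?, ?)",
    [
        ("Ada", "Engineering", 120000.0),
        ("Grace", "Engineering", 100000.0),
        ("Linus", "Sales", 80000.0),
    ],
)
rows = average_salary_by_department(conn)
```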
// Sales collection
{
  "_id": ObjectId,
  "couponUsed": true,
  "customer": {
    "age": 30,
    "email": "customer@example.com",
    "gender": "F",
    "satisfaction": 5
  },
  "items": [
    {
      "name": "Office Supplies",
      "price": 25.99,
      "quantity": 2,
      "tags": ["office", "stationery"]
    }
  ],
  "purchaseMethod": "Online",
  "saleDate": ISODate("2023-01-15"),
  "storeLocation": "Denver"
}

// Comments collection
{
  "_id": ObjectId,
  "movie_id": ObjectId,
  "name": "User Name",
  "email": "user@example.com",
  "text": "Great movie!",
  "date": ISODate("2024-01-15")
}
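Given sales documents shaped like the one above, a request such as "List sales by store location" reduces to summing `price * quantity` per `storeLocation`. The sketch below does this in plain Python over already-fetched documents; the function name is hypothetical, and in practice the NoSQL agent may push this work into a MongoDB aggregation instead.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable


def sales_by_store(sales: Iterable[Dict[str, Any]]) -> Dict[str, float]:
    """Sum item revenue (price * quantity) per store location."""
    totals: Dict[str, float] = defaultdict(float)
    for sale in sales:
        amount = sum(
            item["price"] * item["quantity"] for item in sale.get("items", [])
        )
        totals[sale["storeLocation"]] += amount
    # Round for display; the stored prices are floats, not decimals.
    return {store: round(total, 2) for store, total in totals.items()}
```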
# Test the orchestrator workflow
python test_orchestrator.py
# Test individual agents
python test_sql_agent.py
python test_nosql_agent.py
# Test cross-database queries
python test_cross_database_queries.py
# Test LangGraph Studio integration
python test_langgraph_studio.py
# Test edge cases and error handling
python test_edge_cases.py
# Test deployment configuration
python test_deployment.py
# Employee queries (SQL)
"Show me all employees in the Engineering department"
"What's the average salary by department?"
"Find employees hired in the last 6 months"
# Supplies queries (NoSQL)
"Show me all sales with high customer satisfaction"
"What are the top-selling items this year?"
"List sales by store location"
"Show total sales by customer age group"
# Hybrid queries (combining both databases)
"Show which employees bought office supplies"
"Compare employee salaries with sales data"
# Unclear queries (Data Engineer)
"Show me everything"
"What's the weather like?"
"SELECT * FROM employees"
The system includes complete Render deployment configuration:
# Deploy to Render using render.yaml
# The system automatically configures:
# - Flask web application
# - Gunicorn production server
# - Environment variable management
# - Health checks and monitoring
# Build and run with Docker
docker build -t locoforge .
docker run -p 5000:5000 locoforge
OPENAI_API_KEY=your_openai_api_key
GOOGLE_API_KEY=your_google_api_key
DATABASE_URL=sqlite:///employee_management.db
ENVIRONMENT=production
The system includes full LangGraph Studio support for real-time workflow visualization:
# Start LangGraph Studio
langgraph up
# Access the interface at http://localhost:8123
The system includes comprehensive error handling:
- Agent Initialization Failures: Graceful degradation when agents are unavailable
- Query Execution Errors: Detailed error reporting and recovery
- Network Connectivity: Retry mechanisms for database connections
- State Recovery: Persistent state management across sessions
- Data Engineer Fallback: Professional responses for all query types
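The retry behavior for database connections could look something like the sketch below, which retries a callable with exponential backoff. The helper name, the default attempt count and delay, and the choice to retry only on `ConnectionError` are assumptions; the project's actual retry policy is not shown in this README.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying on ConnectionError with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except ConnectionError:
            if attempt == attempts:
                raise  # Out of retries: surface the last error to the caller.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

The `sleep` parameter is injectable so that tests can record the delays instead of actually waiting.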
Extend the system with custom agents:
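from typing import Any, Dict

from langchain_openai import ChatOpenAI


class CustomAgent:
    def __init__(self):
        self.model = ChatOpenAI(model="gpt-4o-mini")

    def execute_query(self, query: str) -> Dict[str, Any]:
        # Custom query execution logic goes here
        raise NotImplementedError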
- Lazy Loading: Agents initialized only when needed
- Connection Pooling: Efficient database connection management
- Caching: Query result caching for repeated requests
- Async Processing: Non-blocking query execution where possible
- State Management: Efficient state updates and context tracking
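Lazy loading, the first item in the list above, can be sketched as a registry that builds each agent on first use and reuses it afterwards. The `AgentRegistry` class and its factory-based interface are hypothetical and only illustrate the idea.

```python
from typing import Any, Callable, Dict


class AgentRegistry:
    """Create agents on first access and reuse them afterwards."""

    def __init__(self, factories: Dict[str, Callable[[], Any]]):
        self._factories = dict(factories)
        self._agents: Dict[str, Any] = {}

    def get(self, name: str) -> Any:
        if name not in self._agents:
            # The factory runs only once, the first time the agent is requested.
            self._agents[name] = self._factories[name]()
        return self._agents[name]

    def is_loaded(self, name: str) -> bool:
        return name in self._agents
```

With this shape, a query that is routed only to the SQL agent never pays the cost of opening a MongoDB connection.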
import logging

# Comprehensive logging throughout the workflow
logger = logging.getLogger(__name__)
logger.info("Initializing orchestrator...")
logger.info("SQL agent initialized successfully")
logger.warning("NoSQL agent not available")
logger.info("Data Engineer Agent ready for unclear queries")
- DEPLOYMENT_GUIDE.md - Complete deployment instructions
- EDGE_CASE_HANDLING_GUIDE.md - Data Engineer Agent details
- SQL_AGENT_README.md - SQL agent documentation
- NOSQL_AGENT_README.md - NoSQL agent documentation
- MONGOENGINE_MIGRATION_README.md - Database migration guide
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Our team: Yash Varshney, Dhruv Jain, Aditya Mandal
- The LangGraph team