Skip to content

yswa-var/LocoForge

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

44 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

πŸš€ LocoForge: Advanced AI-Powered Database Orchestration System

A sophisticated hybrid database query orchestration system built with LangGraph, featuring intelligent query classification, multi-agent execution, seamless SQL/NoSQL integration, and professional data engineering support.

Python LangGraph OpenAI Architecture Deployment Web Interface

🎯 Project Overview

https://www.youtube.com/watch?v=aKE-0c00JQE Screenshot 2025-06-27 at 1 30 14 PM

LocoForge is a cutting-edge AI-powered database orchestration system that intelligently routes and executes queries across multiple database types (SQL and NoSQL) using advanced graph-based workflows. The system leverages LangGraph for state management and GPT-4o-mini for intelligent query classification and decomposition.

🌟 Key Features

  • πŸ€– Intelligent Query Classification: AI-powered domain and intent recognition with complexity assessment
  • πŸ”„ Multi-Agent Orchestration: Seamless SQL and NoSQL agent coordination
  • πŸ“Š Hybrid Query Processing: Complex queries spanning multiple database types
  • 🎯 Graph-Based Workflow: Stateful execution with conditional routing
  • πŸ“ˆ Result Aggregation: Intelligent combination of multi-source results
  • πŸ”„ Context Management: Persistent conversation history and state tracking
  • πŸ”§ LangGraph Studio Integration: Real-time workflow visualization and debugging
  • πŸ‘¨β€πŸ’Ό Data Engineer Agent: Professional handling of unclear, technical, and non-domain queries
  • 🌐 Web Interface: RESTful API with health checks and database statistics
  • πŸš€ Production Ready: Docker support and Render deployment configuration
  • πŸ›‘οΈ Enhanced Error Handling: Graceful degradation and comprehensive error recovery

πŸ—οΈ Architecture

Core Components

Editor _ Mermaid Chart-2025-06-27-083351

Enhanced Workflow Graph

The system implements a sophisticated state machine using LangGraph with the following nodes:

  1. classify_query - AI-powered query domain, intent, and complexity classification
  2. decompose_query - Complex query decomposition into sub-queries
  3. route_to_agents - Intelligent routing decision making
  4. sql_agent - SQL query execution (Employee Management)
  5. nosql_agent - NoSQL query execution (Supplies Database)
  6. data_engineer - Professional handling of unclear/technical queries
  7. aggregate_results - Multi-source result combination
  8. update_context - Conversation state management
  9. format_response - Final response formatting
Screenshot 2025-06-29 at 9 21 49 PM

πŸ› οΈ Technical Implementation

Enhanced State Management

class OrchestratorState(TypedDict):
    messages: List[BaseMessage]           # Conversation history
    current_query: str                    # Current user query
    query_domain: QueryDomain            # Classified domain (EMPLOYEE/SUPPLIES/HYBRID/UNCLEAR/TECHNICAL)
    query_intent: QueryIntent            # Query intent (SELECT/ANALYZE/COMPARE/AGGREGATE/CLARIFY/EXPLAIN)
    query_complexity: QueryComplexity    # NEW: Complexity assessment (SIMPLE/MEDIUM/COMPLEX)
    sub_queries: Dict[str, str]          # Decomposed sub-queries
    sql_results: Optional[Dict[str, Any]] # SQL agent results
    nosql_results: Optional[Dict[str, Any]] # NoSQL agent results
    combined_results: Optional[Dict[str, Any]] # Aggregated results
    context_history: List[Dict[str, Any]] # Execution context
    execution_path: List[str]            # Workflow execution trace
    error_message: Optional[str]         # Error handling
    clarification_suggestions: Optional[List[str]] # NEW: Query refinement suggestions
    data_engineer_response: Optional[str] # NEW: Data engineer agent responses

Enhanced Conditional Routing Logic

The system implements sophisticated routing decisions with new data engineer support:

def route_decision(state: OrchestratorState) -> str:
    """Intelligent routing based on query domain and complexity"""
    domain = state["query_domain"]

    if domain == QueryDomain.EMPLOYEE:
        return "sql_only"
    elif domain == QueryDomain.SUPPLIES:
        return "nosql_only"
    elif domain == QueryDomain.HYBRID:
        return "both_agents"
    elif domain in [QueryDomain.UNCLEAR, QueryDomain.TECHNICAL]:
        return "data_engineer"  # NEW: Route unclear queries to Data Engineer
    else:
        return "error_handling"

Data Engineer Agent

The new Data Engineer Agent provides professional handling for:

  • Ambiguous Queries: "Show me everything", "What's the data?"
  • Non-Domain Queries: "What's the weather like?", "Tell me a joke"
  • Technical Queries: "SELECT * FROM employees", "Show schema"
  • Overly Complex Queries: Performance-impacting queries
class DataEngineerAgent:
    def analyze_query(self, query: str) -> Dict[str, Any]:
        """Analyze query and provide professional guidance"""

    def provide_clarification_suggestions(self, query: str, analysis: Dict[str, Any]) -> List[str]:
        """Generate specific clarification suggestions"""

    def handle_technical_query(self, query: str) -> Dict[str, Any]:
        """Handle SQL/NoSQL syntax and schema questions"""

    def handle_non_domain_query(self, query: str) -> Dict[str, Any]:
        """Handle queries outside system domain"""

AI-Powered Query Classification

def classify_intent(self, query: str) -> Tuple[QueryDomain, QueryIntent, QueryComplexity]:
    """Use GPT-4o-mini to classify query domain, intent, and complexity"""
    system_prompt = """
    You are an expert query classifier for a hybrid database system with:
    1. SQL Database: Employee management (employees, departments, projects, attendance)
    2. NoSQL Database: Sample Supplies (sales, customers, items, stores)

    Classify the query into:
    - DOMAIN: employee, supplies, hybrid, unclear, technical
    - INTENT: select, analyze, compare, aggregate, clarify, explain
    - COMPLEXITY: simple, medium, complex
    """
    # LLM-based classification logic

🌐 Web Interface

RESTful API Endpoints

The system now includes a complete web interface with the following endpoints:

# Health check
GET /health

# Database statistics
GET /api/database/stats

# Natural language query processing
POST /api/query
{
    "query": "Show me employee salaries and supplies data"
}

# Direct SQL query execution
POST /api/database/query
{
    "sql": "SELECT COUNT(*) FROM employees WHERE department = 'Engineering'"
}

Example API Usage

# Health check
curl https://your-app-name.onrender.com/health

# Natural language query
curl -X POST https://your-app-name.onrender.com/api/query \
  -H "Content-Type: application/json" \
  -d '{"query": "How many employees are in Engineering?"}'

# Database statistics
curl https://your-app-name.onrender.com/api/database/stats

πŸš€ Getting Started

Prerequisites

  • Python 3.8+
  • MongoDB (for NoSQL operations)
  • SQLite/PostgreSQL (for SQL operations)
  • OpenAI API Key

Installation

# Clone the repository
git clone https://github.com/yourusername/LocoForge.git
cd LocoForge

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Set up environment variables
cp env_template.txt .env
# Edit .env with your OpenAI API key and database configurations

Environment Configuration

# .env file
OPENAI_API_KEY=your_openai_api_key_here
MONGO_DB=mongodb://localhost:27017/
SQL_DB=sqlite:///employee_management.db
ENVIRONMENT=development

Quick Start

Using the Web Interface

# Start the Flask application
python app.py

# Access the API at http://localhost:5000

Using the LangGraph Workflow

from my_agent.agent import graph
from my_agent.utils.state import OrchestratorState

# Initialize the workflow
workflow = graph

# Create a query
state = OrchestratorState(
    messages=[HumanMessage(content="Show me employee salaries and supplies data")],
    current_query="Show me employee salaries and supplies data"
)

# Execute the workflow
result = workflow.invoke(state)
print(result["combined_results"])

οΏ½οΏ½ Database Schemas

SQL Database (Employee Management)

-- Employees table
CREATE TABLE employees (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    department TEXT,
    salary REAL,
    hire_date DATE,
    manager_id INTEGER
);

-- Departments table
CREATE TABLE departments (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    budget REAL
);

-- Projects table
CREATE TABLE projects (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    department_id INTEGER,
    start_date DATE,
    end_date DATE
);

NoSQL Database (Supplies Database)

// Sales collection
{
  "_id": ObjectId,
  "couponUsed": true,
  "customer": {
    "age": 30,
    "email": "customer@example.com",
    "gender": "F",
    "satisfaction": 5
  },
  "items": [
    {
      "name": "Office Supplies",
      "price": 25.99,
      "quantity": 2,
      "tags": ["office", "stationery"]
    }
  ],
  "purchaseMethod": "Online",
  "saleDate": ISODate("2023-01-15"),
  "storeLocation": "Denver"
}

// Comments collection
{
  "_id": ObjectId,
  "movie_id": ObjectId,
  "name": "User Name",
  "email": "user@example.com",
  "text": "Great movie!",
  "date": ISODate("2024-01-15")
}

πŸ§ͺ Testing

Run Test Suite

# Test the orchestrator workflow
python test_orchestrator.py

# Test individual agents
python test_sql_agent.py
python test_nosql_agent.py

# Test cross-database queries
python test_cross_database_queries.py

# Test LangGraph Studio integration
python test_langgraph_studio.py

# Test edge cases and error handling
python test_edge_cases.py

# Test deployment configuration
python test_deployment.py

Example Queries

# Employee queries (SQL)
"Show me all employees in the Engineering department"
"What's the average salary by department?"
"Find employees hired in the last 6 months"

# Supplies queries (NoSQL)
"Show me all sales with high customer satisfaction"
"What are the top-selling items this year?"
"List sales by store location"
"Show total sales by customer age group"

# Hybrid queries (combining both databases)
"Show which employees bought office supplies"
"Compare employee salaries with sales data"

# Unclear queries (Data Engineer)
"Show me everything"
"What's the weather like?"
"SELECT * FROM employees"

πŸš€ Deployment

Render Deployment

The system includes complete Render deployment configuration:

# Deploy to Render using render.yaml
# The system automatically configures:
# - Flask web application
# - Gunicorn production server
# - Environment variable management
# - Health checks and monitoring

Docker Deployment

# Build and run with Docker
docker build -t locoforge .
docker run -p 5000:5000 locoforge

Environment Variables for Production

OPENAI_API_KEY=your_openai_api_key
GOOGLE_API_KEY=your_google_api_key
DATABASE_URL=sqlite:///employee_management.db
ENVIRONMENT=production

πŸ”§ Advanced Features

LangGraph Studio Integration

The system includes full LangGraph Studio support for real-time workflow visualization:

# Start LangGraph Studio
langgraph studio

# Access the interface at http://localhost:8123

Enhanced Error Handling

The system includes comprehensive error handling:

  • Agent Initialization Failures: Graceful degradation when agents are unavailable
  • Query Execution Errors: Detailed error reporting and recovery
  • Network Connectivity: Retry mechanisms for database connections
  • State Recovery: Persistent state management across sessions
  • Data Engineer Fallback: Professional responses for all query types

Custom Agent Development

Extend the system with custom agents:

class CustomAgent:
    def __init__(self):
        self.model = ChatOpenAI(model="gpt-4o-mini")

    def execute_query(self, query: str) -> Dict[str, Any]:
        # Custom query execution logic
        pass

πŸ“ˆ Performance & Scalability

Optimization Strategies

  • Lazy Loading: Agents initialized only when needed
  • Connection Pooling: Efficient database connection management
  • Caching: Query result caching for repeated requests
  • Async Processing: Non-blocking query execution where possible
  • State Management: Efficient state updates and context tracking

Monitoring & Logging

import logging

# Comprehensive logging throughout the workflow
logger = logging.getLogger(__name__)
logger.info("πŸ”„ Initializing orchestrator...")
logger.info("βœ… SQL agent initialized successfully")
logger.warning("⚠️ NoSQL agent not available")
logger.info("πŸ‘¨β€πŸ’Ό Data Engineer Agent ready for unclear queries")

πŸ“š Documentation

Additional Guides

🀝 Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

πŸ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

πŸ™ Acknowledgments

About

prompt to SQL, NoSQL and GoogleDrive task executing agent

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages