diff --git a/autogen/agents/experimental/agents_fastapi_websocket/README.md b/autogen/agents/experimental/agents_fastapi_websocket/README.md new file mode 100644 index 00000000000..419da0241e9 --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/README.md @@ -0,0 +1,202 @@ +# 🔌 AG2AI WebSocket Agentic Workflow UI & Backend + +An interactive WebSocket-based UI and FastAPI backend for orchestrating agentic workflows using [AG2AI Autogen](https://github.com/ag2ai/ag2). This project demonstrates real-time, multi-agent collaboration for solving problems through a WebSocket-powered interface — ideal for tasks like data analysis, EDA, and more. + +![App UI Screenshot](./docs/screenshot.png) + +--- + +## 📘 How to Use the WebSocket UI + +To learn how to interact with the UI step-by-step, check out the full guide: + +➡️ [Usage Guide (UI Walkthrough)](./docs/USAGE_GUIDE.md) + + +--- + +## 📦 Overview + +This repository includes: + +- A clean **frontend UI** to interact with WebSockets visually — more intuitive than Postman or raw clients. +- A **FastAPI backend** that manages real-time WebSocket communication and coordinates multiple agents via AG2AI Autogen. +- A custom **orchestrator agent** (`agent_aligner`) that manages execution flow, ensuring orderly agent coordination. + +--- + +## ✨ Features + +### ✅ Frontend + +- Interactive WebSocket client with formatted JSON display +- Message blocks styled for clarity and separation +- UUID-based client tracking +- Send/receive messages with live updates +- Manual message construction and quick templates + +### ✅ Backend + +- Built with **FastAPI** and **async WebSocket** handling +- Modular architecture using manager classes +- Custom `AgentChat` class for group chat orchestration +- Real-time message streaming to frontend +- Manual user input integration during live chat +- Environment-based configuration via `.env` + +--- + +## 🧩 Problem & Solution + +### ❌ The Problem We Faced + +While working with WebSocket-based agent systems using AG2AI/Autogen, we encountered several major bottlenecks that affected productivity and developer experience: + +- **Postman and raw WebSocket clients are not interactive** + These tools make it hard to follow multi-agent conversations. They lack formatting, which slows down debugging and understanding the data flow. + +- **Reading agent messages is time-consuming** + When working with multiple agents, reviewing each step (especially during prompt tuning or alignment) becomes tedious and error-prone. + +- **Lack of message formatting** + JSON responses from agents are dumped as raw strings, making them hard to read and troubleshoot — especially when nested or streamed. + +- **Frontend development was not feasible** + Building a fully custom UI in frameworks like React or Vue would add significant overhead and distract from core system development. + +- **No streamlined session management** + Keeping track of WebSocket sessions and switching between different chats was a manual and error-prone task. + +--- + +### ✅ The Solution We Implemented + +To overcome these challenges, we built a minimal yet powerful **interactive WebSocket UI**, paired with a **FastAPI backend**, enabling seamless development and debugging of agent workflows. + +Key benefits: + +- **Clean, interactive WebSocket communication** + Live messages stream directly to the browser with proper formatting and role-based separation. + +- **Well-structured message display** + All messages are styled in blocks and automatically formatted as JSON, making it easy to inspect agent responses. + +- **Faster prompt tuning & agent alignment** + Developers can instantly see how agents respond, helping fine-tune prompts with clarity and speed. + +- **Quick session switching** + Chat sessions can be created and reused easily, improving workflow efficiency during development and testing. + +- **Minimal development effort** + A lightweight HTML/JS UI replaces the need for building a full-fledged frontend framework — saving time while still improving UX significantly. + +--- + +This setup dramatically reduced the friction in debugging, testing, and managing agentic workflows — allowing us to focus on what matters: building smart and responsive agents. + +--- + +## 🤖 Agents Overview + +This system supports the following agents for structured task completion: + +- **`planner_agent`**: Produces a step-by-step execution plan (no code). +- **`code_writer`**: Converts the plan into working `python` code. +- **`code_executor`**: Executes code in the local runtime environment. +- **`debugger`**: Detects and resolves runtime errors; retries code. +- **`process_completion`**: Summarizes results and guides the next steps. +- `agent_aligner`: Coordinates the overall workflow, ensuring agents operate in the correct sequence. Enforces the execution flow (Plan → Confirm → Write → Confirm → Execute) to maintain structure, avoid loops, and ensure safe progression. + + +--- + +## 📁 Project Structure + +``` + +. +├── managers/ +│ ├── cancellation_token.py # Manages cancellation signals +│ ├── connection.py # Handles socket connections +│ ├── groupchat.py # AgentChat orchestration logic +│ ├── prompts.py # Prompt templates and roles +│ +├── templates/ +│ └── index.html # Frontend UI (WebSocket client) +│ +├── .env # API keys and environment variables +├── dependencies.py # AG2AI agent setup and configuration +├── helpers.py # Utility functions +├── main.py # FastAPI server entry point +├── ws.py # WebSocket route handler +├── requirements.txt # Python dependencies +└── Readme.md # You're reading it! + +```` + +--- + +## 🚀 Getting Started + +### 1. Clone the Repository + +```bash +git clone https://github.com/Suryaaa-Rathore/websocket-ag2ai.git +cd websocket-ag2ai +```` + +### 2. Create and Activate Virtual Environment + +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +``` + +### 3. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 4. Set Your OpenAI API Key + +Create a `.env` file in the root directory: + +``` +OPENAI_API_KEY=your-openai-api-key +``` + +Or export the variable directly: + +```bash +export OPENAI_API_KEY=your-openai-api-key +``` + +### 5. Run the Server + +```bash +python main.py +``` + +### 6. Access the Frontend + +Open your browser and go to: + +``` +http://localhost:8000 +``` + +--- + +## 🔍 Discoverability Tags + +* FastAPI WebSocket Manager +* AG2AI Autogen Orchestrator +* Real-time agent workflows +* WebSocket frontend UI +* AI agent orchestration +* Agentic problem solving with Python +* Multi-agent system with streaming responses +* Custom group chat with Autogen + +--- diff --git a/autogen/agents/experimental/agents_fastapi_websocket/dependencies.py b/autogen/agents/experimental/agents_fastapi_websocket/dependencies.py new file mode 100644 index 00000000000..72377262013 --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/dependencies.py @@ -0,0 +1,65 @@ +# ✨ NEW: Add typing imports + +from fastapi import HTTPException, status + +from .managers.connection import WebSocketManager + +# ✨ CHANGED: Use Optional[WebSocketManager] instead of WebSocketManager | None +_websocket_manager: WebSocketManager | None = None + +# Singleton instance of MongoDB service + + +async def get_websocket_manager() -> WebSocketManager: + """Dependency provider for connection manager""" + if not _websocket_manager: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Connection manager not initialized", + ) + return _websocket_manager + + +# Manager initialization and cleanup +async def init_managers() -> None: + """Initialize all manager instances""" + global _websocket_manager + + try: + # Initialize connection manager + _websocket_manager = WebSocketManager() + + except Exception: + await cleanup_managers() # Cleanup any partially initialized managers + raise + + +async def cleanup_managers() -> None: + """Cleanup and shutdown all manager instances""" + global _websocket_manager + + # Cleanup connection manager first to ensure all active connections are closed + if _websocket_manager: + try: + await _websocket_manager.cleanup() + except Exception as e: + print(e) + raise + finally: + _websocket_manager = None + + +# Utility functions for dependency management + + +# Error handling for manager operations + + +class ManagerOperationError(Exception): + """Custom exception for manager operation errors""" + + def __init__(self, manager_name: str, operation: str, detail: str): + self.manager_name = manager_name + self.operation = operation + self.detail = detail + super().__init__(f"{manager_name} failed during {operation}: {detail}") diff --git a/autogen/agents/experimental/agents_fastapi_websocket/docs/USAGE_GUIDE.md b/autogen/agents/experimental/agents_fastapi_websocket/docs/USAGE_GUIDE.md new file mode 100644 index 00000000000..95aa271bef2 --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/docs/USAGE_GUIDE.md @@ -0,0 +1,104 @@ +# 🧪 AG2AI WebSocket UI – Step-by-Step Usage Guide + +This guide explains how to use the interactive WebSocket UI to send and receive agentic messages through a running AG2AI Autogen backend. + +--- + +## 🖼️ Screenshot Reference + +![WebSocket UI Screenshot](./usage_screenshot.png) + +--- + +## 🚶‍♂️ Step-by-Step Instructions + +### 1️⃣ Create a New Chat Session +Click on the **`Create New Chat`** button to generate a unique `chat_id`. +This will be used to manage your session across messages. + +--- + +### 2️⃣ Use the Chat ID +After creating a chat, click **`Use`** next to the desired `chat_id`. +This tells the system which session you're actively working in. + +--- + +### 3️⃣ Connect to WebSocket +Click the **`Connect`** button to establish a WebSocket connection with the backend server. +Once connected, messages can be sent and received in real time. + +--- + +### 4️⃣ Load a Query Template +Click **`User Query`** to auto-fill a JSON template into the input box. +This template follows the format expected by the backend. + +--- + +### 5️⃣ Enter Your Task +In the JSON input, fill in your prompt inside the `task` key. + +#### Example: +```json +{ + "type": "start", + "task": "Analyze a dataset and generate EDA visualizations", + "files": [] +} +```` + +--- + +### 6️⃣ Send the Message + +Click **`Send`** to dispatch the message. +Wait for streaming agent responses to appear in the UI. + +--- + +## 🧠 Optional: Respond to User Proxy Input Requests + +If your setup includes a **User Proxy agent** (i.e., the agent needs user confirmation/input during the flow), you can respond using the `User Input` feature. + +### When a prompt from the User Proxy appears: + +1. Click **`User Input`** to load a template into the message input box. +2. Fill your response inside the `response` key. + +#### Example: + +```json +{ + "type": "input_response", + "response": "Yes, proceed with the next step", + "files": [] +} +``` + +3. Click **`Send`** to reply to the ongoing agent conversation. + +--- + +## ✅ Tips + +* You can view each message block (sent/received) formatted with timestamps and roles. +* JSON inputs are auto-formatted for better readability. +* You can disconnect anytime using the **`Disconnect`** button. + +--- + +## 🧪 Use Case Example + +> Want to try a full agentic workflow? +> Try entering a task like: + +```json +{ + "type": "start", + "task": "Perform EDA on /path/to/titanic.csv dataset using Python", + "files": [] +} +``` + +and watch the agents plan, code, execute, debug, and summarize the task live. diff --git a/autogen/agents/experimental/agents_fastapi_websocket/docs/screenshot.png b/autogen/agents/experimental/agents_fastapi_websocket/docs/screenshot.png new file mode 100644 index 00000000000..435065453e6 Binary files /dev/null and b/autogen/agents/experimental/agents_fastapi_websocket/docs/screenshot.png differ diff --git a/autogen/agents/experimental/agents_fastapi_websocket/docs/usage_screenshot.png b/autogen/agents/experimental/agents_fastapi_websocket/docs/usage_screenshot.png new file mode 100644 index 00000000000..8b1cce8a623 Binary files /dev/null and b/autogen/agents/experimental/agents_fastapi_websocket/docs/usage_screenshot.png differ diff --git a/autogen/agents/experimental/agents_fastapi_websocket/helpers.py b/autogen/agents/experimental/agents_fastapi_websocket/helpers.py new file mode 100644 index 00000000000..10a6e9a9d7d --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/helpers.py @@ -0,0 +1,17 @@ +import asyncio +from collections.abc import Awaitable +from typing import Any + + +def async_to_sync(awaitable: Awaitable[Any]) -> Any: + """ + Convert an awaitable to a synchronous function call. + + Args: + awaitable: An awaitable object to execute synchronously + + Returns: + The result of the awaitable + """ + loop = asyncio.get_event_loop() + return loop.run_until_complete(awaitable) diff --git a/autogen/agents/experimental/agents_fastapi_websocket/main.py b/autogen/agents/experimental/agents_fastapi_websocket/main.py new file mode 100644 index 00000000000..187257bd680 --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/main.py @@ -0,0 +1,99 @@ +import os +import uuid +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from pathlib import Path + +# ✨ NEW: Add typing imports +import nest_asyncio +import uvicorn +import ws +from dependencies import cleanup_managers, init_managers +from dotenv import load_dotenv +from fastapi import FastAPI +from fastapi.middleware import Middleware +from fastapi.responses import HTMLResponse + +# from routers import apis +from starlette.middleware.cors import CORSMiddleware +from starlette.responses import RedirectResponse + +# Initialize application +load_dotenv() + +middleware = [ + Middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) +] + + +@asynccontextmanager +# ✨ CHANGED: Added return type annotation +async def lifespan(app: FastAPI) -> AsyncIterator[None]: + """ + Lifespan context manager for the FastAPI application + Handles startup and shutdown events + """ + nest_asyncio.apply() # 🧠 Patch the loop here + os.environ["HOME"] = os.path.expanduser("~") + try: + await init_managers() + except Exception: + raise + + yield # App is running and handling requests here + + # Shutdown: Close database connections and cleanup + try: + await cleanup_managers() + print("✅ Managers cleaned up successfully") + + except Exception: + raise + + +app = FastAPI(title="Autogen WebSocket", redoc_url=None, lifespan=lifespan, middleware=middleware) + +app.include_router( + ws.router, + prefix="/ws", + tags=["websocket"], + responses={404: {"description": "Not found"}}, +) + + +@app.get(path="autogen", include_in_schema=False) +# ✨ CHANGED: Added return type annotation +async def docs_redirect() -> RedirectResponse: + return RedirectResponse(url="redoc") + + +# WEBSOCKET Swagger Docs + + +@app.get("/", response_class=HTMLResponse) +# ✨ CHANGED: Added return type annotation +async def serve_html() -> HTMLResponse: + html_content = Path("templates/index.html").read_text(encoding="utf-8") + return HTMLResponse(content=html_content) + + +@app.get("/health") +# ✨ CHANGED: Added return type annotation +async def health_check() -> dict[str, str]: + """ + Health check endpoint + """ + return {"status": "ok", "message": "Service is running"} + + +@app.post("/chats/new_chat") +# ✨ CHANGED: Added return type annotation +async def new_chat() -> dict[str, str]: + """ + Health check endpoint + """ + return {"status": "ok", "chat_id": str(uuid.uuid4())} + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/cancellation_token.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/cancellation_token.py new file mode 100644 index 00000000000..27a335abe06 --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/cancellation_token.py @@ -0,0 +1,47 @@ +import threading +from asyncio import Future +from collections.abc import Callable +from typing import Any + + +class CancellationToken: + """A token used to cancel pending async calls""" + + def __init__(self) -> None: + self._cancelled: bool = False + self._lock: threading.Lock = threading.Lock() + self._callbacks: list[Callable[[], None]] = [] + + def cancel(self) -> None: + """Cancel pending async calls linked to this cancellation token.""" + with self._lock: + if not self._cancelled: + self._cancelled = True + for callback in self._callbacks: + callback() + + def is_cancelled(self) -> bool: + """Check if the CancellationToken has been used""" + with self._lock: + return self._cancelled + + def add_callback(self, callback: Callable[[], None]) -> None: + """Attach a callback that will be called when cancel is invoked""" + with self._lock: + if self._cancelled: + callback() + else: + self._callbacks.append(callback) + + def link_future(self, future: Future[Any]) -> Future[Any]: + """Link a pending async call to a token to allow its cancellation""" + with self._lock: + if self._cancelled: + future.cancel() + else: + + def _cancel() -> None: + future.cancel() + + self._callbacks.append(_cancel) + return future diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/connection.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/connection.py new file mode 100644 index 00000000000..909967ae04a --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/connection.py @@ -0,0 +1,414 @@ +import asyncio +import traceback +from collections.abc import Awaitable, Callable +from datetime import datetime, timezone + +# ✨ NEW: Add Optional import +from typing import Any + +from fastapi import WebSocket, WebSocketDisconnect + +from .cancellation_token import CancellationToken +from .groupchat import AgentChat + +# Type aliases for input handling +InputFuncType = Callable[[str, str], Awaitable[str]] +InputRequestType = str # "text_input", "file_upload", etc. + + +class WebSocketManager: + """ + Simplified WebSocket manager for AG2AI GroupChat workflows + + Handles WebSocket connections, message streaming, and user input requests + without any external dependencies. + """ + + def __init__(self) -> None: + self._connections: dict[str, WebSocket] = {} + self._closed_connections: set[str] = set() + self._input_responses: dict[str, asyncio.Queue[str]] = {} + + self._active_tasks: dict[str, asyncio.Task[Any]] = {} + + self._cancellation_tokens: dict[str, CancellationToken] = {} + self._stop_flags: dict[str, bool] = {} + + async def connect(self, websocket: WebSocket, session_id: str) -> bool: + """ + Accept a new WebSocket connection + + Args: + websocket: FastAPI WebSocket instance + session_id: Unique identifier for this session + + Returns: + bool: True if connection successful, False otherwise + """ + try: + await websocket.accept() + self._connections[session_id] = websocket + self._closed_connections.discard(session_id) + self._input_responses[session_id] = asyncio.Queue() + self._stop_flags[session_id] = False + + await self._send_message( + session_id, + { + "type": "system", + "status": "connected", + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + return True + except Exception: + return False + + async def start_chat_stream( + self, + session_id: str, + chat: AgentChat, + initial_message: dict[str, Any] = {}, + ) -> None: + """ + Start a chat stream over the WebSocket connection. + + Args: + session_id (str): The unique ID of the chat session. + initial_message (dict): The initial message sent by the user. + chat (AgentChat): The chat handler instance. + """ + if session_id not in self._connections or session_id in self._closed_connections: + raise ValueError(f"No active connection for session {session_id}") + + self._stop_flags[session_id] = False + cancellation_token = CancellationToken() + self._cancellation_tokens[session_id] = cancellation_token + + try: + # Send initial message if provided + if initial_message: + await self._send_message( + session_id, + { + "type": "message", + "data": { + "source": "user", + "content": initial_message, + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + }, + ) + + chat._set_cancellation_token(self._cancellation_tokens.get(session_id, None)) + task = asyncio.create_task( + chat.manager.a_initiate_chat(chat.manager, message=initial_message.get("task"), clear_history=False) + ) + self._active_tasks[session_id] = task + + # Wait for completion or cancellation + await task + + if not self._stop_flags.get(session_id, True) and session_id not in self._closed_connections: + await self._send_message( + session_id, + { + "type": "completion", + "status": "complete", + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + else: + await self._send_message( + session_id, + { + "type": "completion", + "status": "cancelled", + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + + except Exception as e: + traceback.print_exc() + await self._handle_stream_error(session_id, e) + finally: + self._cancellation_tokens.pop(session_id, None) + self._active_tasks.pop(session_id, None) + self._stop_flags.pop(session_id, None) + + def _create_input_func(self, session_id: str, timeout: int = 600) -> InputFuncType: + """ + Create an input function for requesting user input during chat + + Args: + session_id: Session identifier + timeout: Timeout in seconds for input response + + Returns: + Input function that can be used by chat handlers + """ + + async def input_handler( + prompt: str = "", + input_type: InputRequestType = "text_input", + ) -> str: + try: + # Send input request to client + await self._send_message( + session_id, + { + "type": "input_request", + "input_type": input_type, + "prompt": prompt, + "data": {"source": "system", "content": prompt}, + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + + # Wait for response with timeout + if session_id in self._input_responses: + try: + + async def poll_for_response() -> str: + while True: + # Check if session was closed/stopped + if session_id in self._closed_connections or self._stop_flags.get(session_id, False): + raise ValueError("Session was closed or stopped") + + # Try to get response with short timeout + try: + response = await asyncio.wait_for( + self._input_responses[session_id].get(), + timeout=min(timeout, 5), + ) + return response + except asyncio.TimeoutError: + continue # Keep checking for closed status + + response = await asyncio.wait_for(poll_for_response(), timeout=timeout) + return response + + except asyncio.TimeoutError: + await self.stop_session(session_id, "Input timeout") + raise + else: + raise ValueError(f"No input queue for session {session_id}") + + except Exception: + raise + + return input_handler + + async def send_message(self, session_id: str, message: dict[str, Any]) -> None: + """ + Send a message to the client + + Args: + session_id: Session identifier + message: Message dictionary to send + """ + await self._send_message(session_id, message) + + async def send_chat_message(self, session_id: str, content: str, source: str = "assistant") -> None: + """ + Send a formatted chat message to the client + + Args: + session_id: Session identifier + content: Message content + source: Message source (e.g., "assistant", "user", agent name) + """ + await self._send_message( + session_id, + { + "type": "message", + "data": { + "source": source, + "content": content, + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + }, + ) + + async def send_agent_message(self, session_id: str, agent_name: str, content: str) -> None: + """ + Send a message from a specific agent + + Args: + session_id: Session identifier + agent_name: Name of the agent sending the message + content: Message content + """ + await self.send_chat_message(session_id, content, source=agent_name) + + async def handle_input_response(self, session_id: str, response: str) -> None: + """ + Handle input response from client + + Args: + session_id: Session identifier + response: User's input response + """ + if session_id in self._input_responses: + await self._input_responses[session_id].put(response) + else: + print(f"Received input response for inactive session {session_id}") + + async def stop_session(self, session_id: str, reason: str = "Session stopped") -> None: + """ + Stop an active session + + Args: + session_id: Session identifier + reason: Reason for stopping + """ + if session_id in self._cancellation_tokens: + try: + # Set stop flag + self._stop_flags[session_id] = True + # Cancel the task if it exists + if session_id in self._active_tasks: + task = self._active_tasks[session_id] + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + raise + except Exception as e: + raise e + + # Finally cancel the token + self._cancellation_tokens[session_id].cancel() + # Send completion message if connection is active + if session_id in self._connections and session_id not in self._closed_connections: + await self._send_message( + session_id, + { + "type": "completion", + "status": "stopped", + "reason": reason, + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + + except Exception as e: + print(e) + raise + + async def disconnect(self, session_id: str) -> None: + """ + Clean up connection and associated resources + + Args: + session_id: Session identifier + """ + + # Mark as closed before cleanup + self._closed_connections.add(session_id) + + # Stop the session + await self.stop_session(session_id, "Connection closed") + + # Clean up resources + self._connections.pop(session_id, None) + self._cancellation_tokens.pop(session_id, None) + self._input_responses.pop(session_id, None) + self._active_tasks.pop(session_id, None) + self._stop_flags.pop(session_id, None) + + async def _send_message(self, session_id: str, message: dict[str, Any]) -> None: + """ + Internal method to send a message through WebSocket + + Args: + session_id: Session identifier + message: Message dictionary to send + """ + if session_id in self._closed_connections: + raise Exception(f"Session closed for {session_id}") + + try: + if session_id in self._connections: + websocket = self._connections[session_id] + await websocket.send_json(message) + except WebSocketDisconnect: + await self.disconnect(session_id) + except Exception: + await self.disconnect(session_id) + + async def _handle_stream_error(self, session_id: str, error: Exception) -> None: + """ + Handle stream errors + + Args: + session_id: Session identifier + error: Exception that occurred + """ + if session_id not in self._closed_connections: + await self._send_message( + session_id, + { + "type": "error", + "message": str(error), + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + + async def cleanup(self) -> None: + """Clean up all active connections and resources""" + + try: + # Stop all sessions + for session_id in list(self._active_tasks.keys()): + self._stop_flags[session_id] = True + if session_id in self._cancellation_tokens: + self._cancellation_tokens[session_id].cancel() + + # Disconnect all websockets with timeout + + async def disconnect_all() -> None: + for session_id in list(self.active_connections): + try: + await asyncio.wait_for(self.disconnect(session_id), timeout=2) + except asyncio.TimeoutError: + raise + except Exception: + raise + + await asyncio.wait_for(disconnect_all(), timeout=10) + + except asyncio.TimeoutError: + raise + except Exception: + raise + finally: + # Clear all internal state + self._connections.clear() + self._cancellation_tokens.clear() + self._closed_connections.clear() + self._input_responses.clear() + self._active_tasks.clear() + self._stop_flags.clear() + + @property + def active_connections(self) -> set[str]: + """Get set of active session IDs""" + return set(self._connections.keys()) - self._closed_connections + + @property + def active_sessions(self) -> set[str]: + """Get set of sessions with active tasks""" + return set(self._active_tasks.keys()) + + def is_connected(self, session_id: str) -> bool: + """Check if a session is actively connected""" + return session_id in self._connections and session_id not in self._closed_connections + + def is_session_stopped(self, session_id: str) -> bool: + """Check if a session has been stopped""" + return self._stop_flags.get(session_id, False) + + def get_cancellation_token(self, session_id: str) -> CancellationToken | None: + """Get the cancellation token for a session""" + return self._cancellation_tokens.get(session_id) diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/groupchat.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/groupchat.py new file mode 100644 index 00000000000..2bbde5a830a --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/groupchat.py @@ -0,0 +1,201 @@ +import asyncio +import json +import os +from datetime import datetime + +# ✨ NEW: Add Union and Optional imports +from typing import Any, Literal + +import autogen +from autogen import UserProxyAgent + +today_date = datetime.now() +from collections.abc import Awaitable, Callable + +from dependencies import get_websocket_manager +from helpers import async_to_sync +from managers import prompts + +from autogen import LLMConfig + + +class CustomGroupChatManager(autogen.GroupChatManager): + def __init__( + self, + groupchat: Any, + llm_config: Any, + chat_id: str | None = None, + human_input_mode: Literal["ALWAYS", "NEVER", "TERMINATE"] = "NEVER", + queue: asyncio.Queue[Any] | None = None, + ) -> None: + super().__init__(groupchat=groupchat, llm_config=llm_config, human_input_mode=human_input_mode) + self.queue = queue + self.chat_id = chat_id + + async def send_websocket(self, message: Any) -> None: + ws_manager = await get_websocket_manager() + await ws_manager.send_chat_message(session_id=self.chat_id, content=message, source="AgentChat") + return + + def _print_received_message( + self, message: dict[str, Any] | str, sender: "autogen.Agent", skip_head: bool = True + ) -> Any: + super()._print_received_message(message, sender) + content = message.get("content", "") if isinstance(message, dict) else str(message) + db_content = {"role": "user", "name": sender.name, "content": content} + db_content["raw_message"] = message + async_to_sync(self.send_websocket(db_content)) + + +class CustomUserProxyAgent(UserProxyAgent): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + def set_input_function(self, input_function: Callable[[str], Awaitable[str]]) -> Callable[[str], Awaitable[str]]: + self.input_function = input_function + return self.input_function + + async def a_get_human_input(self, prompt: str) -> str: + print("🔁 Custom logic before input...") + + prompt = json.dumps({"agent": self.name, "prompt": prompt}) + return await self.input_function(prompt) + + +class AgentChat: + def __init__(self, chat_id: str) -> None: + self.chat_id = chat_id + self.cancellation_token = None + self.llm_config = LLMConfig(api_type="openai", model="gpt-4o", api_key=os.environ["OPENAI_API_KEY"]) + + self.agents = self.get_agents() + self.get_chat_manager() + + def set_input_function(self, input_function: Any) -> Any: + self.input_function = input_function + return self.input_function + + def _set_cancellation_token(self, cancellation_token: Any) -> None: + self.cancellation_token = cancellation_token + + def _check_cancellation(self) -> None: + """Check if operations should be cancelled""" + if self.cancellation_token and self.cancellation_token.is_cancelled(): + raise asyncio.CancelledError(f"OMNI operation cancelled for session {self.chat_id}") + + def get_planner(self) -> autogen.AssistantAgent: + return autogen.AssistantAgent( + name="planner_agent", + llm_config=self.llm_config, + system_message=prompts.planner_agent.format(chart_location=os.getcwd()), + ) + + def get_code_writer(self) -> autogen.AssistantAgent: + return autogen.AssistantAgent( + name="code_writer_agent", llm_config=self.llm_config, system_message=prompts.code_writer + ) + + def get_code_executor(self) -> UserProxyAgent: + return UserProxyAgent( + name="code_executor_agent", + human_input_mode="NEVER", + code_execution_config={ + "work_dir": os.getcwd(), + "use_docker": False, + }, + llm_config=self.llm_config, + ) + + def get_code_debugger(self) -> autogen.AssistantAgent: + return autogen.AssistantAgent( + name="code_debugger_agent", + llm_config=self.llm_config, + max_consecutive_auto_reply=5, + system_message=prompts.debugger.format(chart_location=os.getcwd()), + ) + + def get_agent_aligner(self) -> autogen.AssistantAgent: + return autogen.AssistantAgent( + name="agent_aligner", + llm_config=self.llm_config, + description="This agent will align the whole process and will share which agent will work next. ", + system_message=prompts.agent_aligner, + ) + + def get_user_acceptance(self) -> CustomUserProxyAgent: + return CustomUserProxyAgent( + name="get_user_acceptance", + llm_config=self.llm_config, + human_input_mode="ALWAYS", + code_execution_config=False, + ) + + def get_process_completion(self) -> autogen.AssistantAgent: + return autogen.AssistantAgent( + name="process_completion_agent", + llm_config=self.llm_config, + system_message=prompts.process_completion, + ) + + def custom_speaker_selection_func(self, last_speaker: Any, groupchat: Any) -> Any | None: + try: + self._check_cancellation() + + messages = groupchat.messages + + if len(messages) == 1 or last_speaker.name == "chat_manager": + return groupchat.agent_by_name("agent_aligner") + if last_speaker.name == "agent_aligner": + if "planner_agent" in messages[-1]["content"]: + return groupchat.agent_by_name("planner_agent") + elif "code_writer_agent" in messages[-1]["content"]: + return groupchat.agent_by_name("code_writer_agent") + elif "code_executor_agent" in messages[-1]["content"]: + return groupchat.agent_by_name("code_executor_agent") + elif "process_completion_agent" in messages[-1]["content"]: + return groupchat.agent_by_name("process_completion_agent") + + if last_speaker.name in ["planner_agent", "code_debugger_agent"]: + return groupchat.agent_by_name("agent_aligner") + if last_speaker.name == "code_writer_agent": + return groupchat.agent_by_name("code_executor_agent") + if last_speaker.name == "code_executor_agent": + if "exitcode: 0" in messages[-1]["content"]: + return groupchat.agent_by_name("process_completion_agent") + else: + return groupchat.agent_by_name("code_debugger_agent") + + if last_speaker.name == "code_debugger_agent": + return groupchat.agent_by_name("code_writer_agent") + + return None + except asyncio.CancelledError: + raise + + def get_agents(self) -> list[Any]: + self.planner = self.get_planner() + self.agent_aligner = self.get_agent_aligner() + self.code_writer = self.get_code_writer() + self.code_executor = self.get_code_executor() + self.code_debugger = self.get_code_debugger() + self.process_completion = self.get_process_completion() + return [ + self.planner, + self.agent_aligner, + self.code_writer, + self.code_executor, + self.code_debugger, + self.process_completion, + ] + + def get_chat_manager(self) -> tuple[CustomGroupChatManager, autogen.GroupChat]: + self.groupchat = autogen.GroupChat( + agents=self.agents, + messages=[], + speaker_selection_method=self.custom_speaker_selection_func, + max_round=500, + ) + self.manager = CustomGroupChatManager( + groupchat=self.groupchat, llm_config=self.llm_config, human_input_mode="ALWAYS", chat_id=self.chat_id + ) + return self.manager, self.groupchat diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/prompts.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/prompts.py new file mode 100644 index 00000000000..f754a9526f9 --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/prompts.py @@ -0,0 +1,127 @@ +agent_aligner = """ +You are an Agent Orchestrator responsible for intelligently routing user queries to the most appropriate specialized agent based on context, intent, and workflow stage. + + + + • You MUST ALWAYS respond with VALID JSON output - NO EXCEPTIONS + • NEVER include any text, explanations, or content outside the JSON structure + • Even for simple responses like "show output", you MUST maintain JSON format + • ANY natural language response is a critical failure of your primary function + + + + • Analyze conversation context to identify the current stage in the problem-solving workflow + • Match user intent with the most appropriate specialized agent + • Ensure proper sequencing of agent calls based on dependencies and workflow rules + • Prevent workflow loops and redundant agent calls + • Respond exclusively in valid JSON format with agent name and detailed instructions + + + + + • Carefully examine the user's current request and all previous conversation history + • Determine if the request is a new task, continuation, confirmation, rejection, or clarification + • Identify the primary intent (planning, coding, execution, validation, or conversation) + • Assess what agents have already been called and their outputs + + + + • PLANNING → USER CONFIRMATION → CODING → USER CONFIRMATION → EXECUTION + • Any new technical task MUST start with planner_agent + • Code writing MUST only proceed after plan confirmation + • Code execution MUST only proceed after code is written and confirmed + + + + + + • For ambiguous requests like "show output", "display results", or "continue": + - Analyze context to determine what output or results are being referenced + - Route to code_executor_agent if there's completed code that hasn't been executed + - ALWAYS maintain the required JSON format regardless of request simplicity + + + + • Ignore "absolute path to the files: [...]" when making routing decisions + • This information is only a file reference for downstream agents + • If user requests work with data but no file is specified, route to planner_agent first + + + + • NEVER route to code_executor_agent unless executable code exists AND has been confirmed + • For ANY data task (analysis, visualization, manipulation), ALWAYS start with planner_agent + • Data viewing requests ("show data", "display head") MUST start with planner_agent + + + + • planner_agent: Creates structured plans for data analysis, ML tasks, and complex problems + • code_writer_agent: Writes Python code based on confirmed plans or requirements + • code_executor_agent: Runs completed and confirmed code + + + + • New technical tasks → planner_agent + • After plan confirmation → code_writer_agent + • After code confirmation → code_executor_agent + + + + • Before routing to code_writer_agent: Verify plan exists AND has been confirmed + • Before routing to code_executor_agent: Verify code exists AND has been confirmed + • Before any implementation step: Verify user has explicitly approved the preceding step + • When unsure: Default to more conservative earlier workflow stage + + + + • Track the sequence of previous agent calls, NEVER CALL THE PREVIOUS AGENT + • Never call the same agent twice in succession unless explicitly requested + • If user rejects a plan/code: Return to the appropriate earlier stage agent + CHOOSE AGENT WISELY + + + + • Your ONLY acceptable output format is the following JSON structure: + { + "name": "agent_name", + "task": "Detailed instruction of what the agent should do based on current context" + } + • NEVER include any text before or after this JSON object + • NEVER include explanations about why you chose a particular agent + • NEVER use Markdown formatting for the JSON output + • Ensure the JSON object is properly formatted with quotes around keys and string values +""" + +planner_agent = """You are a Planner Agent. Your job is to understand user requests related to data analysis and create a step-by-step plan in english to achieve the desired outcome. + Always read the data from the location provided. + + + The Step by step plan should be exhaustive and would have all the steps which would be needed to solve the problem. + If possible try to write an alternative approach to the problem and let other agents decide how they want to solve it. + - Always save the plot with plt.save and save the plot in this location {chart_location}, + + + Clearly indicate which agent should be responsible for each step. + Consider the type of analysis requested (basic, analytics, forecasting, AI/ML) and plan accordingly. + **DO NOT GENERATE CODE YOURSELF.** Instruct the CodeWriter to generate the necessary code for each step.""" + +code_writer = """You are a python CodeWriter Agent. You receive instructions from the Planner and generate code in python to perform data analysis tasks. + - The code should be functional and complete. + - Specify the programming language python using code blocks (```python ... ```). + - Use subprocess.popen to do !pip install any module. + - Use appropriate libraries based on the task (pandas for general analysis, scikit-learn for ML, etc.). + - Always save the plot with plt.save and save the plot in this location {chart_location}, + """ + +debugger = """You are a Debugger Agent. Your role is to analyze code errors reported by the CodeExecutor, suggest fixes to the CodeWriter, and verify if the fixes resolve the issues. + - Clearly identify the error, its location, and possible causes. + - Suggest specific code modifications to the CodeWriter. + - Use subprocess.popen to do !pip install any module. + - Always save the plot with plt.save and save the plot in this location {chart_location}, + """ +process_completion = """Respond back with tabular format for sequential info. + Always provide the tabular response in Markdown. For example data head should be shown in markdown and so all the tabular information should be processed in markdown only. + Sequential information should be provided with complete information. + + Also try to provide tips for better process. For example, in machine learning provide them tips on better model training. Tips can be from model training, evaluation, feature engineering, or exploratory data analysis. + Also recommend two new questions to the user so that the conversation goes on. + """ diff --git a/autogen/agents/experimental/agents_fastapi_websocket/requirements.txt b/autogen/agents/experimental/agents_fastapi_websocket/requirements.txt new file mode 100644 index 00000000000..4e7bc5eb42c --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/requirements.txt @@ -0,0 +1,34 @@ +ag2==0.9.9 +annotated-types==0.7.0 +anyio==4.10.0 +asyncer==0.0.8 +certifi==2025.8.3 +charset-normalizer==3.4.3 +click==8.2.1 +diskcache==5.6.3 +distro==1.9.0 +docker==7.1.0 +fastapi==0.116.1 +h11==0.16.0 +httpcore==1.0.9 +httpx==0.28.1 +idna==3.10 +jiter==0.10.0 +nest-asyncio==1.6.0 +openai==1.76.2 +packaging==25.0 +pydantic==2.11.7 +pydantic_core==2.33.2 +python-dotenv==1.1.1 +regex==2025.7.34 +requests==2.32.5 +sniffio==1.3.1 +starlette==0.47.2 +termcolor==3.1.0 +tiktoken==0.11.0 +tqdm==4.67.1 +typing-inspection==0.4.1 +typing_extensions==4.14.1 +urllib3==2.5.0 +uvicorn==0.34.3 +websockets==15.0.1 diff --git a/autogen/agents/experimental/agents_fastapi_websocket/templates/index.html b/autogen/agents/experimental/agents_fastapi_websocket/templates/index.html new file mode 100644 index 00000000000..2ac534665f1 --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/templates/index.html @@ -0,0 +1,2102 @@ + + + + Enhanced WebSocket JSON Client + + + + + + + + + + + +
+ + + + + + + + + + + + + +
+ +
+
+
+

🚀 AG2AI WebSocket

+

Developed by: Suryaaa!

+ +
+
+
+
+ + + + +
+
+
+ + +
+ +
+
+

Controls

+ +
+ +
+ +
+
+
+ 💬 + Create New Chat +
+ +
+ +
+ + + + + +
+
+ + +
+
+
+ 🔗 + Connection +
+ +
+ +
+
+ +
+ + +
+
+ +
+
+ + Disconnected +
+ + +
+ +
+ + +
+
+
+ + +
+
+
+ 📤 + Send Message +
+ +
+ +
+
+ +
+ + +
+
+ +
+ +
+ +
+ + +
+
+
+ +
+ + +
+
+
+ + +
+
+
+ 🔍 + Search & Filter +
+ +
+ +
+
+ + +
+ +
+ + +
+ +
+ + + +
+
+
+
+
+ + +
+
+
+ + +
+
+

Messages

+
+ + + +
+
+ + + +
+
+
💬
+

No messages yet

+

Connect and start sending messages to see them here

+
+
+ + +
+
+
0
+
Sent
+
+
+
0
+
Received
+
+
+
0
+
Errors
+
+
+
--
+
Uptime
+
+
+
--
+
Quality
+
+
+
+
+
+ + + + diff --git a/autogen/agents/experimental/agents_fastapi_websocket/ws.py b/autogen/agents/experimental/agents_fastapi_websocket/ws.py new file mode 100644 index 00000000000..10675a2f0ad --- /dev/null +++ b/autogen/agents/experimental/agents_fastapi_websocket/ws.py @@ -0,0 +1,76 @@ +# api/ws.py +import asyncio +import json +from datetime import datetime +from typing import Any + +from dependencies import get_websocket_manager +from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect + +from .managers.connection import WebSocketManager +from .managers.groupchat import AgentChat + +router = APIRouter() + + +@router.websocket("/chat/{chat_id}") +async def run_websocket( + websocket: WebSocket, + chat_id: str, + ws_manager: WebSocketManager = Depends(get_websocket_manager), +) -> Any: + """WebSocket endpoint for run communication""" + chat = AgentChat(chat_id=chat_id) + input_func = ws_manager._create_input_func(chat_id) + chat.set_input_function(input_func) + # Connect websocket + connected = await ws_manager.connect(websocket, chat_id) + if not connected: + await websocket.close(code=4002, reason="Failed to establish connection") + return + + try: + while True: + try: + raw_message = await websocket.receive_text() + message = json.loads(raw_message) + + if message.get("type") == "start": + # Handle start message + if message.get("task"): + asyncio.create_task( + ws_manager.start_chat_stream(session_id=chat_id, initial_message=message, chat=chat) + ) + else: + await websocket.send_json({ + "type": "error", + "error": "Invalid start message format", + "timestamp": datetime.utcnow().isoformat(), + }) + + elif message.get("type") == "stop": + reason = message.get("reason") or "User requested stop/cancellation" + await ws_manager.stop_session(chat_id, reason=reason) + # break + + elif message.get("type") == "ping": + await websocket.send_json({"type": "pong", "timestamp": datetime.utcnow().isoformat()}) + + elif message.get("type") == "input_response": + # Handle input response from client + response = message.get("response") + if response is not None: + await ws_manager.handle_input_response(chat_id, response) + except json.JSONDecodeError: + await websocket.send_json({ + "type": "error", + "error": "Invalid message format", + "timestamp": datetime.utcnow().isoformat(), + }) + + except WebSocketDisconnect: + raise + except Exception: + raise + finally: + await ws_manager.disconnect(chat_id)