diff --git a/autogen/agents/experimental/agents_fastapi_websocket/README.md b/autogen/agents/experimental/agents_fastapi_websocket/README.md
new file mode 100644
index 00000000000..419da0241e9
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/README.md
@@ -0,0 +1,202 @@
+# 🔌 AG2AI WebSocket Agentic Workflow UI & Backend
+
+An interactive WebSocket-based UI and FastAPI backend for orchestrating agentic workflows using [AG2AI Autogen](https://github.com/ag2ai/ag2). This project demonstrates real-time, multi-agent collaboration for solving problems through a WebSocket-powered interface — ideal for tasks like data analysis, EDA, and more.
+
+
+
+---
+
+## 📘 How to Use the WebSocket UI
+
+To learn how to interact with the UI step-by-step, check out the full guide:
+
+➡️ [Usage Guide (UI Walkthrough)](./docs/USAGE_GUIDE.md)
+
+
+---
+
+## 📦 Overview
+
+This repository includes:
+
+- A clean **frontend UI** to interact with WebSockets visually — more intuitive than Postman or raw clients.
+- A **FastAPI backend** that manages real-time WebSocket communication and coordinates multiple agents via AG2AI Autogen.
+- A custom **orchestrator agent** (`agent_aligner`) that manages execution flow, ensuring orderly agent coordination.
+
+---
+
+## ✨ Features
+
+### ✅ Frontend
+
+- Interactive WebSocket client with formatted JSON display
+- Message blocks styled for clarity and separation
+- UUID-based client tracking
+- Send/receive messages with live updates
+- Manual message construction and quick templates
+
+### ✅ Backend
+
+- Built with **FastAPI** and **async WebSocket** handling
+- Modular architecture using manager classes
+- Custom `AgentChat` class for group chat orchestration
+- Real-time message streaming to frontend
+- Manual user input integration during live chat
+- Environment-based configuration via `.env`
+
+---
+
+## 🧩 Problem & Solution
+
+### ❌ The Problem We Faced
+
+While working with WebSocket-based agent systems using AG2AI/Autogen, we encountered several major bottlenecks that affected productivity and developer experience:
+
+- **Postman and raw WebSocket clients are not interactive**
+ These tools make it hard to follow multi-agent conversations. They lack formatting, which slows down debugging and understanding the data flow.
+
+- **Reading agent messages is time-consuming**
+ When working with multiple agents, reviewing each step (especially during prompt tuning or alignment) becomes tedious and error-prone.
+
+- **Lack of message formatting**
+ JSON responses from agents are dumped as raw strings, making them hard to read and troubleshoot — especially when nested or streamed.
+
+- **Frontend development was not feasible**
+ Building a fully custom UI in frameworks like React or Vue would add significant overhead and distract from core system development.
+
+- **No streamlined session management**
+ Keeping track of WebSocket sessions and switching between different chats was a manual and error-prone task.
+
+---
+
+### ✅ The Solution We Implemented
+
+To overcome these challenges, we built a minimal yet powerful **interactive WebSocket UI**, paired with a **FastAPI backend**, enabling seamless development and debugging of agent workflows.
+
+Key benefits:
+
+- **Clean, interactive WebSocket communication**
+ Live messages stream directly to the browser with proper formatting and role-based separation.
+
+- **Well-structured message display**
+ All messages are styled in blocks and automatically formatted as JSON, making it easy to inspect agent responses.
+
+- **Faster prompt tuning & agent alignment**
+ Developers can instantly see how agents respond, helping fine-tune prompts with clarity and speed.
+
+- **Quick session switching**
+ Chat sessions can be created and reused easily, improving workflow efficiency during development and testing.
+
+- **Minimal development effort**
+ A lightweight HTML/JS UI replaces the need for building a full-fledged frontend framework — saving time while still improving UX significantly.
+
+---
+
+This setup dramatically reduced the friction in debugging, testing, and managing agentic workflows — allowing us to focus on what matters: building smart and responsive agents.
+
+---
+
+## 🤖 Agents Overview
+
+This system supports the following agents for structured task completion:
+
+- **`planner_agent`**: Produces a step-by-step execution plan (no code).
+- **`code_writer`**: Converts the plan into working `python` code.
+- **`code_executor`**: Executes code in the local runtime environment.
+- **`debugger`**: Detects and resolves runtime errors; retries code.
+- **`process_completion`**: Summarizes results and guides the next steps.
+- `agent_aligner`: Coordinates the overall workflow, ensuring agents operate in the correct sequence. Enforces the execution flow (Plan → Confirm → Write → Confirm → Execute) to maintain structure, avoid loops, and ensure safe progression.
+
+
+---
+
+## 📁 Project Structure
+
+```
+
+.
+├── managers/
+│ ├── cancellation_token.py # Manages cancellation signals
+│ ├── connection.py # Handles socket connections
+│ ├── groupchat.py # AgentChat orchestration logic
+│ ├── prompts.py # Prompt templates and roles
+│
+├── templates/
+│ └── index.html # Frontend UI (WebSocket client)
+│
+├── .env # API keys and environment variables
+├── dependencies.py # AG2AI agent setup and configuration
+├── helpers.py # Utility functions
+├── main.py # FastAPI server entry point
+├── ws.py # WebSocket route handler
+├── requirements.txt # Python dependencies
+└── Readme.md # You're reading it!
+
+````
+
+---
+
+## 🚀 Getting Started
+
+### 1. Clone the Repository
+
+```bash
+git clone https://github.com/Suryaaa-Rathore/websocket-ag2ai.git
+cd websocket-ag2ai
+````
+
+### 2. Create and Activate Virtual Environment
+
+```bash
+python -m venv venv
+source venv/bin/activate # On Windows: venv\Scripts\activate
+```
+
+### 3. Install Dependencies
+
+```bash
+pip install -r requirements.txt
+```
+
+### 4. Set Your OpenAI API Key
+
+Create a `.env` file in the root directory:
+
+```
+OPENAI_API_KEY=your-openai-api-key
+```
+
+Or export the variable directly:
+
+```bash
+export OPENAI_API_KEY=your-openai-api-key
+```
+
+### 5. Run the Server
+
+```bash
+python main.py
+```
+
+### 6. Access the Frontend
+
+Open your browser and go to:
+
+```
+http://localhost:8000
+```
+
+---
+
+## 🔍 Discoverability Tags
+
+* FastAPI WebSocket Manager
+* AG2AI Autogen Orchestrator
+* Real-time agent workflows
+* WebSocket frontend UI
+* AI agent orchestration
+* Agentic problem solving with Python
+* Multi-agent system with streaming responses
+* Custom group chat with Autogen
+
+---
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/dependencies.py b/autogen/agents/experimental/agents_fastapi_websocket/dependencies.py
new file mode 100644
index 00000000000..72377262013
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/dependencies.py
@@ -0,0 +1,65 @@
+# ✨ NEW: Add typing imports
+
+from fastapi import HTTPException, status
+
+from .managers.connection import WebSocketManager
+
+# ✨ CHANGED: Use Optional[WebSocketManager] instead of WebSocketManager | None
+_websocket_manager: WebSocketManager | None = None
+
+# Singleton instance of MongoDB service
+
+
+async def get_websocket_manager() -> WebSocketManager:
+ """Dependency provider for connection manager"""
+ if not _websocket_manager:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Connection manager not initialized",
+ )
+ return _websocket_manager
+
+
+# Manager initialization and cleanup
+async def init_managers() -> None:
+ """Initialize all manager instances"""
+ global _websocket_manager
+
+ try:
+ # Initialize connection manager
+ _websocket_manager = WebSocketManager()
+
+ except Exception:
+ await cleanup_managers() # Cleanup any partially initialized managers
+ raise
+
+
+async def cleanup_managers() -> None:
+ """Cleanup and shutdown all manager instances"""
+ global _websocket_manager
+
+ # Cleanup connection manager first to ensure all active connections are closed
+ if _websocket_manager:
+ try:
+ await _websocket_manager.cleanup()
+ except Exception as e:
+ print(e)
+ raise
+ finally:
+ _websocket_manager = None
+
+
+# Utility functions for dependency management
+
+
+# Error handling for manager operations
+
+
+class ManagerOperationError(Exception):
+ """Custom exception for manager operation errors"""
+
+ def __init__(self, manager_name: str, operation: str, detail: str):
+ self.manager_name = manager_name
+ self.operation = operation
+ self.detail = detail
+ super().__init__(f"{manager_name} failed during {operation}: {detail}")
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/docs/USAGE_GUIDE.md b/autogen/agents/experimental/agents_fastapi_websocket/docs/USAGE_GUIDE.md
new file mode 100644
index 00000000000..95aa271bef2
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/docs/USAGE_GUIDE.md
@@ -0,0 +1,104 @@
+# 🧪 AG2AI WebSocket UI – Step-by-Step Usage Guide
+
+This guide explains how to use the interactive WebSocket UI to send and receive agentic messages through a running AG2AI Autogen backend.
+
+---
+
+## 🖼️ Screenshot Reference
+
+
+
+---
+
+## 🚶♂️ Step-by-Step Instructions
+
+### 1️⃣ Create a New Chat Session
+Click on the **`Create New Chat`** button to generate a unique `chat_id`.
+This will be used to manage your session across messages.
+
+---
+
+### 2️⃣ Use the Chat ID
+After creating a chat, click **`Use`** next to the desired `chat_id`.
+This tells the system which session you're actively working in.
+
+---
+
+### 3️⃣ Connect to WebSocket
+Click the **`Connect`** button to establish a WebSocket connection with the backend server.
+Once connected, messages can be sent and received in real time.
+
+---
+
+### 4️⃣ Load a Query Template
+Click **`User Query`** to auto-fill a JSON template into the input box.
+This template follows the format expected by the backend.
+
+---
+
+### 5️⃣ Enter Your Task
+In the JSON input, fill in your prompt inside the `task` key.
+
+#### Example:
+```json
+{
+ "type": "start",
+ "task": "Analyze a dataset and generate EDA visualizations",
+ "files": []
+}
+````
+
+---
+
+### 6️⃣ Send the Message
+
+Click **`Send`** to dispatch the message.
+Wait for streaming agent responses to appear in the UI.
+
+---
+
+## 🧠 Optional: Respond to User Proxy Input Requests
+
+If your setup includes a **User Proxy agent** (i.e., the agent needs user confirmation/input during the flow), you can respond using the `User Input` feature.
+
+### When a prompt from the User Proxy appears:
+
+1. Click **`User Input`** to load a template into the message input box.
+2. Fill your response inside the `response` key.
+
+#### Example:
+
+```json
+{
+ "type": "input_response",
+ "response": "Yes, proceed with the next step",
+ "files": []
+}
+```
+
+3. Click **`Send`** to reply to the ongoing agent conversation.
+
+---
+
+## ✅ Tips
+
+* You can view each message block (sent/received) formatted with timestamps and roles.
+* JSON inputs are auto-formatted for better readability.
+* You can disconnect anytime using the **`Disconnect`** button.
+
+---
+
+## 🧪 Use Case Example
+
+> Want to try a full agentic workflow?
+> Try entering a task like:
+
+```json
+{
+ "type": "start",
+ "task": "Perform EDA on /path/to/titanic.csv dataset using Python",
+ "files": []
+}
+```
+
+and watch the agents plan, code, execute, debug, and summarize the task live.
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/docs/screenshot.png b/autogen/agents/experimental/agents_fastapi_websocket/docs/screenshot.png
new file mode 100644
index 00000000000..435065453e6
Binary files /dev/null and b/autogen/agents/experimental/agents_fastapi_websocket/docs/screenshot.png differ
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/docs/usage_screenshot.png b/autogen/agents/experimental/agents_fastapi_websocket/docs/usage_screenshot.png
new file mode 100644
index 00000000000..8b1cce8a623
Binary files /dev/null and b/autogen/agents/experimental/agents_fastapi_websocket/docs/usage_screenshot.png differ
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/helpers.py b/autogen/agents/experimental/agents_fastapi_websocket/helpers.py
new file mode 100644
index 00000000000..10a6e9a9d7d
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/helpers.py
@@ -0,0 +1,17 @@
+import asyncio
+from collections.abc import Awaitable
+from typing import Any
+
+
+def async_to_sync(awaitable: Awaitable[Any]) -> Any:
+ """
+ Convert an awaitable to a synchronous function call.
+
+ Args:
+ awaitable: An awaitable object to execute synchronously
+
+ Returns:
+ The result of the awaitable
+ """
+ loop = asyncio.get_event_loop()
+ return loop.run_until_complete(awaitable)
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/main.py b/autogen/agents/experimental/agents_fastapi_websocket/main.py
new file mode 100644
index 00000000000..187257bd680
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/main.py
@@ -0,0 +1,99 @@
+import os
+import uuid
+from collections.abc import AsyncIterator
+from contextlib import asynccontextmanager
+from pathlib import Path
+
+# ✨ NEW: Add typing imports
+import nest_asyncio
+import uvicorn
+import ws
+from dependencies import cleanup_managers, init_managers
+from dotenv import load_dotenv
+from fastapi import FastAPI
+from fastapi.middleware import Middleware
+from fastapi.responses import HTMLResponse
+
+# from routers import apis
+from starlette.middleware.cors import CORSMiddleware
+from starlette.responses import RedirectResponse
+
+# Initialize application
+load_dotenv()
+
+middleware = [
+ Middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
+]
+
+
+@asynccontextmanager
+# ✨ CHANGED: Added return type annotation
+async def lifespan(app: FastAPI) -> AsyncIterator[None]:
+ """
+ Lifespan context manager for the FastAPI application
+ Handles startup and shutdown events
+ """
+ nest_asyncio.apply() # 🧠 Patch the loop here
+ os.environ["HOME"] = os.path.expanduser("~")
+ try:
+ await init_managers()
+ except Exception:
+ raise
+
+ yield # App is running and handling requests here
+
+ # Shutdown: Close database connections and cleanup
+ try:
+ await cleanup_managers()
+ print("✅ Managers cleaned up successfully")
+
+ except Exception:
+ raise
+
+
+app = FastAPI(title="Autogen WebSocket", redoc_url=None, lifespan=lifespan, middleware=middleware)
+
+app.include_router(
+ ws.router,
+ prefix="/ws",
+ tags=["websocket"],
+ responses={404: {"description": "Not found"}},
+)
+
+
+@app.get(path="autogen", include_in_schema=False)
+# ✨ CHANGED: Added return type annotation
+async def docs_redirect() -> RedirectResponse:
+ return RedirectResponse(url="redoc")
+
+
+# WEBSOCKET Swagger Docs
+
+
+@app.get("/", response_class=HTMLResponse)
+# ✨ CHANGED: Added return type annotation
+async def serve_html() -> HTMLResponse:
+ html_content = Path("templates/index.html").read_text(encoding="utf-8")
+ return HTMLResponse(content=html_content)
+
+
+@app.get("/health")
+# ✨ CHANGED: Added return type annotation
+async def health_check() -> dict[str, str]:
+ """
+ Health check endpoint
+ """
+ return {"status": "ok", "message": "Service is running"}
+
+
+@app.post("/chats/new_chat")
+# ✨ CHANGED: Added return type annotation
+async def new_chat() -> dict[str, str]:
+ """
+ Health check endpoint
+ """
+ return {"status": "ok", "chat_id": str(uuid.uuid4())}
+
+
+if __name__ == "__main__":
+ uvicorn.run(app, host="0.0.0.0", port=8000)
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/cancellation_token.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/cancellation_token.py
new file mode 100644
index 00000000000..27a335abe06
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/cancellation_token.py
@@ -0,0 +1,47 @@
+import threading
+from asyncio import Future
+from collections.abc import Callable
+from typing import Any
+
+
+class CancellationToken:
+ """A token used to cancel pending async calls"""
+
+ def __init__(self) -> None:
+ self._cancelled: bool = False
+ self._lock: threading.Lock = threading.Lock()
+ self._callbacks: list[Callable[[], None]] = []
+
+ def cancel(self) -> None:
+ """Cancel pending async calls linked to this cancellation token."""
+ with self._lock:
+ if not self._cancelled:
+ self._cancelled = True
+ for callback in self._callbacks:
+ callback()
+
+ def is_cancelled(self) -> bool:
+ """Check if the CancellationToken has been used"""
+ with self._lock:
+ return self._cancelled
+
+ def add_callback(self, callback: Callable[[], None]) -> None:
+ """Attach a callback that will be called when cancel is invoked"""
+ with self._lock:
+ if self._cancelled:
+ callback()
+ else:
+ self._callbacks.append(callback)
+
+ def link_future(self, future: Future[Any]) -> Future[Any]:
+ """Link a pending async call to a token to allow its cancellation"""
+ with self._lock:
+ if self._cancelled:
+ future.cancel()
+ else:
+
+ def _cancel() -> None:
+ future.cancel()
+
+ self._callbacks.append(_cancel)
+ return future
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/connection.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/connection.py
new file mode 100644
index 00000000000..909967ae04a
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/connection.py
@@ -0,0 +1,414 @@
+import asyncio
+import traceback
+from collections.abc import Awaitable, Callable
+from datetime import datetime, timezone
+
+# ✨ NEW: Add Optional import
+from typing import Any
+
+from fastapi import WebSocket, WebSocketDisconnect
+
+from .cancellation_token import CancellationToken
+from .groupchat import AgentChat
+
+# Type aliases for input handling
+InputFuncType = Callable[[str, str], Awaitable[str]]
+InputRequestType = str # "text_input", "file_upload", etc.
+
+
+class WebSocketManager:
+ """
+ Simplified WebSocket manager for AG2AI GroupChat workflows
+
+ Handles WebSocket connections, message streaming, and user input requests
+ without any external dependencies.
+ """
+
+ def __init__(self) -> None:
+ self._connections: dict[str, WebSocket] = {}
+ self._closed_connections: set[str] = set()
+ self._input_responses: dict[str, asyncio.Queue[str]] = {}
+
+ self._active_tasks: dict[str, asyncio.Task[Any]] = {}
+
+ self._cancellation_tokens: dict[str, CancellationToken] = {}
+ self._stop_flags: dict[str, bool] = {}
+
+ async def connect(self, websocket: WebSocket, session_id: str) -> bool:
+ """
+ Accept a new WebSocket connection
+
+ Args:
+ websocket: FastAPI WebSocket instance
+ session_id: Unique identifier for this session
+
+ Returns:
+ bool: True if connection successful, False otherwise
+ """
+ try:
+ await websocket.accept()
+ self._connections[session_id] = websocket
+ self._closed_connections.discard(session_id)
+ self._input_responses[session_id] = asyncio.Queue()
+ self._stop_flags[session_id] = False
+
+ await self._send_message(
+ session_id,
+ {
+ "type": "system",
+ "status": "connected",
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ )
+ return True
+ except Exception:
+ return False
+
+ async def start_chat_stream(
+ self,
+ session_id: str,
+ chat: AgentChat,
+ initial_message: dict[str, Any] = {},
+ ) -> None:
+ """
+ Start a chat stream over the WebSocket connection.
+
+ Args:
+ session_id (str): The unique ID of the chat session.
+ initial_message (dict): The initial message sent by the user.
+ chat (AgentChat): The chat handler instance.
+ """
+ if session_id not in self._connections or session_id in self._closed_connections:
+ raise ValueError(f"No active connection for session {session_id}")
+
+ self._stop_flags[session_id] = False
+ cancellation_token = CancellationToken()
+ self._cancellation_tokens[session_id] = cancellation_token
+
+ try:
+ # Send initial message if provided
+ if initial_message:
+ await self._send_message(
+ session_id,
+ {
+ "type": "message",
+ "data": {
+ "source": "user",
+ "content": initial_message,
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ },
+ )
+
+ chat._set_cancellation_token(self._cancellation_tokens.get(session_id, None))
+ task = asyncio.create_task(
+ chat.manager.a_initiate_chat(chat.manager, message=initial_message.get("task"), clear_history=False)
+ )
+ self._active_tasks[session_id] = task
+
+ # Wait for completion or cancellation
+ await task
+
+ if not self._stop_flags.get(session_id, True) and session_id not in self._closed_connections:
+ await self._send_message(
+ session_id,
+ {
+ "type": "completion",
+ "status": "complete",
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ )
+ else:
+ await self._send_message(
+ session_id,
+ {
+ "type": "completion",
+ "status": "cancelled",
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ )
+
+ except Exception as e:
+ traceback.print_exc()
+ await self._handle_stream_error(session_id, e)
+ finally:
+ self._cancellation_tokens.pop(session_id, None)
+ self._active_tasks.pop(session_id, None)
+ self._stop_flags.pop(session_id, None)
+
+ def _create_input_func(self, session_id: str, timeout: int = 600) -> InputFuncType:
+ """
+ Create an input function for requesting user input during chat
+
+ Args:
+ session_id: Session identifier
+ timeout: Timeout in seconds for input response
+
+ Returns:
+ Input function that can be used by chat handlers
+ """
+
+ async def input_handler(
+ prompt: str = "",
+ input_type: InputRequestType = "text_input",
+ ) -> str:
+ try:
+ # Send input request to client
+ await self._send_message(
+ session_id,
+ {
+ "type": "input_request",
+ "input_type": input_type,
+ "prompt": prompt,
+ "data": {"source": "system", "content": prompt},
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ )
+
+ # Wait for response with timeout
+ if session_id in self._input_responses:
+ try:
+
+ async def poll_for_response() -> str:
+ while True:
+ # Check if session was closed/stopped
+ if session_id in self._closed_connections or self._stop_flags.get(session_id, False):
+ raise ValueError("Session was closed or stopped")
+
+ # Try to get response with short timeout
+ try:
+ response = await asyncio.wait_for(
+ self._input_responses[session_id].get(),
+ timeout=min(timeout, 5),
+ )
+ return response
+ except asyncio.TimeoutError:
+ continue # Keep checking for closed status
+
+ response = await asyncio.wait_for(poll_for_response(), timeout=timeout)
+ return response
+
+ except asyncio.TimeoutError:
+ await self.stop_session(session_id, "Input timeout")
+ raise
+ else:
+ raise ValueError(f"No input queue for session {session_id}")
+
+ except Exception:
+ raise
+
+ return input_handler
+
+ async def send_message(self, session_id: str, message: dict[str, Any]) -> None:
+ """
+ Send a message to the client
+
+ Args:
+ session_id: Session identifier
+ message: Message dictionary to send
+ """
+ await self._send_message(session_id, message)
+
+ async def send_chat_message(self, session_id: str, content: str, source: str = "assistant") -> None:
+ """
+ Send a formatted chat message to the client
+
+ Args:
+ session_id: Session identifier
+ content: Message content
+ source: Message source (e.g., "assistant", "user", agent name)
+ """
+ await self._send_message(
+ session_id,
+ {
+ "type": "message",
+ "data": {
+ "source": source,
+ "content": content,
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ },
+ )
+
+ async def send_agent_message(self, session_id: str, agent_name: str, content: str) -> None:
+ """
+ Send a message from a specific agent
+
+ Args:
+ session_id: Session identifier
+ agent_name: Name of the agent sending the message
+ content: Message content
+ """
+ await self.send_chat_message(session_id, content, source=agent_name)
+
+ async def handle_input_response(self, session_id: str, response: str) -> None:
+ """
+ Handle input response from client
+
+ Args:
+ session_id: Session identifier
+ response: User's input response
+ """
+ if session_id in self._input_responses:
+ await self._input_responses[session_id].put(response)
+ else:
+ print(f"Received input response for inactive session {session_id}")
+
+ async def stop_session(self, session_id: str, reason: str = "Session stopped") -> None:
+ """
+ Stop an active session
+
+ Args:
+ session_id: Session identifier
+ reason: Reason for stopping
+ """
+ if session_id in self._cancellation_tokens:
+ try:
+ # Set stop flag
+ self._stop_flags[session_id] = True
+ # Cancel the task if it exists
+ if session_id in self._active_tasks:
+ task = self._active_tasks[session_id]
+ if not task.done():
+ task.cancel()
+ try:
+ await task
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ raise e
+
+ # Finally cancel the token
+ self._cancellation_tokens[session_id].cancel()
+ # Send completion message if connection is active
+ if session_id in self._connections and session_id not in self._closed_connections:
+ await self._send_message(
+ session_id,
+ {
+ "type": "completion",
+ "status": "stopped",
+ "reason": reason,
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ )
+
+ except Exception as e:
+ print(e)
+ raise
+
+ async def disconnect(self, session_id: str) -> None:
+ """
+ Clean up connection and associated resources
+
+ Args:
+ session_id: Session identifier
+ """
+
+ # Mark as closed before cleanup
+ self._closed_connections.add(session_id)
+
+ # Stop the session
+ await self.stop_session(session_id, "Connection closed")
+
+ # Clean up resources
+ self._connections.pop(session_id, None)
+ self._cancellation_tokens.pop(session_id, None)
+ self._input_responses.pop(session_id, None)
+ self._active_tasks.pop(session_id, None)
+ self._stop_flags.pop(session_id, None)
+
+ async def _send_message(self, session_id: str, message: dict[str, Any]) -> None:
+ """
+ Internal method to send a message through WebSocket
+
+ Args:
+ session_id: Session identifier
+ message: Message dictionary to send
+ """
+ if session_id in self._closed_connections:
+ raise Exception(f"Session closed for {session_id}")
+
+ try:
+ if session_id in self._connections:
+ websocket = self._connections[session_id]
+ await websocket.send_json(message)
+ except WebSocketDisconnect:
+ await self.disconnect(session_id)
+ except Exception:
+ await self.disconnect(session_id)
+
+ async def _handle_stream_error(self, session_id: str, error: Exception) -> None:
+ """
+ Handle stream errors
+
+ Args:
+ session_id: Session identifier
+ error: Exception that occurred
+ """
+ if session_id not in self._closed_connections:
+ await self._send_message(
+ session_id,
+ {
+ "type": "error",
+ "message": str(error),
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ )
+
+ async def cleanup(self) -> None:
+ """Clean up all active connections and resources"""
+
+ try:
+ # Stop all sessions
+ for session_id in list(self._active_tasks.keys()):
+ self._stop_flags[session_id] = True
+ if session_id in self._cancellation_tokens:
+ self._cancellation_tokens[session_id].cancel()
+
+ # Disconnect all websockets with timeout
+
+ async def disconnect_all() -> None:
+ for session_id in list(self.active_connections):
+ try:
+ await asyncio.wait_for(self.disconnect(session_id), timeout=2)
+ except asyncio.TimeoutError:
+ raise
+ except Exception:
+ raise
+
+ await asyncio.wait_for(disconnect_all(), timeout=10)
+
+ except asyncio.TimeoutError:
+ raise
+ except Exception:
+ raise
+ finally:
+ # Clear all internal state
+ self._connections.clear()
+ self._cancellation_tokens.clear()
+ self._closed_connections.clear()
+ self._input_responses.clear()
+ self._active_tasks.clear()
+ self._stop_flags.clear()
+
+ @property
+ def active_connections(self) -> set[str]:
+ """Get set of active session IDs"""
+ return set(self._connections.keys()) - self._closed_connections
+
+ @property
+ def active_sessions(self) -> set[str]:
+ """Get set of sessions with active tasks"""
+ return set(self._active_tasks.keys())
+
+ def is_connected(self, session_id: str) -> bool:
+ """Check if a session is actively connected"""
+ return session_id in self._connections and session_id not in self._closed_connections
+
+ def is_session_stopped(self, session_id: str) -> bool:
+ """Check if a session has been stopped"""
+ return self._stop_flags.get(session_id, False)
+
+ def get_cancellation_token(self, session_id: str) -> CancellationToken | None:
+ """Get the cancellation token for a session"""
+ return self._cancellation_tokens.get(session_id)
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/groupchat.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/groupchat.py
new file mode 100644
index 00000000000..2bbde5a830a
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/groupchat.py
@@ -0,0 +1,201 @@
+import asyncio
+import json
+import os
+from datetime import datetime
+
+# ✨ NEW: Add Union and Optional imports
+from typing import Any, Literal
+
+import autogen
+from autogen import UserProxyAgent
+
+today_date = datetime.now()
+from collections.abc import Awaitable, Callable
+
+from dependencies import get_websocket_manager
+from helpers import async_to_sync
+from managers import prompts
+
+from autogen import LLMConfig
+
+
+class CustomGroupChatManager(autogen.GroupChatManager):
+ def __init__(
+ self,
+ groupchat: Any,
+ llm_config: Any,
+ chat_id: str | None = None,
+ human_input_mode: Literal["ALWAYS", "NEVER", "TERMINATE"] = "NEVER",
+ queue: asyncio.Queue[Any] | None = None,
+ ) -> None:
+ super().__init__(groupchat=groupchat, llm_config=llm_config, human_input_mode=human_input_mode)
+ self.queue = queue
+ self.chat_id = chat_id
+
+ async def send_websocket(self, message: Any) -> None:
+ ws_manager = await get_websocket_manager()
+ await ws_manager.send_chat_message(session_id=self.chat_id, content=message, source="AgentChat")
+ return
+
+ def _print_received_message(
+ self, message: dict[str, Any] | str, sender: "autogen.Agent", skip_head: bool = True
+ ) -> Any:
+ super()._print_received_message(message, sender)
+ content = message.get("content", "") if isinstance(message, dict) else str(message)
+ db_content = {"role": "user", "name": sender.name, "content": content}
+ db_content["raw_message"] = message
+ async_to_sync(self.send_websocket(db_content))
+
+
+class CustomUserProxyAgent(UserProxyAgent):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+
+ def set_input_function(self, input_function: Callable[[str], Awaitable[str]]) -> Callable[[str], Awaitable[str]]:
+ self.input_function = input_function
+ return self.input_function
+
+ async def a_get_human_input(self, prompt: str) -> str:
+ print("🔁 Custom logic before input...")
+
+ prompt = json.dumps({"agent": self.name, "prompt": prompt})
+ return await self.input_function(prompt)
+
+
+class AgentChat:
+ def __init__(self, chat_id: str) -> None:
+ self.chat_id = chat_id
+ self.cancellation_token = None
+ self.llm_config = LLMConfig(api_type="openai", model="gpt-4o", api_key=os.environ["OPENAI_API_KEY"])
+
+ self.agents = self.get_agents()
+ self.get_chat_manager()
+
+ def set_input_function(self, input_function: Any) -> Any:
+ self.input_function = input_function
+ return self.input_function
+
+ def _set_cancellation_token(self, cancellation_token: Any) -> None:
+ self.cancellation_token = cancellation_token
+
+ def _check_cancellation(self) -> None:
+ """Check if operations should be cancelled"""
+ if self.cancellation_token and self.cancellation_token.is_cancelled():
+ raise asyncio.CancelledError(f"OMNI operation cancelled for session {self.chat_id}")
+
+ def get_planner(self) -> autogen.AssistantAgent:
+ return autogen.AssistantAgent(
+ name="planner_agent",
+ llm_config=self.llm_config,
+ system_message=prompts.planner_agent.format(chart_location=os.getcwd()),
+ )
+
+ def get_code_writer(self) -> autogen.AssistantAgent:
+ return autogen.AssistantAgent(
+ name="code_writer_agent", llm_config=self.llm_config, system_message=prompts.code_writer
+ )
+
+ def get_code_executor(self) -> UserProxyAgent:
+ return UserProxyAgent(
+ name="code_executor_agent",
+ human_input_mode="NEVER",
+ code_execution_config={
+ "work_dir": os.getcwd(),
+ "use_docker": False,
+ },
+ llm_config=self.llm_config,
+ )
+
+ def get_code_debugger(self) -> autogen.AssistantAgent:
+ return autogen.AssistantAgent(
+ name="code_debugger_agent",
+ llm_config=self.llm_config,
+ max_consecutive_auto_reply=5,
+ system_message=prompts.debugger.format(chart_location=os.getcwd()),
+ )
+
+ def get_agent_aligner(self) -> autogen.AssistantAgent:
+ return autogen.AssistantAgent(
+ name="agent_aligner",
+ llm_config=self.llm_config,
+ description="This agent will align the whole process and will share which agent will work next. ",
+ system_message=prompts.agent_aligner,
+ )
+
+ def get_user_acceptance(self) -> CustomUserProxyAgent:
+ return CustomUserProxyAgent(
+ name="get_user_acceptance",
+ llm_config=self.llm_config,
+ human_input_mode="ALWAYS",
+ code_execution_config=False,
+ )
+
+ def get_process_completion(self) -> autogen.AssistantAgent:
+ return autogen.AssistantAgent(
+ name="process_completion_agent",
+ llm_config=self.llm_config,
+ system_message=prompts.process_completion,
+ )
+
+ def custom_speaker_selection_func(self, last_speaker: Any, groupchat: Any) -> Any | None:
+ try:
+ self._check_cancellation()
+
+ messages = groupchat.messages
+
+ if len(messages) == 1 or last_speaker.name == "chat_manager":
+ return groupchat.agent_by_name("agent_aligner")
+ if last_speaker.name == "agent_aligner":
+ if "planner_agent" in messages[-1]["content"]:
+ return groupchat.agent_by_name("planner_agent")
+ elif "code_writer_agent" in messages[-1]["content"]:
+ return groupchat.agent_by_name("code_writer_agent")
+ elif "code_executor_agent" in messages[-1]["content"]:
+ return groupchat.agent_by_name("code_executor_agent")
+ elif "process_completion_agent" in messages[-1]["content"]:
+ return groupchat.agent_by_name("process_completion_agent")
+
+ if last_speaker.name in ["planner_agent", "code_debugger_agent"]:
+ return groupchat.agent_by_name("agent_aligner")
+ if last_speaker.name == "code_writer_agent":
+ return groupchat.agent_by_name("code_executor_agent")
+ if last_speaker.name == "code_executor_agent":
+ if "exitcode: 0" in messages[-1]["content"]:
+ return groupchat.agent_by_name("process_completion_agent")
+ else:
+ return groupchat.agent_by_name("code_debugger_agent")
+
+ if last_speaker.name == "code_debugger_agent":
+ return groupchat.agent_by_name("code_writer_agent")
+
+ return None
+ except asyncio.CancelledError:
+ raise
+
+ def get_agents(self) -> list[Any]:
+ self.planner = self.get_planner()
+ self.agent_aligner = self.get_agent_aligner()
+ self.code_writer = self.get_code_writer()
+ self.code_executor = self.get_code_executor()
+ self.code_debugger = self.get_code_debugger()
+ self.process_completion = self.get_process_completion()
+ return [
+ self.planner,
+ self.agent_aligner,
+ self.code_writer,
+ self.code_executor,
+ self.code_debugger,
+ self.process_completion,
+ ]
+
+ def get_chat_manager(self) -> tuple[CustomGroupChatManager, autogen.GroupChat]:
+ self.groupchat = autogen.GroupChat(
+ agents=self.agents,
+ messages=[],
+ speaker_selection_method=self.custom_speaker_selection_func,
+ max_round=500,
+ )
+ self.manager = CustomGroupChatManager(
+ groupchat=self.groupchat, llm_config=self.llm_config, human_input_mode="ALWAYS", chat_id=self.chat_id
+ )
+ return self.manager, self.groupchat
diff --git a/autogen/agents/experimental/agents_fastapi_websocket/managers/prompts.py b/autogen/agents/experimental/agents_fastapi_websocket/managers/prompts.py
new file mode 100644
index 00000000000..f754a9526f9
--- /dev/null
+++ b/autogen/agents/experimental/agents_fastapi_websocket/managers/prompts.py
@@ -0,0 +1,127 @@
+agent_aligner = """