A multi-agent AI research system with real-time token-based streaming capabilities. Built for the BNB AI Hackathon (Unibase Track).
git clone <your-repo-url>
cd kognys-agents-python-2
python3 -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp env-example .env # Fill in your API keys
uvicorn api_main:app --host 0.0.0.0 --port 8000 --reload
Server runs at http://localhost:8000
POST /papers
{
"message": "What are the latest developments in AI?",
"user_id": "user123"
}
Response:
{
"paper_id": "unique-id",
"paper_content": "Complete research paper..."
}
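For quick scripting, the blocking endpoint can be called with the requests library. A minimal sketch, assuming the request/response shapes shown above:

```python
import requests

# Blocking call to POST /papers; field names follow the schema shown above.
resp = requests.post(
    "http://localhost:8000/papers",
    json={"message": "What are the latest developments in AI?", "user_id": "user123"},
    timeout=300,  # full research runs can take a while
)
resp.raise_for_status()
paper = resp.json()
print(paper["paper_id"])
print(paper["paper_content"][:200])
```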
POST /papers/stream
- Primary API Endpoint
curl -X POST http://localhost:8000/papers/stream \
-H "Content-Type: application/json" \
-d '{"message": "Your question", "user_id": "user123"}' \
--no-buffer
✨ Key Features:
- Token-by-token streaming from LLM responses
- Real-time progress updates as research happens
- Immediate feedback - see text being generated live
- Complete research workflow with live agent interactions
GET /papers/{paper_id}
- Get specific paper
GET /users/{user_id}/papers
- Get all user papers (usage sketched below)
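Both read endpoints are plain GETs. A hedged sketch (the response schemas here are assumptions based on the POST /papers output above):

```python
import requests

BASE = "http://localhost:8000"

# Fetch a single paper by ID (response assumed to mirror POST /papers output).
paper = requests.get(f"{BASE}/papers/unique-id").json()
print(paper.get("paper_content", "")[:200])

# List all papers for a user (the exact response shape is an assumption).
papers = requests.get(f"{BASE}/users/user123/papers").json()
print(f"Found {len(papers)} papers")
```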
The SSE endpoint provides granular, real-time events:
{"event_type": "research_started", "data": {"status": "Starting research process..."}}
{"event_type": "question_validated", "data": {"status": "Question validated and refined"}}
{"event_type": "documents_retrieved", "data": {"document_count": 5, "status": "Retrieved 5 relevant documents"}}
{"event_type": "draft_answer_token", "data": {"token": "The"}}
{"event_type": "draft_answer_token", "data": {"token": " importance"}}
{"event_type": "draft_answer_token", "data": {"token": " of"}}
{"event_type": "criticism_token", "data": {"criticism": "Needs more specific examples"}}
{"event_type": "final_answer_token", "data": {"token": "In"}}
{"event_type": "final_answer_token", "data": {"token": " conclusion"}}
{"event_type": "draft_generated", "data": {"status": "Initial draft generated"}}
{"event_type": "criticisms_received", "data": {"criticism_count": 2, "status": "Received 2 criticisms"}}
{"event_type": "orchestrator_decision", "data": {"decision": "FINALIZE", "status": "Orchestrator decided: FINALIZE"}}
{"event_type": "research_completed", "data": {"final_answer": "...", "status": "Research completed successfully"}}
{"event_type": "paper_generated", "data": {"paper_id": "uuid", "paper_content": "...", "status": "completed"}}
// Note: the browser EventSource API only supports GET requests, so this
// POST-based stream is consumed with fetch() and a streaming reader instead.
// (Run inside an ES module or an async function for the awaits below.)
const response = await fetch("http://localhost:8000/papers/stream", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    message: "What are the latest AI developments?",
    user_id: "frontend-user",
  }),
});
// Real-time token streaming
let currentDraft = "";
let currentCriticisms = [];
let finalAnswer = "";
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // SSE events are separated by blank lines; payload lines start with "data: "
  const events = buffer.split("\n\n");
  buffer = events.pop(); // keep any incomplete event for the next chunk
  for (const eventText of events) {
    const dataLine = eventText
      .split("\n")
      .find((line) => line.startsWith("data: "));
    if (dataLine) handleEvent(JSON.parse(dataLine.slice(6)));
  }
}
function handleEvent(data) {
  switch (data.event_type) {
    case "research_started":
      console.log("🚀 Research started");
      break;
    case "draft_answer_token":
      // Build draft token by token
      currentDraft += data.data.token || "";
      updateUI("draft", currentDraft);
      break;
    case "criticism_token":
      // Collect criticisms as they arrive
      currentCriticisms.push(data.data.criticism);
      updateUI("criticisms", currentCriticisms);
      break;
    case "final_answer_token":
      // Build final answer token by token
      finalAnswer += data.data.token || "";
      updateUI("final", finalAnswer);
      break;
    case "research_completed":
      console.log("✅ Research completed!");
      console.log("Final answer:", data.data.final_answer);
      break;
    case "paper_generated":
      console.log("📄 Paper generated with ID:", data.data.paper_id);
      // Paper is ready for download/display
      break;
    case "error":
      console.error("❌ Error:", data.data.error);
      break;
  }
}
function updateUI(section, content) {
// Update your UI components with streaming content
document.getElementById(section).textContent = content;
}
import requests
import json
def stream_research(question: str, user_id: str):
"""Stream research with real-time token updates."""
url = "http://localhost:8000/papers/stream"
payload = {"message": question, "user_id": user_id}
response = requests.post(url, json=payload, stream=True)
current_draft = ""
current_criticisms = []
final_answer = ""
for line in response.iter_lines():
if line:
# SSE format: "data: {json}\n\n"
if line.startswith(b'data: '):
json_data = line[6:] # Remove "data: "
event = json.loads(json_data)
event_type = event.get('event_type')
data = event.get('data', {})
if event_type == 'draft_answer_token':
current_draft += data.get('token', '')
print(f"Draft: {current_draft}")
elif event_type == 'criticism_token':
criticism = data.get('criticism')
current_criticisms.append(criticism)
print(f"Criticism: {criticism}")
elif event_type == 'final_answer_token':
final_answer += data.get('token', '')
print(f"Final: {final_answer}")
                elif event_type == 'research_completed':
                    print("✅ Research completed!")
                    return data.get('final_answer')
                elif event_type == 'error':
                    print(f"❌ Error: {data.get('error')}")
                    break
# Usage
final_result = stream_research(
"What is token-based streaming in LLMs?",
"python-client"
)
The system automatically validates and reformulates questions:
- Input: "Tell me about AI"
- Reformulated: "What are the current state-of-the-art developments in artificial intelligence research?"
- 🔍 Validator - Improves question quality
- 📚 Retriever - Searches academic sources
- ✍️ Synthesizer - Creates research drafts (token streaming)
- 🤔 Challenger - Provides critical feedback (token streaming)
- 🎯 Orchestrator - Decides next steps + final answer (token streaming)
- 📤 Publisher - Finalizes and stores results
Input → Validator → Retriever → Synthesizer → Challenger → Orchestrator
                        ↑            ↑             ↑             ↓
                        └────────────┴─────────────┴──────── Decision:
                                                             - Research Again
                                                             - Revise Draft
                                                             - Finalize → Publisher
🔥 All agents stream tokens in real-time for immediate UI feedback!
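Conceptually, the loop is ordinary control flow. The sketch below is purely illustrative: stub functions stand in for the real agents, and the token streaming is omitted.

```python
# Illustrative decision loop; every agent here is a stand-in stub, not the
# real Kognys implementation.
def validate(question):                 # Validator
    return question.strip()

def retrieve(question):                 # Retriever
    return ["doc1", "doc2"]

def synthesize(question, docs):         # Synthesizer
    return f"Draft on {question!r} from {len(docs)} docs"

def challenge(draft, cycle):            # Challenger
    return [] if cycle else ["needs more specific examples"]

def publish(draft):                     # Publisher
    return draft

def research_loop(question: str, max_cycles: int = 4) -> str:
    question = validate(question)
    docs = retrieve(question)
    draft = ""
    for cycle in range(max_cycles):
        draft = synthesize(question, docs)
        criticisms = challenge(draft, cycle)
        if not criticisms:              # Orchestrator decision: FINALIZE
            return publish(draft)
        docs = retrieve(question)       # or: Research Again / Revise Draft
    return publish(draft)

print(research_loop("What is token-based streaming?"))
```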
# Run the comprehensive streaming test
python tests/test_token_streaming.py
This test validates:
- ✅ Token-by-token streaming from all agents
- ✅ Real-time progress events
- ✅ Complete research workflow
- ✅ Error handling and recovery
# Health check
curl http://localhost:8000/
# Standard research
curl -X POST http://localhost:8000/papers \
-H "Content-Type: application/json" \
-d '{"message": "AI ethics", "user_id": "test"}'
# Streaming research
curl -X POST http://localhost:8000/papers/stream \
-H "Content-Type: application/json" \
-d '{"message": "AI ethics", "user_id": "test"}' \
--no-buffer
Required in .env:
GOOGLE_API_KEY=...
POWERFUL_LLM_MODEL=gpt-4
MONGODB_URI=mongodb://...
MEMBASE_API_KEY=...
MEMBASE_ID=kognys_starter
MEMBASE_API_URL=...
UNIBASE_DA_API_URL=...
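To reuse these variables in your own scripts, python-dotenv is one option (a sketch; the app itself may wire configuration differently):

```python
import os
from dotenv import load_dotenv  # pip install python-dotenv

load_dotenv()  # reads .env from the current working directory
mongodb_uri = os.environ["MONGODB_URI"]                  # required
membase_id = os.getenv("MEMBASE_ID", "kognys_starter")   # optional fallback
```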
Questions that are too broad, unclear, or otherwise not research-worthy return a validation error:
{
"event_type": "validation_error",
"data": {
"error": "Question rejected by validator",
"suggestion": "Please rephrase your question..."
}
}
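A client can surface this to the user rather than treating it as a hard failure. A small sketch against the event shape shown above:

```python
def handle_validation_error(event: dict) -> None:
    """Surface a validator rejection; event shape as documented above."""
    data = event.get("data", {})
    print(f"Question rejected: {data.get('error')}")
    print(f"Try again: {data.get('suggestion')}")

handle_validation_error({
    "event_type": "validation_error",
    "data": {"error": "Question rejected by validator",
             "suggestion": "Please rephrase your question..."},
})
```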
- Blockchain failures: Research continues gracefully
- API rate limits: Automatic retry with exponential backoff (see the sketch below)
- Connection issues: Clear error events sent via SSE
- Token overflow: Handles partial responses elegantly
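The retry behavior can also be approximated client-side. A generic exponential-backoff helper, illustrative rather than the project's internal implementation:

```python
import random
import time

def with_backoff(fn, max_attempts: int = 5, base_delay: float = 1.0):
    """Retry fn() with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts; surface the error
            time.sleep(base_delay * (2 ** attempt) + random.random())
```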
- 🚀 Immediate Response: Users see content being generated in real-time
- 📊 Progress Tracking: Live visibility into research process
- ⚡ Better UX: No waiting for complete responses
- 🔄 Interactive: Can process partial responses as they arrive
- 💪 Robust: Graceful handling of interruptions
- SSE: Simpler, more reliable, better for one-way streaming
- No WebSocket complexity: Easier integration and debugging
- Better browser support: Works with standard HTTP infrastructure
- Automatic reconnection: Built-in resilience
uvicorn api_main:app --reload
uvicorn api_main:app --host 0.0.0.0 --port 8000 --workers 1
Note: Use single worker for SSE streaming to maintain connection state.
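For orientation, an SSE endpoint of this shape takes only a few lines of FastAPI. This is a hedged sketch, not the actual api_main.py:

```python
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ResearchRequest(BaseModel):
    message: str
    user_id: str

async def event_stream(req: ResearchRequest):
    # Each SSE frame is "data: <json>\n\n"; the real pipeline would yield
    # research_started, token events, and so on from the agent graph.
    yield f"data: {json.dumps({'event_type': 'research_started', 'data': {'status': 'Starting research process...'}})}\n\n"

@app.post("/papers/stream")
async def papers_stream(req: ResearchRequest):
    return StreamingResponse(event_stream(req), media_type="text/event-stream")
```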
- ✅ Full SSE implementation - Removed WebSocket complexity
- ✅ Token-by-token streaming - Real-time content generation
- ✅ Async agent architecture - Better performance and streaming
- ✅ Enhanced error handling - Graceful degradation for all failures
- ✅ Improved efficiency - Reduced research cycles from 25+ to 3-4
- ✅ Better blockchain integration - Robust retry logic and task validation
Built for BNB AI Hackathon - Real-time AI research with live token streaming 🚀