
Streaming chat cleanup #247


Draft: wants to merge 30 commits into base: main

Commits (30)
43b69c4
refactoring/cleanup
jkwatson Jun 13, 2025
ead687c
type fixes and import cleanup
baasitsharief Jun 13, 2025
72bd426
name changes
baasitsharief Jun 13, 2025
98cd41f
wip
ewilliams-cloudera Jun 13, 2025
d1e4657
event testing
jkwatson Jun 13, 2025
2f080a8
wip event queue
baasitsharief Jun 13, 2025
61a6e65
moving poison pill around
ewilliams-cloudera Jun 13, 2025
d0f8232
event wip
jkwatson Jun 13, 2025
2bd1086
WIP event queue
baasitsharief Jun 13, 2025
aae5433
WIP event queue
baasitsharief Jun 13, 2025
6a3162c
WIP event queue
baasitsharief Jun 13, 2025
3085982
wip on chat events
ewilliams-cloudera Jun 13, 2025
8a0466a
work in progress on chat events
jkwatson Jun 13, 2025
6706fb1
WIP event queue
baasitsharief Jun 13, 2025
5c15d97
drop databases
mliu-cloudera Jun 13, 2025
d1ea0c9
wip on openai streaming events
ewilliams-cloudera Jun 13, 2025
208390e
send additional done after we're really done
jkwatson Jun 13, 2025
e111981
getting close to streaming events on non openai agents
baasitsharief Jun 13, 2025
155a37e
gracefully shutdown handler and close loop
baasitsharief Jun 13, 2025
6ab4d43
python cleanup
jkwatson Jun 13, 2025
2a94906
error handling in the non-openai streaming
jkwatson Jun 13, 2025
b2458c4
cleanup
jkwatson Jun 13, 2025
2ad8039
render contents of a tags and remove chat event queue
ewilliams-cloudera Jun 13, 2025
0f5b9ac
input for date tool
ewilliams-cloudera Jun 13, 2025
85ee90a
default input
ewilliams-cloudera Jun 13, 2025
1d88bac
fix duplicated timestamp issue
jkwatson Jun 13, 2025
084409b
mypy
jkwatson Jun 13, 2025
0fa1371
remove openaiagent
ewilliams-cloudera Jun 13, 2025
76a1b6c
Merge branch 'main' into mob/main
ewilliams-cloudera Jun 13, 2025
28fc83c
update lock file
ewilliams-cloudera Jun 13, 2025
56 changes: 16 additions & 40 deletions llm-service/app/routers/index/sessions/__init__.py
@@ -38,14 +38,14 @@
import base64
import json
import logging
import queue
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Generator, Any

from fastapi import APIRouter, Header, HTTPException
from fastapi.responses import StreamingResponse
from llama_index.core.base.llms.types import ChatResponse
from pydantic import BaseModel
from starlette.responses import ContentStream
from starlette.types import Receive
@@ -63,8 +63,7 @@
from ....services.chat_history.paginator import paginate
from ....services.metadata_apis import session_metadata_api
from ....services.mlflow import rating_mlflow_log_metric, feedback_mlflow_log_table
from ....services.query.agents.tool_calling_querier import poison_pill
from ....services.query.chat_events import ToolEvent
from ....services.query.chat_events import ChatEvent
from ....services.session import rename_session

logger = logging.getLogger(__name__)
@@ -258,38 +257,9 @@ def stream_chat_completion(
session = session_metadata_api.get_session(session_id, user_name=origin_remote_user)
configuration = request.configuration or RagPredictConfiguration()

tool_events_queue: queue.Queue[ToolEvent] = queue.Queue()
# Create a cancellation event to signal when the client disconnects
cancel_event = threading.Event()

def tools_callback(chat_future: Future[Any]) -> Generator[str, None, None]:
while True:
# Check if client has disconnected
if cancel_event.is_set():
logger.info("Client disconnected, stopping tool callback")
# Try to cancel the future if it's still running
if not chat_future.done():
chat_future.cancel()
break

if chat_future.done() and (e := chat_future.exception()):
raise e

try:
event_data = tool_events_queue.get(block=True, timeout=1.0)
if event_data.type == poison_pill:
break
event_json = json.dumps({"event": event_data.model_dump()})
yield f"data: {event_json}\n\n"
except queue.Empty:
# Send a heartbeat event every second to keep the connection alive
heartbeat = ToolEvent(
type="event", name="Processing", timestamp=time.time()
)
event_json = json.dumps({"event": heartbeat.model_dump()})
yield f"data: {event_json}\n\n"
time.sleep(1)

def generate_stream() -> Generator[str, None, None]:
response_id: str = ""
executor = None
@@ -303,28 +273,31 @@ def generate_stream() -> Generator[str, None, None]:
query=request.query,
configuration=configuration,
user_name=origin_remote_user,
tool_events_queue=tool_events_queue,
)

# Yield from tools_callback, which will check for cancellation
yield from tools_callback(future)

# If we get here and the cancel_event is set, the client has disconnected
if cancel_event.is_set():
logger.info("Client disconnected, not processing results")
return

first_message = True
stream = future.result()
for response in stream:
for item in stream:
response: ChatResponse = item
# Check for cancellation between each response
if cancel_event.is_set():
logger.info("Client disconnected during result processing")
break

if "chat_event" in response.additional_kwargs:
chat_event: ChatEvent = response.additional_kwargs.get("chat_event")
event_json = json.dumps({"event": chat_event.model_dump()})
yield f"data: {event_json}\n\n"
continue
# send an initial message to let the client know the response stream is starting
if first_message:
done = ToolEvent(type="done", name="done", timestamp=time.time())
done = ChatEvent(
type="done", name="agent_done", timestamp=time.time()
)
event_json = json.dumps({"event": done.model_dump()})
yield f"data: {event_json}\n\n"
first_message = False
Expand All @@ -333,6 +306,9 @@ def generate_stream() -> Generator[str, None, None]:
yield f"data: {json_delta}\n\n"

if not cancel_event.is_set() and response_id:
done = ChatEvent(type="done", name="chat_done", timestamp=time.time())
event_json = json.dumps({"event": done.model_dump()})
yield f"data: {event_json}\n\n"
yield f'data: {{"response_id" : "{response_id}"}}\n\n'

except TimeoutError:
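
For reference, a minimal sketch (not part of this diff) of how a client could consume the SSE stream emitted by generate_stream(). The "event" and "response_id" keys are taken verbatim from the code above; the use of httpx and the assumption that any other message is a plain text delta are illustrative only.

import json

import httpx  # assumed HTTP client; any SSE-capable client works


def consume_stream(url: str, payload: dict) -> None:
    # Stream the chat completion endpoint and dispatch on the keys
    # emitted by generate_stream().
    with httpx.stream("POST", url, json=payload, timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue  # skip blank separator lines between SSE messages
            message = json.loads(line[len("data: "):])
            if "event" in message:
                # ChatEvent payloads such as "agent_done" and "chat_done"
                print("event:", message["event"]["name"])
            elif "response_id" in message:
                print("done, response_id =", message["response_id"])
            else:
                # assumed: anything else is a token delta from the model
                print(message, end="", flush=True)
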
11 changes: 2 additions & 9 deletions llm-service/app/services/chat/streaming_chat.py
@@ -37,7 +37,6 @@
#
import time
import uuid
from queue import Queue
from typing import Optional, Generator

from llama_index.core.base.llms.types import ChatResponse, ChatMessage
@@ -59,12 +58,10 @@
from app.services.metadata_apis.session_metadata_api import Session
from app.services.mlflow import record_direct_llm_mlflow_run
from app.services.query import querier
from app.services.query.agents.tool_calling_querier import poison_pill
from app.services.query.chat_engine import (
FlexibleContextChatEngine,
build_flexible_chat_engine,
)
from app.services.query.chat_events import ToolEvent
from app.services.query.querier import (
build_retriever,
)
@@ -76,7 +73,6 @@ def stream_chat(
query: str,
configuration: RagPredictConfiguration,
user_name: Optional[str],
tool_events_queue: Queue[ToolEvent],
) -> Generator[ChatResponse, None, None]:
query_configuration = QueryConfiguration(
top_k=session.response_chunks,
@@ -100,12 +96,12 @@
len(session.data_source_ids) == 0 or total_data_sources_size == 0
):
# put a poison pill in the queue to stop the tool events stream
tool_events_queue.put(ToolEvent(type=poison_pill, name="no-op"))
return _stream_direct_llm_chat(session, response_id, query, user_name)

condensed_question, streaming_chat_response = build_streamer(
tool_events_queue, query, query_configuration, session
query, query_configuration, session
)

return _run_streaming_chat(
session,
response_id,
@@ -127,7 +123,6 @@ def _run_streaming_chat(
condensed_question: Optional[str] = None,
) -> Generator[ChatResponse, None, None]:
response: ChatResponse = ChatResponse(message=ChatMessage(content=query))

if streaming_chat_response.chat_stream:
for response in streaming_chat_response.chat_stream:
response.additional_kwargs["response_id"] = response_id
@@ -151,7 +146,6 @@


def build_streamer(
chat_events_queue: Queue[ToolEvent],
query: str,
query_configuration: QueryConfiguration,
session: Session,
@@ -180,7 +174,6 @@ def build_streamer(
query,
query_configuration,
chat_messages,
tool_events_queue=chat_events_queue,
session=session,
)
return condensed_question, streaming_chat_response
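
The net effect of this file's changes is that progress events no longer travel through a shared Queue; they ride on ChatResponse.additional_kwargs, interleaved with the token stream, where the router's "chat_event" branch serializes them. A minimal sketch of that pattern, assuming the ChatEvent fields shown elsewhere in this diff (the helper name and event names are hypothetical):

import time
from typing import Generator

from llama_index.core.base.llms.types import ChatMessage, ChatResponse

from app.services.query.chat_events import ChatEvent


def _with_event(name: str) -> ChatResponse:
    # Hypothetical helper: wrap a ChatEvent in an otherwise empty ChatResponse so the
    # router's `"chat_event" in response.additional_kwargs` check picks it up.
    return ChatResponse(
        message=ChatMessage(content=""),
        additional_kwargs={
            "chat_event": ChatEvent(type="event", name=name, timestamp=time.time())
        },
    )


def example_stream() -> Generator[ChatResponse, None, None]:
    yield _with_event("calling_tool")  # surfaced to the client as an SSE "event"
    yield ChatResponse(message=ChatMessage(content="Hello"))  # ordinary token delta
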
@@ -36,7 +36,6 @@
# DATA.
#
from datetime import datetime
from typing import Any

from llama_index.core.tools import BaseTool, ToolOutput, ToolMetadata
from pydantic import BaseModel
@@ -46,17 +45,28 @@ class DateToolInput(BaseModel):
"""
Input schema for the DateTool
"""
input_: None = None

input: None = None


class DateTool(BaseTool):
"""
A tool that provides the current date and time.
"""

@property
def metadata(self) -> ToolMetadata:
return ToolMetadata(name="date_tool", description="A tool that provides the current date and time.", fn_schema=DateToolInput)
return ToolMetadata(
name="date_tool",
description="A tool that provides the current date and time.",
fn_schema=DateToolInput,
)

def __call__(self, input_: Any) -> ToolOutput:
def __call__(self, input: None = None) -> ToolOutput:
now = datetime.now()
return ToolOutput(content=f"The current date is {now.strftime('%Y-%m-%d %H:%M:%S')}", tool_name="date_tool", raw_input={}, raw_output=now)

return ToolOutput(
content=f"The current date is {now.strftime('%Y-%m-%d %H:%M:%S')}",
tool_name="date_tool",
raw_input={},
raw_output=now,
)
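
A quick usage sketch of the updated tool; the import path is not shown in this diff, so the class is assumed to already be in scope:

# Assuming DateTool has been imported from its agent_tools module
tool = DateTool()
print(tool.metadata.name)  # "date_tool"
result = tool(input=None)  # DateToolInput's only field defaults to None
print(result.content)      # e.g. "The current date is 2025-06-13 12:00:00"
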
87 changes: 87 additions & 0 deletions llm-service/app/services/query/agents/agent_tools/mcp.py
@@ -0,0 +1,87 @@
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2025
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#
import json
import os
from copy import copy

from llama_index.core.tools import FunctionTool
from llama_index.tools.mcp import BasicMCPClient, McpToolSpec

from app.config import settings


def get_llama_index_tools(server_name: str) -> list[FunctionTool]:
"""
Find an MCP server by name in the mcp.json file and return its tools as llama-index FunctionTools.

Args:
server_name: The name of the MCP server to find

Returns:
A list of FunctionTool objects exposed by the specified server

Raises:
ValueError: If the server name is not found in the mcp.json file
"""
mcp_json_path = os.path.join(settings.tools_dir, "mcp.json")

with open(mcp_json_path, "r") as f:
mcp_config = json.load(f)

mcp_servers = mcp_config["mcp_servers"]
server_config = next(filter(lambda x: x["name"] == server_name, mcp_servers), None)

if server_config:
environment: dict[str, str] | None = copy(dict(os.environ))
if "env" in server_config and environment:
environment.update(server_config["env"])

if "command" in server_config:
client = BasicMCPClient(
command_or_url=server_config["command"],
args=server_config.get("args", []),
env=environment,
)
elif "url" in server_config:
client = BasicMCPClient(command_or_url=server_config["url"])
else:
raise ValueError(f"MCP server '{server_name}' must define either a 'command' or a 'url'")
tool_spec = McpToolSpec(client=client)
return tool_spec.to_tool_list()

raise ValueError(f"MCP server '{server_name}' not found in mcp.json")
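
For context, a sketch of the mcp.json layout this function expects, plus a call site. The server entries are illustrative; only the "mcp_servers", "name", "env", "command", "args", and "url" keys are read by the code above.

# Example <tools_dir>/mcp.json contents (illustrative):
# {
#   "mcp_servers": [
#     {"name": "filesystem", "command": "npx",
#      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
#      "env": {"LOG_LEVEL": "debug"}},
#     {"name": "remote-search", "url": "http://localhost:8000/sse"}
#   ]
# }

from app.services.query.agents.agent_tools.mcp import get_llama_index_tools

tools = get_llama_index_tools("filesystem")  # stdio server launched via its "command"
for tool in tools:
    print(tool.metadata.name)  # names of the FunctionTools exposed by the server
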