From 43b69c4a3097bccc9b2fdc5925f3d3eacba68180 Mon Sep 17 00:00:00 2001
From: jwatson
Date: Fri, 13 Jun 2025 08:16:52 -0700
Subject: [PATCH 01/29] refactoring/cleanup

---
 .../{tools => agents/agent_tools}/__init__.py | 0
 .../{tools => agents/agent_tools}/date.py | 0
 .../services/query/agents/agent_tools/mcp.py | 87 +++++++++++++++++++
 .../agent_tools}/retriever.py | 0
 .../query/agents/tool_calling_querier.py | 35 +++++---
 llm-service/app/services/query/chat_engine.py | 7 ++
 llm-service/app/services/query/querier.py | 65 +--------------
 7 files changed, 118 insertions(+), 76 deletions(-)
 rename llm-service/app/services/query/{tools => agents/agent_tools}/__init__.py (100%)
 rename llm-service/app/services/query/{tools => agents/agent_tools}/date.py (100%)
 create mode 100644 llm-service/app/services/query/agents/agent_tools/mcp.py
 rename llm-service/app/services/query/{tools => agents/agent_tools}/retriever.py (100%)

diff --git a/llm-service/app/services/query/tools/__init__.py b/llm-service/app/services/query/agents/agent_tools/__init__.py
similarity index 100%
rename from llm-service/app/services/query/tools/__init__.py
rename to llm-service/app/services/query/agents/agent_tools/__init__.py
diff --git a/llm-service/app/services/query/tools/date.py b/llm-service/app/services/query/agents/agent_tools/date.py
similarity index 100%
rename from llm-service/app/services/query/tools/date.py
rename to llm-service/app/services/query/agents/agent_tools/date.py
diff --git a/llm-service/app/services/query/agents/agent_tools/mcp.py b/llm-service/app/services/query/agents/agent_tools/mcp.py
new file mode 100644
index 00000000..07e8e015
--- /dev/null
+++ b/llm-service/app/services/query/agents/agent_tools/mcp.py
@@ -0,0 +1,87 @@
+#
+# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
+# (C) Cloudera, Inc. 2025
+# All rights reserved.
+#
+# Applicable Open Source License: Apache 2.0
+#
+# NOTE: Cloudera open source products are modular software products
+# made up of hundreds of individual components, each of which was
+# individually copyrighted. Each Cloudera open source product is a
+# collective work under U.S. Copyright Law. Your license to use the
+# collective work is as provided in your written agreement with
+# Cloudera. Used apart from the collective work, this file is
+# licensed for your use pursuant to the open source license
+# identified above.
+#
+# This code is provided to you pursuant a written agreement with
+# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
+# this code. If you do not have a written agreement with Cloudera nor
+# with an authorized and properly licensed third party, you do not
+# have any rights to access nor to use this code.
+#
+# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
("Cloudera") to the +# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY +# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED +# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO +# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU, +# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS +# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE +# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY +# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR +# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES +# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF +# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF +# DATA. +# +import json +import os +from copy import copy + +from llama_index.core.tools import FunctionTool +from llama_index.tools.mcp import BasicMCPClient, McpToolSpec + +from app.config import settings + + +def get_llama_index_tools(server_name: str) -> list[FunctionTool]: + """ + Find an MCP server by name in the mcp.json file and return the appropriate adapter. + + Args: + server_name: The name of the MCP server to find + + Returns: + An MCPServerAdapter configured for the specified server + + Raises: + ValueError: If the server name is not found in the mcp.json file + """ + mcp_json_path = os.path.join(settings.tools_dir, "mcp.json") + + with open(mcp_json_path, "r") as f: + mcp_config = json.load(f) + + mcp_servers = mcp_config["mcp_servers"] + server_config = next(filter(lambda x: x["name"] == server_name, mcp_servers), None) + + if server_config: + environment: dict[str, str] | None = copy(dict(os.environ)) + if "env" in server_config and environment: + environment.update(server_config["env"]) + + if "command" in server_config: + client = BasicMCPClient( + command_or_url=server_config["command"], + args=server_config.get("args", []), + env=environment, + ) + elif "url" in server_config: + client = BasicMCPClient(command_or_url=server_config["url"]) + else: + raise ValueError("Not configured right...fixme") + tool_spec = McpToolSpec(client=client) + return tool_spec.to_tool_list() + + raise ValueError(f"Invalid configuration for MCP server '{server_name}'") diff --git a/llm-service/app/services/query/tools/retriever.py b/llm-service/app/services/query/agents/agent_tools/retriever.py similarity index 100% rename from llm-service/app/services/query/tools/retriever.py rename to llm-service/app/services/query/agents/agent_tools/retriever.py diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index ffb36695..a2f45a6a 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -56,6 +56,8 @@ from llama_index.llms.openai import OpenAI from app.ai.indexing.summary_indexer import SummaryIndexer +from app.services.metadata_apis.session_metadata_api import Session +from app.services.query.agents.agent_tools.mcp import get_llama_index_tools from app.services.query.chat_engine import ( FlexibleContextChatEngine, ) @@ -70,7 +72,7 @@ ) logger = logging.getLogger(__name__) -# litellm._turn_on_debug() + poison_pill = "poison_pill" @@ -142,19 +144,28 @@ def stream_chat( chat_engine: Optional[FlexibleContextChatEngine], enhanced_query: str, chat_messages: list[ChatMessage], - 
additional_tools: list[BaseTool], + session: Session, data_source_summaries: dict[int, str], ) -> StreamingAgentChatResponse: + mcp_tools: list[BaseTool] = [] + if session.query_configuration and session.query_configuration.selected_tools: + for tool_name in session.query_configuration.selected_tools: + try: + mcp_tools.extend(get_llama_index_tools(tool_name)) + except ValueError as e: + logger.warning(f"Could not create adapter for tool {tool_name}: {e}") + continue + # Use the existing chat engine with the enhanced query for streaming response tools: list[BaseTool] = [DateTool()] if use_retrieval and chat_engine: retrieval_tool = build_retriever_tool( - retriever=chat_engine._retriever, + retriever=chat_engine.retriever, summaries=data_source_summaries, - node_postprocessors=chat_engine._node_postprocessors, + node_postprocessors=chat_engine.node_postprocessors, ) tools.append(retrieval_tool) - tools.extend(additional_tools) + tools.extend(mcp_tools) if isinstance(llm, OpenAI): gen, source_nodes = _openai_agent_streamer( chat_messages, enhanced_query, llm, tools @@ -187,14 +198,14 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: async for event in handler.stream_events(): if isinstance(event, ToolCall): if verbose and not isinstance(event, ToolCallResult): - print("=== Calling Function ===") - print( + logger.info("=== Calling Function ===") + logger.info( f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" ) if isinstance(event, ToolCallResult): if verbose: - print(f"Got output: {event.tool_output!s}") - print("========================") + logger.info(f"Got output: {event.tool_output!s}") + logger.info("========================") if ( event.tool_output.raw_output and isinstance(event.tool_output.raw_output, list) @@ -237,11 +248,11 @@ async def collect() -> list[ChatResponse]: for item in asyncio.run(collect()): yield item if verbose: - print("=== LLM Response ===") - print( + logger.info("=== LLM Response ===") + logger.info( f"{item.message.content.strip() if item.message.content else 'No content'}" ) - print("========================") + logger.info("========================") return gen(), source_nodes diff --git a/llm-service/app/services/query/chat_engine.py b/llm-service/app/services/query/chat_engine.py index c1dc1b81..0ae969cd 100644 --- a/llm-service/app/services/query/chat_engine.py +++ b/llm-service/app/services/query/chat_engine.py @@ -225,6 +225,13 @@ def _run_c3( return response_synthesizer, context_source, context_nodes + @property + def retriever(self): + return self._retriever + + @property + def node_postprocessors(self): + return self._node_postprocessors def build_flexible_chat_engine( configuration: QueryConfiguration, diff --git a/llm-service/app/services/query/querier.py b/llm-service/app/services/query/querier.py index b1083724..962a3be5 100644 --- a/llm-service/app/services/query/querier.py +++ b/llm-service/app/services/query/querier.py @@ -29,10 +29,7 @@ # ############################################################################## from __future__ import annotations -import json -import os import re -from copy import copy from queue import Queue from typing import Optional, TYPE_CHECKING, cast @@ -41,8 +38,6 @@ from llama_index.core.llms import LLM from llama_index.core.llms.function_calling import FunctionCallingLLM from llama_index.core.schema import NodeWithScore -from llama_index.core.tools import BaseTool as LLamaTool -from llama_index.core.tools import FunctionTool from .agents.tool_calling_querier import ( 
should_use_retrieval, @@ -53,7 +48,6 @@ from .flexible_retriever import FlexibleRetriever from .multi_retriever import MultiSourceRetriever from ..metadata_apis.session_metadata_api import Session -from ...config import settings if TYPE_CHECKING: from ..chat.utils import RagContext @@ -73,53 +67,10 @@ from app.services.query.query_configuration import QueryConfiguration from .chat_engine import build_flexible_chat_engine, FlexibleContextChatEngine from ...ai.vector_stores.vector_store_factory import VectorStoreFactory -from llama_index.tools.mcp import BasicMCPClient, McpToolSpec logger = logging.getLogger(__name__) -def get_llama_index_tools(server_name: str) -> list[FunctionTool]: - """ - Find an MCP server by name in the mcp.json file and return the appropriate adapter. - - Args: - server_name: The name of the MCP server to find - - Returns: - An MCPServerAdapter configured for the specified server - - Raises: - ValueError: If the server name is not found in the mcp.json file - """ - mcp_json_path = os.path.join(settings.tools_dir, "mcp.json") - - with open(mcp_json_path, "r") as f: - mcp_config = json.load(f) - - mcp_servers = mcp_config["mcp_servers"] - server_config = next(filter(lambda x: x["name"] == server_name, mcp_servers), None) - - if server_config: - environment: dict[str, str] | None = copy(dict(os.environ)) - if "env" in server_config and environment: - environment.update(server_config["env"]) - - if "command" in server_config: - client = BasicMCPClient( - command_or_url=server_config["command"], - args=server_config.get("args", []), - env=environment, - ) - elif "url" in server_config: - client = BasicMCPClient(command_or_url=server_config["url"]) - else: - raise ValueError("Not configured right...fixme") - tool_spec = McpToolSpec(client=client) - return tool_spec.to_tool_list() - - raise ValueError(f"Invalid configuration for MCP server '{server_name}'") - - def streaming_query( chat_engine: Optional[FlexibleContextChatEngine], query_str: str, @@ -128,20 +79,6 @@ def streaming_query( tool_events_queue: Queue[ToolEvent], session: Session, ) -> StreamingAgentChatResponse: - all_tools: list[LLamaTool] = [] - - if session.query_configuration and session.query_configuration.selected_tools: - for tool_name in session.query_configuration.selected_tools: - try: - llama_tools = get_llama_index_tools(tool_name) - # print( - # f"Adding adapter for tools: {[tool.name for tool in adapter.tools]}" - # ) - all_tools.extend(llama_tools) - except ValueError as e: - logger.warning(f"Could not create adapter for tool {tool_name}: {e}") - continue - llm = models.LLM.get(model_name=configuration.model_name) chat_response: StreamingAgentChatResponse @@ -156,7 +93,7 @@ def streaming_query( chat_engine, query_str, chat_messages, - all_tools, + session, data_source_summaries, ) tool_events_queue.put(ToolEvent(type=poison_pill, name="no-op")) From ead687c9a835dda083c478a4a54410ad5dc41fd7 Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 08:23:01 -0700 Subject: [PATCH 02/29] type fixes and import cleanup --- .../app/services/query/agents/tool_calling_querier.py | 4 ++-- llm-service/app/services/query/chat_engine.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index a2f45a6a..7e751aca 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ 
-61,8 +61,8 @@ from app.services.query.chat_engine import ( FlexibleContextChatEngine, ) -from app.services.query.tools.date import DateTool -from app.services.query.tools.retriever import ( +from app.services.query.agents.agent_tools.date import DateTool +from app.services.query.agents.agent_tools.retriever import ( build_retriever_tool, ) diff --git a/llm-service/app/services/query/chat_engine.py b/llm-service/app/services/query/chat_engine.py index 0ae969cd..9f4caeef 100644 --- a/llm-service/app/services/query/chat_engine.py +++ b/llm-service/app/services/query/chat_engine.py @@ -226,13 +226,14 @@ def _run_c3( return response_synthesizer, context_source, context_nodes @property - def retriever(self): + def retriever(self) -> BaseRetriever: return self._retriever @property - def node_postprocessors(self): + def node_postprocessors(self) -> List[BaseNodePostprocessor]: return self._node_postprocessors + def build_flexible_chat_engine( configuration: QueryConfiguration, llm: LLM, From 72bd4264cf33fe1a13fa6425a980dac91d28d9c7 Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 08:28:12 -0700 Subject: [PATCH 03/29] name changes lastFile:ui/src/pages/RagChatTab/State/RagChatContext.tsx --- llm-service/app/routers/index/sessions/__init__.py | 8 ++++---- llm-service/app/services/chat/streaming_chat.py | 10 +++++----- llm-service/app/services/query/chat_events.py | 4 ++-- llm-service/app/services/query/querier.py | 8 ++++---- ui/src/api/chatApi.ts | 12 ++++++------ ui/src/pages/RagChatTab/ChatLayout.tsx | 4 ++-- .../ChatOutput/ChatMessages/ChatMessageBody.tsx | 4 ++-- .../ChatOutput/ChatMessages/StreamedEvents.tsx | 6 +++--- ui/src/pages/RagChatTab/State/RagChatContext.tsx | 7 ++----- 9 files changed, 30 insertions(+), 33 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index 42c9212f..aef19eb5 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -64,7 +64,7 @@ from ....services.metadata_apis import session_metadata_api from ....services.mlflow import rating_mlflow_log_metric, feedback_mlflow_log_table from ....services.query.agents.tool_calling_querier import poison_pill -from ....services.query.chat_events import ToolEvent +from ....services.query.chat_events import ChatEvent from ....services.session import rename_session logger = logging.getLogger(__name__) @@ -258,7 +258,7 @@ def stream_chat_completion( session = session_metadata_api.get_session(session_id, user_name=origin_remote_user) configuration = request.configuration or RagPredictConfiguration() - tool_events_queue: queue.Queue[ToolEvent] = queue.Queue() + tool_events_queue: queue.Queue[ChatEvent] = queue.Queue() # Create a cancellation event to signal when the client disconnects cancel_event = threading.Event() @@ -283,7 +283,7 @@ def tools_callback(chat_future: Future[Any]) -> Generator[str, None, None]: yield f"data: {event_json}\n\n" except queue.Empty: # Send a heartbeat event every second to keep the connection alive - heartbeat = ToolEvent( + heartbeat = ChatEvent( type="event", name="Processing", timestamp=time.time() ) event_json = json.dumps({"event": heartbeat.model_dump()}) @@ -324,7 +324,7 @@ def generate_stream() -> Generator[str, None, None]: # send an initial message to let the client know the response stream is starting if first_message: - done = ToolEvent(type="done", name="done", timestamp=time.time()) + done = ChatEvent(type="done", name="done", 
timestamp=time.time()) event_json = json.dumps({"event": done.model_dump()}) yield f"data: {event_json}\n\n" first_message = False diff --git a/llm-service/app/services/chat/streaming_chat.py b/llm-service/app/services/chat/streaming_chat.py index 620e702e..7b7b813a 100644 --- a/llm-service/app/services/chat/streaming_chat.py +++ b/llm-service/app/services/chat/streaming_chat.py @@ -64,7 +64,7 @@ FlexibleContextChatEngine, build_flexible_chat_engine, ) -from app.services.query.chat_events import ToolEvent +from app.services.query.chat_events import ChatEvent from app.services.query.querier import ( build_retriever, ) @@ -76,7 +76,7 @@ def stream_chat( query: str, configuration: RagPredictConfiguration, user_name: Optional[str], - tool_events_queue: Queue[ToolEvent], + tool_events_queue: Queue[ChatEvent], ) -> Generator[ChatResponse, None, None]: query_configuration = QueryConfiguration( top_k=session.response_chunks, @@ -100,7 +100,7 @@ def stream_chat( len(session.data_source_ids) == 0 or total_data_sources_size == 0 ): # put a poison pill in the queue to stop the tool events stream - tool_events_queue.put(ToolEvent(type=poison_pill, name="no-op")) + tool_events_queue.put(ChatEvent(type=poison_pill, name="no-op")) return _stream_direct_llm_chat(session, response_id, query, user_name) condensed_question, streaming_chat_response = build_streamer( @@ -151,7 +151,7 @@ def _run_streaming_chat( def build_streamer( - chat_events_queue: Queue[ToolEvent], + chat_events_queue: Queue[ChatEvent], query: str, query_configuration: QueryConfiguration, session: Session, @@ -180,7 +180,7 @@ def build_streamer( query, query_configuration, chat_messages, - tool_events_queue=chat_events_queue, + chat_event_queue=chat_events_queue, session=session, ) return condensed_question, streaming_chat_response diff --git a/llm-service/app/services/query/chat_events.py b/llm-service/app/services/query/chat_events.py index 92f4b714..895554b2 100644 --- a/llm-service/app/services/query/chat_events.py +++ b/llm-service/app/services/query/chat_events.py @@ -43,7 +43,7 @@ from pydantic import BaseModel -class ToolEvent(BaseModel): +class ChatEvent(BaseModel): type: str name: str data: Optional[str] = None @@ -51,7 +51,7 @@ class ToolEvent(BaseModel): def step_callback( - output: Any, agent: str, tool_events_queue: Queue[ToolEvent] + output: Any, agent: str, tool_events_queue: Queue[ChatEvent] ) -> None: # todo: hook this up return None diff --git a/llm-service/app/services/query/querier.py b/llm-service/app/services/query/querier.py index 962a3be5..c230e0db 100644 --- a/llm-service/app/services/query/querier.py +++ b/llm-service/app/services/query/querier.py @@ -44,7 +44,7 @@ stream_chat, poison_pill, ) -from .chat_events import ToolEvent +from .chat_events import ChatEvent from .flexible_retriever import FlexibleRetriever from .multi_retriever import MultiSourceRetriever from ..metadata_apis.session_metadata_api import Session @@ -76,7 +76,7 @@ def streaming_query( query_str: str, configuration: QueryConfiguration, chat_messages: list[ChatMessage], - tool_events_queue: Queue[ToolEvent], + chat_event_queue: Queue[ChatEvent], session: Session, ) -> StreamingAgentChatResponse: llm = models.LLM.get(model_name=configuration.model_name) @@ -96,7 +96,7 @@ def streaming_query( session, data_source_summaries, ) - tool_events_queue.put(ToolEvent(type=poison_pill, name="no-op")) + chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) return chat_response if not chat_engine: raise HTTPException( @@ -106,7 +106,7 @@ def 
streaming_query( try: chat_response = chat_engine.stream_chat(query_str, chat_messages) - tool_events_queue.put(ToolEvent(type=poison_pill, name="no-op")) + chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) logger.debug("query response received from chat engine") except botocore.exceptions.ClientError as error: logger.warning(error.response) diff --git a/ui/src/api/chatApi.ts b/ui/src/api/chatApi.ts index d45a2de9..9baf72fc 100644 --- a/ui/src/api/chatApi.ts +++ b/ui/src/api/chatApi.ts @@ -331,10 +331,10 @@ export interface ChatMutationResponse { text?: string; response_id?: string; error?: string; - event?: ToolEventResponse; + event?: ChatEvent; } -export interface ToolEventResponse { +export interface ChatEvent { type: string; name: string; data?: string; @@ -378,7 +378,7 @@ const canceledChatMessage = (variables: ChatMutationRequest) => { interface StreamingChatCallbacks { onChunk: (msg: string) => void; - onEvent: (event: ToolEventResponse) => void; + onEvent: (event: ChatEvent) => void; getController?: (ctrl: AbortController) => void; } @@ -522,7 +522,7 @@ export const useStreamingChatMutation = ({ const streamChatMutation = async ( request: ChatMutationRequest, onChunk: (chunk: string) => void, - onEvent: (event: ToolEventResponse) => void, + onEvent: (event: ChatEvent) => void, onError: (error: string) => void, getController?: (ctrl: AbortController) => void, ): Promise => { @@ -598,9 +598,9 @@ const streamChatMutation = async ( }; export const getOnEvent = ( - setStreamedEvent: Dispatch>, + setStreamedEvent: Dispatch>, ) => { - return (event: ToolEventResponse) => { + return (event: ChatEvent) => { if (event.type === "done") { setStreamedEvent([]); } else { diff --git a/ui/src/pages/RagChatTab/ChatLayout.tsx b/ui/src/pages/RagChatTab/ChatLayout.tsx index 94e39de6..bf90ed3c 100644 --- a/ui/src/pages/RagChatTab/ChatLayout.tsx +++ b/ui/src/pages/RagChatTab/ChatLayout.tsx @@ -43,7 +43,7 @@ import { Outlet, useParams } from "@tanstack/react-router"; import { useMemo, useState } from "react"; import { ChatMessageType, - ToolEventResponse, + ChatEvent, useChatHistoryQuery, } from "src/api/chatApi.ts"; import { RagChatContext } from "pages/RagChatTab/State/RagChatContext.tsx"; @@ -78,7 +78,7 @@ function ChatLayout() { useGetDataSourcesForProject(+projectId); const [excludeKnowledgeBase, setExcludeKnowledgeBase] = useState(false); const [streamedChat, setStreamedChat] = useState(""); - const [streamedEvent, setStreamedEvent] = useState([]); + const [streamedEvent, setStreamedEvent] = useState([]); const [streamedAbortController, setStreamedAbortController] = useState(); const { diff --git a/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/ChatMessageBody.tsx b/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/ChatMessageBody.tsx index e0bcfc7d..0a4b8a32 100644 --- a/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/ChatMessageBody.tsx +++ b/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/ChatMessageBody.tsx @@ -36,7 +36,7 @@ * DATA. */ -import { ChatMessageType, ToolEventResponse } from "src/api/chatApi.ts"; +import { ChatMessageType, ChatEvent } from "src/api/chatApi.ts"; import UserQuestion from "pages/RagChatTab/ChatOutput/ChatMessages/UserQuestion.tsx"; import { Divider, Flex, Typography } from "antd"; import Images from "src/components/images/Images.ts"; @@ -53,7 +53,7 @@ export const ChatMessageBody = ({ streamedEvents, }: { data: ChatMessageType; - streamedEvents?: ToolEventResponse[]; + streamedEvents?: ChatEvent[]; }) => { return (
diff --git a/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/StreamedEvents.tsx b/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/StreamedEvents.tsx index 1c099d37..801620eb 100644 --- a/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/StreamedEvents.tsx +++ b/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/StreamedEvents.tsx @@ -36,13 +36,13 @@ * DATA. */ -import { ToolEventResponse } from "src/api/chatApi.ts"; +import { ChatEvent } from "src/api/chatApi.ts"; import { Button, Card, Flex, Spin, Typography } from "antd"; import { format } from "date-fns"; import { useState } from "react"; import { MinusOutlined, PlusOutlined } from "@ant-design/icons"; -const StreamedEvent = ({ event }: { event: ToolEventResponse }) => { +const StreamedEvent = ({ event }: { event: ChatEvent }) => { return ( @@ -76,7 +76,7 @@ const StreamedEvent = ({ event }: { event: ToolEventResponse }) => { const StreamedEvents = ({ streamedEvents, }: { - streamedEvents?: ToolEventResponse[]; + streamedEvents?: ChatEvent[]; }) => { const [collapsed, setCollapsed] = useState(true); diff --git a/ui/src/pages/RagChatTab/State/RagChatContext.tsx b/ui/src/pages/RagChatTab/State/RagChatContext.tsx index 5b80dd2d..eb9a43e1 100644 --- a/ui/src/pages/RagChatTab/State/RagChatContext.tsx +++ b/ui/src/pages/RagChatTab/State/RagChatContext.tsx @@ -40,7 +40,7 @@ import { createContext, Dispatch, SetStateAction } from "react"; import { ChatHistoryResponse, ChatMessageType, - ToolEventResponse, + ChatEvent, } from "src/api/chatApi.ts"; import { Session } from "src/api/sessionApi.ts"; import { DataSourceType } from "src/api/dataSourceApi.ts"; @@ -64,10 +64,7 @@ export interface RagChatContextType { >; }; streamedChatState: [string, Dispatch>]; - streamedEventState: [ - ToolEventResponse[], - Dispatch>, - ]; + streamedEventState: [ChatEvent[], Dispatch>]; streamedAbortControllerState: [ AbortController | undefined, Dispatch>, From 98cd41fb1eb7d02c70b27456f11b65f9080e3d61 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 09:39:45 -0600 Subject: [PATCH 04/29] wip lastFile:llm-service/app/services/query/querier.py --- .../query/agents/tool_calling_querier.py | 52 +++++++++++-------- llm-service/app/services/query/querier.py | 1 + 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 7e751aca..96f732f1 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -38,6 +38,7 @@ import asyncio import logging import os +from queue import Queue from typing import Optional, Generator, AsyncGenerator, Callable, cast, Any import opik @@ -46,7 +47,7 @@ FunctionAgent, AgentStream, ToolCall, - ToolCallResult, + ToolCallResult, AgentOutput, AgentInput, ) from llama_index.core.base.llms.types import ChatMessage, MessageRole, ChatResponse from llama_index.core.chat_engine.types import StreamingAgentChatResponse @@ -57,14 +58,15 @@ from app.ai.indexing.summary_indexer import SummaryIndexer from app.services.metadata_apis.session_metadata_api import Session -from app.services.query.agents.agent_tools.mcp import get_llama_index_tools -from app.services.query.chat_engine import ( - FlexibleContextChatEngine, -) from app.services.query.agents.agent_tools.date import DateTool +from app.services.query.agents.agent_tools.mcp import get_llama_index_tools from app.services.query.agents.agent_tools.retriever import ( 
build_retriever_tool, ) +from app.services.query.chat_engine import ( + FlexibleContextChatEngine, +) +from app.services.query.chat_events import ChatEvent if os.environ.get("ENABLE_OPIK") == "True": opik.configure( @@ -146,6 +148,7 @@ def stream_chat( chat_messages: list[ChatMessage], session: Session, data_source_summaries: dict[int, str], + chat_event_queue: Queue[ChatEvent], ) -> StreamingAgentChatResponse: mcp_tools: list[BaseTool] = [] if session.query_configuration and session.query_configuration.selected_tools: @@ -171,20 +174,13 @@ def stream_chat( chat_messages, enhanced_query, llm, tools ) else: - gen, source_nodes = _run_non_openai_streamer( - chat_messages, enhanced_query, llm, tools - ) + gen, source_nodes = _run_non_openai_streamer(chat_messages, enhanced_query, llm, tools, chat_event_queue) return StreamingAgentChatResponse(chat_stream=gen, source_nodes=source_nodes) -def _run_non_openai_streamer( - chat_messages: list[ChatMessage], - enhanced_query: str, - llm: FunctionCallingLLM, - tools: list[BaseTool], - verbose: bool = True, -) -> tuple[Generator[ChatResponse, None, None], list[NodeWithScore]]: +def _run_non_openai_streamer(chat_messages: list[ChatMessage], enhanced_query: str, llm: FunctionCallingLLM, + tools: list[BaseTool], chat_event_queue: Queue[ChatEvent], verbose: bool = True) -> tuple[Generator[ChatResponse, None, None], list[NodeWithScore]]: agent = FunctionAgent( tools=cast(list[BaseTool | Callable[[], Any]], tools), llm=llm, @@ -197,14 +193,18 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: handler = agent.run(user_msg=enhanced_query, chat_history=chat_messages) async for event in handler.stream_events(): if isinstance(event, ToolCall): + data = f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" + chat_event_queue.put(ChatEvent(type="tool_call", name=event.tool_name, data=data)) if verbose and not isinstance(event, ToolCallResult): logger.info("=== Calling Function ===") logger.info( - f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" + data ) if isinstance(event, ToolCallResult): + data = f"Got output: {event.tool_output!s}" + chat_event_queue.put(ChatEvent(type="tool_result", name=event.tool_name, data=data)) if verbose: - logger.info(f"Got output: {event.tool_output!s}") + logger.info(data) logger.info("========================") if ( event.tool_output.raw_output @@ -215,6 +215,18 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) ): source_nodes.extend(event.tool_output.raw_output) + if isinstance(event, AgentOutput): + data = f"Agent {event.current_agent_name} response: {event.response!s}" + chat_event_queue.put(ChatEvent(type="agent_output", name=event.tool_name, data=data)) + if verbose: + logger.info("=== LLM Response ===") + logger.info( + f"{str(event.response) if event.response else 'No content'}" + ) + logger.info("========================") + if isinstance(event, AgentInput): + data = f"Agent {event.current_agent_name} response: {event.response!s}" + chat_event_queue.put(ChatEvent(type="agent_input", name=event.tool_name, data=data)) if isinstance(event, AgentStream): if event.response: # Yield the delta response as a ChatResponse @@ -247,12 +259,6 @@ async def collect() -> list[ChatResponse]: ) for item in asyncio.run(collect()): yield item - if verbose: - logger.info("=== LLM Response ===") - logger.info( - f"{item.message.content.strip() if item.message.content else 'No content'}" - ) - logger.info("========================") return gen(), source_nodes diff --git 
a/llm-service/app/services/query/querier.py b/llm-service/app/services/query/querier.py index c230e0db..07260c73 100644 --- a/llm-service/app/services/query/querier.py +++ b/llm-service/app/services/query/querier.py @@ -95,6 +95,7 @@ def streaming_query( chat_messages, session, data_source_summaries, + chat_event_queue ) chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) return chat_response From d1e465736e412121f571bf8d074761a1919817b3 Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 08:50:13 -0700 Subject: [PATCH 05/29] event testing lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- .../app/services/query/agents/agent_tools/date.py | 2 +- .../services/query/agents/tool_calling_querier.py | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/llm-service/app/services/query/agents/agent_tools/date.py b/llm-service/app/services/query/agents/agent_tools/date.py index 7ef69a65..6cf91c04 100644 --- a/llm-service/app/services/query/agents/agent_tools/date.py +++ b/llm-service/app/services/query/agents/agent_tools/date.py @@ -46,7 +46,7 @@ class DateToolInput(BaseModel): """ Input schema for the DateTool """ - input_: None = None + input: None = None class DateTool(BaseTool): """ diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 96f732f1..3c6b1b64 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -217,7 +217,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: source_nodes.extend(event.tool_output.raw_output) if isinstance(event, AgentOutput): data = f"Agent {event.current_agent_name} response: {event.response!s}" - chat_event_queue.put(ChatEvent(type="agent_output", name=event.tool_name, data=data)) + chat_event_queue.put(ChatEvent(type="agent_output", name=event.current_agent_name, data=data)) if verbose: logger.info("=== LLM Response ===") logger.info( @@ -225,8 +225,8 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) logger.info("========================") if isinstance(event, AgentInput): - data = f"Agent {event.current_agent_name} response: {event.response!s}" - chat_event_queue.put(ChatEvent(type="agent_input", name=event.tool_name, data=data)) + data = f"Agent {event.current_agent_name} started execution with input: {event.input!s}" + chat_event_queue.put(ChatEvent(type="agent_input", name=event.current_agent_name, data=data)) if isinstance(event, AgentStream): if event.response: # Yield the delta response as a ChatResponse @@ -249,14 +249,6 @@ async def collect() -> list[ChatResponse]: results.append(chunk) return results - item = ChatResponse( - message=ChatMessage(role=MessageRole.ASSISTANT, content=""), - delta="", - raw=None, - additional_kwargs={ - "tool_calls": [], - }, - ) for item in asyncio.run(collect()): yield item From 2f080a8735b666568ba7f948b9581222fb470449 Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 09:04:18 -0700 Subject: [PATCH 06/29] wip event queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- .../app/routers/index/sessions/__init__.py | 7 +-- .../app/services/chat/streaming_chat.py | 10 +++-- .../query/agents/tool_calling_querier.py | 44 ++++++++++++++----- llm-service/app/services/query/chat_events.py | 8 ++-- llm-service/app/services/query/querier.py | 3 +- 5 files changed, 46 insertions(+), 26 deletions(-) diff --git 
a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index aef19eb5..257d7f18 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -258,7 +258,7 @@ def stream_chat_completion( session = session_metadata_api.get_session(session_id, user_name=origin_remote_user) configuration = request.configuration or RagPredictConfiguration() - tool_events_queue: queue.Queue[ChatEvent] = queue.Queue() + chat_event_queue: queue.Queue[ChatEvent] = queue.Queue() # Create a cancellation event to signal when the client disconnects cancel_event = threading.Event() @@ -276,7 +276,8 @@ def tools_callback(chat_future: Future[Any]) -> Generator[str, None, None]: raise e try: - event_data = tool_events_queue.get(block=True, timeout=1.0) + event_data = chat_event_queue.get(block=True, timeout=1.0) + print(event_data) if event_data.type == poison_pill: break event_json = json.dumps({"event": event_data.model_dump()}) @@ -303,7 +304,7 @@ def generate_stream() -> Generator[str, None, None]: query=request.query, configuration=configuration, user_name=origin_remote_user, - tool_events_queue=tool_events_queue, + chat_event_queue=chat_event_queue, ) # Yield from tools_callback, which will check for cancellation diff --git a/llm-service/app/services/chat/streaming_chat.py b/llm-service/app/services/chat/streaming_chat.py index 7b7b813a..05589456 100644 --- a/llm-service/app/services/chat/streaming_chat.py +++ b/llm-service/app/services/chat/streaming_chat.py @@ -76,7 +76,7 @@ def stream_chat( query: str, configuration: RagPredictConfiguration, user_name: Optional[str], - tool_events_queue: Queue[ChatEvent], + chat_event_queue: Queue[ChatEvent], ) -> Generator[ChatResponse, None, None]: query_configuration = QueryConfiguration( top_k=session.response_chunks, @@ -100,18 +100,20 @@ def stream_chat( len(session.data_source_ids) == 0 or total_data_sources_size == 0 ): # put a poison pill in the queue to stop the tool events stream - tool_events_queue.put(ChatEvent(type=poison_pill, name="no-op")) + chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) return _stream_direct_llm_chat(session, response_id, query, user_name) condensed_question, streaming_chat_response = build_streamer( - tool_events_queue, query, query_configuration, session + chat_event_queue, query, query_configuration, session ) + chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) return _run_streaming_chat( session, response_id, query, query_configuration, user_name, + chat_event_queue, condensed_question=condensed_question, streaming_chat_response=streaming_chat_response, ) @@ -123,11 +125,11 @@ def _run_streaming_chat( query: str, query_configuration: QueryConfiguration, user_name: Optional[str], + chat_event_queue: Queue[ChatEvent], streaming_chat_response: StreamingAgentChatResponse, condensed_question: Optional[str] = None, ) -> Generator[ChatResponse, None, None]: response: ChatResponse = ChatResponse(message=ChatMessage(content=query)) - if streaming_chat_response.chat_stream: for response in streaming_chat_response.chat_stream: response.additional_kwargs["response_id"] = response_id diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 3c6b1b64..cce61e7f 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -47,7 +47,9 @@ 
FunctionAgent, AgentStream, ToolCall, - ToolCallResult, AgentOutput, AgentInput, + ToolCallResult, + AgentOutput, + AgentInput, ) from llama_index.core.base.llms.types import ChatMessage, MessageRole, ChatResponse from llama_index.core.chat_engine.types import StreamingAgentChatResponse @@ -148,7 +150,7 @@ def stream_chat( chat_messages: list[ChatMessage], session: Session, data_source_summaries: dict[int, str], - chat_event_queue: Queue[ChatEvent], + chat_event_queue: Queue[ChatEvent], ) -> StreamingAgentChatResponse: mcp_tools: list[BaseTool] = [] if session.query_configuration and session.query_configuration.selected_tools: @@ -174,13 +176,21 @@ def stream_chat( chat_messages, enhanced_query, llm, tools ) else: - gen, source_nodes = _run_non_openai_streamer(chat_messages, enhanced_query, llm, tools, chat_event_queue) + gen, source_nodes = _run_non_openai_streamer( + chat_messages, enhanced_query, llm, tools, chat_event_queue + ) return StreamingAgentChatResponse(chat_stream=gen, source_nodes=source_nodes) -def _run_non_openai_streamer(chat_messages: list[ChatMessage], enhanced_query: str, llm: FunctionCallingLLM, - tools: list[BaseTool], chat_event_queue: Queue[ChatEvent], verbose: bool = True) -> tuple[Generator[ChatResponse, None, None], list[NodeWithScore]]: +def _run_non_openai_streamer( + chat_messages: list[ChatMessage], + enhanced_query: str, + llm: FunctionCallingLLM, + tools: list[BaseTool], + chat_event_queue: Queue[ChatEvent], + verbose: bool = True, +) -> tuple[Generator[ChatResponse, None, None], list[NodeWithScore]]: agent = FunctionAgent( tools=cast(list[BaseTool | Callable[[], Any]], tools), llm=llm, @@ -194,15 +204,17 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: async for event in handler.stream_events(): if isinstance(event, ToolCall): data = f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" - chat_event_queue.put(ChatEvent(type="tool_call", name=event.tool_name, data=data)) + chat_event_queue.put( + ChatEvent(type="tool_call", name=event.tool_name, data=data) + ) if verbose and not isinstance(event, ToolCallResult): logger.info("=== Calling Function ===") - logger.info( - data - ) + logger.info(data) if isinstance(event, ToolCallResult): data = f"Got output: {event.tool_output!s}" - chat_event_queue.put(ChatEvent(type="tool_result", name=event.tool_name, data=data)) + chat_event_queue.put( + ChatEvent(type="tool_result", name=event.tool_name, data=data) + ) if verbose: logger.info(data) logger.info("========================") @@ -217,7 +229,11 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: source_nodes.extend(event.tool_output.raw_output) if isinstance(event, AgentOutput): data = f"Agent {event.current_agent_name} response: {event.response!s}" - chat_event_queue.put(ChatEvent(type="agent_output", name=event.current_agent_name, data=data)) + chat_event_queue.put( + ChatEvent( + type="agent_output", name=event.current_agent_name, data=data + ) + ) if verbose: logger.info("=== LLM Response ===") logger.info( @@ -226,7 +242,11 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: logger.info("========================") if isinstance(event, AgentInput): data = f"Agent {event.current_agent_name} started execution with input: {event.input!s}" - chat_event_queue.put(ChatEvent(type="agent_input", name=event.current_agent_name, data=data)) + chat_event_queue.put( + ChatEvent( + type="agent_input", name=event.current_agent_name, data=data + ) + ) if isinstance(event, AgentStream): if event.response: # Yield the delta response as 
a ChatResponse diff --git a/llm-service/app/services/query/chat_events.py b/llm-service/app/services/query/chat_events.py index 895554b2..68286f5c 100644 --- a/llm-service/app/services/query/chat_events.py +++ b/llm-service/app/services/query/chat_events.py @@ -50,8 +50,6 @@ class ChatEvent(BaseModel): timestamp: float = time.time() -def step_callback( - output: Any, agent: str, tool_events_queue: Queue[ChatEvent] -) -> None: - # todo: hook this up - return None +def step_callback(output: Any, agent: str, tool_events_queue: Queue[ChatEvent]) -> None: + + return diff --git a/llm-service/app/services/query/querier.py b/llm-service/app/services/query/querier.py index 07260c73..87721b4e 100644 --- a/llm-service/app/services/query/querier.py +++ b/llm-service/app/services/query/querier.py @@ -95,9 +95,8 @@ def streaming_query( chat_messages, session, data_source_summaries, - chat_event_queue + chat_event_queue, ) - chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) return chat_response if not chat_engine: raise HTTPException( From 61a6e65030cab83e2694ab7544deff1b5ea8fb18 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 10:15:51 -0600 Subject: [PATCH 07/29] moving poison pill around lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- llm-service/app/services/chat/streaming_chat.py | 2 +- llm-service/app/services/query/agents/tool_calling_querier.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/llm-service/app/services/chat/streaming_chat.py b/llm-service/app/services/chat/streaming_chat.py index 05589456..c5829d29 100644 --- a/llm-service/app/services/chat/streaming_chat.py +++ b/llm-service/app/services/chat/streaming_chat.py @@ -106,7 +106,7 @@ def stream_chat( condensed_question, streaming_chat_response = build_streamer( chat_event_queue, query, query_configuration, session ) - chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) + return _run_streaming_chat( session, response_id, diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index cce61e7f..852bd060 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -179,7 +179,6 @@ def stream_chat( gen, source_nodes = _run_non_openai_streamer( chat_messages, enhanced_query, llm, tools, chat_event_queue ) - return StreamingAgentChatResponse(chat_stream=gen, source_nodes=source_nodes) @@ -249,6 +248,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) if isinstance(event, AgentStream): if event.response: + chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) # Yield the delta response as a ChatResponse yield ChatResponse( message=ChatMessage( From d0f8232a89352b0a68724c5cf4f0ba670c6804eb Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 09:30:22 -0700 Subject: [PATCH 08/29] event wip lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- llm-service/app/services/chat/streaming_chat.py | 5 ++--- .../app/services/query/agents/tool_calling_querier.py | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/llm-service/app/services/chat/streaming_chat.py b/llm-service/app/services/chat/streaming_chat.py index c5829d29..dc27444c 100644 --- a/llm-service/app/services/chat/streaming_chat.py +++ b/llm-service/app/services/chat/streaming_chat.py @@ -106,14 +106,14 @@ def stream_chat( condensed_question, streaming_chat_response = 
build_streamer( chat_event_queue, query, query_configuration, session ) - + chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) + return _run_streaming_chat( session, response_id, query, query_configuration, user_name, - chat_event_queue, condensed_question=condensed_question, streaming_chat_response=streaming_chat_response, ) @@ -125,7 +125,6 @@ def _run_streaming_chat( query: str, query_configuration: QueryConfiguration, user_name: Optional[str], - chat_event_queue: Queue[ChatEvent], streaming_chat_response: StreamingAgentChatResponse, condensed_question: Optional[str] = None, ) -> Generator[ChatResponse, None, None]: diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 852bd060..a0b1ccef 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -248,7 +248,6 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) if isinstance(event, AgentStream): if event.response: - chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) # Yield the delta response as a ChatResponse yield ChatResponse( message=ChatMessage( From 2bd1086e9490aa64de0764f0d15f6693b068b187 Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 09:44:01 -0700 Subject: [PATCH 09/29] WIP event queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- .../query/agents/tool_calling_querier.py | 78 ++++++++++++++++--- 1 file changed, 69 insertions(+), 9 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index a0b1ccef..6c04ca7f 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -50,6 +50,7 @@ ToolCallResult, AgentOutput, AgentInput, + AgentSetup, ) from llama_index.core.base.llms.types import ChatMessage, MessageRole, ChatResponse from llama_index.core.chat_engine.types import StreamingAgentChatResponse @@ -201,14 +202,56 @@ def _run_non_openai_streamer( async def agen() -> AsyncGenerator[ChatResponse, None]: handler = agent.run(user_msg=enhanced_query, chat_history=chat_messages) async for event in handler.stream_events(): + if isinstance(event, AgentSetup): + data = f"Agent {event.current_agent_name} setup with input: {event.input!s}" + if verbose: + logger.info("=== Agent Setup ===") + logger.info(data) + logger.info("========================") + yield ChatResponse( + message=ChatMessage( + role=MessageRole.FUNCTION, + content=data, + ), + delta="", + raw="", + additional_kwargs={ + "tool_calls": [], + }, + ) + if isinstance(event, AgentInput): + data = f"Agent {event.current_agent_name} started with input: {event.input!s}" + if verbose: + logger.info("=== Agent Input ===") + logger.info(data) + logger.info("========================") + yield ChatResponse( + message=ChatMessage( + role=MessageRole.FUNCTION, + content=data, + ), + delta="", + raw="", + additional_kwargs={ + "tool_calls": [], + }, + ) if isinstance(event, ToolCall): data = f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" - chat_event_queue.put( - ChatEvent(type="tool_call", name=event.tool_name, data=data) - ) if verbose and not isinstance(event, ToolCallResult): logger.info("=== Calling Function ===") logger.info(data) + yield ChatResponse( + message=ChatMessage( + role=MessageRole.TOOL, + content="", + ), + 
delta="", + raw="", + additional_kwargs={ + "tool_calls": [event], + }, + ) if isinstance(event, ToolCallResult): data = f"Got output: {event.tool_output!s}" chat_event_queue.put( @@ -226,6 +269,17 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) ): source_nodes.extend(event.tool_output.raw_output) + yield ChatResponse( + message=ChatMessage( + role=MessageRole.TOOL, + content="", + ), + delta="", + raw="", + additional_kwargs={ + "tool_calls": [event], + }, + ) if isinstance(event, AgentOutput): data = f"Agent {event.current_agent_name} response: {event.response!s}" chat_event_queue.put( @@ -239,12 +293,18 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: f"{str(event.response) if event.response else 'No content'}" ) logger.info("========================") - if isinstance(event, AgentInput): - data = f"Agent {event.current_agent_name} started execution with input: {event.input!s}" - chat_event_queue.put( - ChatEvent( - type="agent_input", name=event.current_agent_name, data=data - ) + yield ChatResponse( + message=ChatMessage( + role=MessageRole.DEVELOPER, + content=( + event.response.content if event.response.content else "" + ), + ), + delta="", + raw=event.raw, + additional_kwargs={ + "tool_calls": event.tool_calls, + }, ) if isinstance(event, AgentStream): if event.response: From aae5433f1d992d001d91776a77c88c8a31a665e5 Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 09:44:43 -0700 Subject: [PATCH 10/29] WIP event queue lastFile:llm-service/app/services/query/chat_events.py --- .../query/agents/tool_calling_querier.py | 27 +++++++++++++++++++ llm-service/app/services/query/chat_events.py | 5 ++++ 2 files changed, 32 insertions(+) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 6c04ca7f..80cda9b7 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -50,7 +50,10 @@ ToolCallResult, AgentOutput, AgentInput, +<<<<<<< Updated upstream AgentSetup, +======= +>>>>>>> Stashed changes ) from llama_index.core.base.llms.types import ChatMessage, MessageRole, ChatResponse from llama_index.core.chat_engine.types import StreamingAgentChatResponse @@ -180,6 +183,10 @@ def stream_chat( gen, source_nodes = _run_non_openai_streamer( chat_messages, enhanced_query, llm, tools, chat_event_queue ) +<<<<<<< Updated upstream +======= + +>>>>>>> Stashed changes return StreamingAgentChatResponse(chat_stream=gen, source_nodes=source_nodes) @@ -238,6 +245,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) if isinstance(event, ToolCall): data = f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" +<<<<<<< Updated upstream if verbose and not isinstance(event, ToolCallResult): logger.info("=== Calling Function ===") logger.info(data) @@ -252,6 +260,14 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: "tool_calls": [event], }, ) +======= + chat_event_queue.put( + ChatEvent(type="tool_call", name=event.tool_name, data=data) + ) + if verbose and not isinstance(event, ToolCallResult): + logger.info("=== Calling Function ===") + logger.info(data) +>>>>>>> Stashed changes if isinstance(event, ToolCallResult): data = f"Got output: {event.tool_output!s}" chat_event_queue.put( @@ -283,9 +299,13 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: if isinstance(event, AgentOutput): data = f"Agent {event.current_agent_name} response: {event.response!s}" 
chat_event_queue.put( +<<<<<<< Updated upstream ChatEvent( type="agent_output", name=event.current_agent_name, data=data ) +======= + ChatEvent(type="agent_output", name=event.tool_name, data=data) +>>>>>>> Stashed changes ) if verbose: logger.info("=== LLM Response ===") @@ -293,6 +313,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: f"{str(event.response) if event.response else 'No content'}" ) logger.info("========================") +<<<<<<< Updated upstream yield ChatResponse( message=ChatMessage( role=MessageRole.DEVELOPER, @@ -305,6 +326,12 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: additional_kwargs={ "tool_calls": event.tool_calls, }, +======= + if isinstance(event, AgentInput): + data = f"Agent {event.current_agent_name} started execution: {event.input!s}" + chat_event_queue.put( + ChatEvent(type="agent_input", name=event.tool_name, data=data) +>>>>>>> Stashed changes ) if isinstance(event, AgentStream): if event.response: diff --git a/llm-service/app/services/query/chat_events.py b/llm-service/app/services/query/chat_events.py index 68286f5c..338087de 100644 --- a/llm-service/app/services/query/chat_events.py +++ b/llm-service/app/services/query/chat_events.py @@ -51,5 +51,10 @@ class ChatEvent(BaseModel): def step_callback(output: Any, agent: str, tool_events_queue: Queue[ChatEvent]) -> None: +<<<<<<< Updated upstream return +======= + # todo: hook this up + return None +>>>>>>> Stashed changes From 6a3162c47644c2b739fdbbd61c4c3d60a1d53a75 Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 09:47:38 -0700 Subject: [PATCH 11/29] WIP even queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- .../query/agents/tool_calling_querier.py | 34 ++----------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 80cda9b7..14c714b9 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -50,10 +50,7 @@ ToolCallResult, AgentOutput, AgentInput, -<<<<<<< Updated upstream AgentSetup, -======= ->>>>>>> Stashed changes ) from llama_index.core.base.llms.types import ChatMessage, MessageRole, ChatResponse from llama_index.core.chat_engine.types import StreamingAgentChatResponse @@ -183,10 +180,7 @@ def stream_chat( gen, source_nodes = _run_non_openai_streamer( chat_messages, enhanced_query, llm, tools, chat_event_queue ) -<<<<<<< Updated upstream -======= ->>>>>>> Stashed changes return StreamingAgentChatResponse(chat_stream=gen, source_nodes=source_nodes) @@ -245,10 +239,10 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) if isinstance(event, ToolCall): data = f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" -<<<<<<< Updated upstream - if verbose and not isinstance(event, ToolCallResult): + if verbose: logger.info("=== Calling Function ===") logger.info(data) + if not isinstance(event, ToolCallResult): yield ChatResponse( message=ChatMessage( role=MessageRole.TOOL, @@ -260,14 +254,6 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: "tool_calls": [event], }, ) -======= - chat_event_queue.put( - ChatEvent(type="tool_call", name=event.tool_name, data=data) - ) - if verbose and not isinstance(event, ToolCallResult): - logger.info("=== Calling Function ===") - logger.info(data) ->>>>>>> Stashed changes if isinstance(event, ToolCallResult): data = f"Got 
output: {event.tool_output!s}" chat_event_queue.put( @@ -298,22 +284,12 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) if isinstance(event, AgentOutput): data = f"Agent {event.current_agent_name} response: {event.response!s}" - chat_event_queue.put( -<<<<<<< Updated upstream - ChatEvent( - type="agent_output", name=event.current_agent_name, data=data - ) -======= - ChatEvent(type="agent_output", name=event.tool_name, data=data) ->>>>>>> Stashed changes - ) if verbose: logger.info("=== LLM Response ===") logger.info( f"{str(event.response) if event.response else 'No content'}" ) logger.info("========================") -<<<<<<< Updated upstream yield ChatResponse( message=ChatMessage( role=MessageRole.DEVELOPER, @@ -326,12 +302,6 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: additional_kwargs={ "tool_calls": event.tool_calls, }, -======= - if isinstance(event, AgentInput): - data = f"Agent {event.current_agent_name} started execution: {event.input!s}" - chat_event_queue.put( - ChatEvent(type="agent_input", name=event.tool_name, data=data) ->>>>>>> Stashed changes ) if isinstance(event, AgentStream): if event.response: From 308598211acb3a204f8c426282b380c0dfadd85b Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 10:58:56 -0600 Subject: [PATCH 12/29] wip on chat events lastFile:llm-service/app/routers/index/sessions/__init__.py --- .../app/routers/index/sessions/__init__.py | 3 --- .../app/services/chat/streaming_chat.py | 5 +--- .../query/agents/tool_calling_querier.py | 25 +++++++++---------- llm-service/app/services/query/chat_events.py | 5 ---- llm-service/app/services/query/querier.py | 2 -- 5 files changed, 13 insertions(+), 27 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index 257d7f18..836eb19d 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -307,9 +307,6 @@ def generate_stream() -> Generator[str, None, None]: chat_event_queue=chat_event_queue, ) - # Yield from tools_callback, which will check for cancellation - yield from tools_callback(future) - # If we get here and the cancel_event is set, the client has disconnected if cancel_event.is_set(): logger.info("Client disconnected, not processing results") diff --git a/llm-service/app/services/chat/streaming_chat.py b/llm-service/app/services/chat/streaming_chat.py index dc27444c..709fe95f 100644 --- a/llm-service/app/services/chat/streaming_chat.py +++ b/llm-service/app/services/chat/streaming_chat.py @@ -59,7 +59,6 @@ from app.services.metadata_apis.session_metadata_api import Session from app.services.mlflow import record_direct_llm_mlflow_run from app.services.query import querier -from app.services.query.agents.tool_calling_querier import poison_pill from app.services.query.chat_engine import ( FlexibleContextChatEngine, build_flexible_chat_engine, @@ -100,14 +99,12 @@ def stream_chat( len(session.data_source_ids) == 0 or total_data_sources_size == 0 ): # put a poison pill in the queue to stop the tool events stream - chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) return _stream_direct_llm_chat(session, response_id, query, user_name) condensed_question, streaming_chat_response = build_streamer( chat_event_queue, query, query_configuration, session ) - chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) - + return _run_streaming_chat( session, response_id, diff --git 
a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 14c714b9..a3ee2beb 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -237,23 +237,22 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: "tool_calls": [], }, ) - if isinstance(event, ToolCall): + if isinstance(event, ToolCall) and not isinstance(event, ToolCallResult): data = f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" if verbose: logger.info("=== Calling Function ===") logger.info(data) - if not isinstance(event, ToolCallResult): - yield ChatResponse( - message=ChatMessage( - role=MessageRole.TOOL, - content="", - ), - delta="", - raw="", - additional_kwargs={ - "tool_calls": [event], - }, - ) + yield ChatResponse( + message=ChatMessage( + role=MessageRole.TOOL, + content="", + ), + delta="", + raw="", + additional_kwargs={ + "tool_calls": [event], + }, + ) if isinstance(event, ToolCallResult): data = f"Got output: {event.tool_output!s}" chat_event_queue.put( diff --git a/llm-service/app/services/query/chat_events.py b/llm-service/app/services/query/chat_events.py index 338087de..dfbb6ecd 100644 --- a/llm-service/app/services/query/chat_events.py +++ b/llm-service/app/services/query/chat_events.py @@ -51,10 +51,5 @@ class ChatEvent(BaseModel): def step_callback(output: Any, agent: str, tool_events_queue: Queue[ChatEvent]) -> None: -<<<<<<< Updated upstream - - return -======= # todo: hook this up return None ->>>>>>> Stashed changes diff --git a/llm-service/app/services/query/querier.py b/llm-service/app/services/query/querier.py index 87721b4e..d5f9ae9a 100644 --- a/llm-service/app/services/query/querier.py +++ b/llm-service/app/services/query/querier.py @@ -42,7 +42,6 @@ from .agents.tool_calling_querier import ( should_use_retrieval, stream_chat, - poison_pill, ) from .chat_events import ChatEvent from .flexible_retriever import FlexibleRetriever @@ -106,7 +105,6 @@ def streaming_query( try: chat_response = chat_engine.stream_chat(query_str, chat_messages) - chat_event_queue.put(ChatEvent(type=poison_pill, name="no-op")) logger.debug("query response received from chat engine") except botocore.exceptions.ClientError as error: logger.warning(error.response) From 8a0466af4a6449637a8a36e3d17e3cc61c97f6b7 Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 10:12:18 -0700 Subject: [PATCH 13/29] work in progress on chat events lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- llm-service/app/routers/index/sessions/__init__.py | 7 ++++++- .../app/services/query/agents/tool_calling_querier.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index 836eb19d..2ce49430 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -46,6 +46,7 @@ from fastapi import APIRouter, Header, HTTPException from fastapi.responses import StreamingResponse +from llama_index.core.base.llms.types import ChatResponse, MessageRole from pydantic import BaseModel from starlette.responses import ContentStream from starlette.types import Receive @@ -319,7 +320,11 @@ def generate_stream() -> Generator[str, None, None]: if cancel_event.is_set(): logger.info("Client disconnected during result processing") break - + if isinstance(response, 
ChatResponse): + tool_call = response.additional_kwargs.get("chat_event") + event_json = json.dumps({"event": tool_call.model_dump()}) + yield f"data: {event_json}\n\n" + continue # send an initial message to let the client know the response stream is starting if first_message: done = ChatEvent(type="done", name="done", timestamp=time.time()) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index a3ee2beb..ed014867 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -250,7 +250,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta="", raw="", additional_kwargs={ - "tool_calls": [event], + "chat_event": ChatEvent(type="tool_call", name=event.tool_name, data=data), }, ) if isinstance(event, ToolCallResult): From 6706fb19a9793a694853a574b6e19bb0f8337723 Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 10:25:53 -0700 Subject: [PATCH 14/29] WIP event queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- .../app/routers/index/sessions/__init__.py | 11 +++--- .../query/agents/tool_calling_querier.py | 35 +++++++++++++------ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index 2ce49430..a327e2d8 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -42,7 +42,7 @@ import threading import time from concurrent.futures import Future, ThreadPoolExecutor -from typing import Optional, Generator, Any +from typing import Optional, Generator, Any, cast from fastapi import APIRouter, Header, HTTPException from fastapi.responses import StreamingResponse @@ -315,14 +315,15 @@ def generate_stream() -> Generator[str, None, None]: first_message = True stream = future.result() - for response in stream: + for item in stream: + response = cast(ChatResponse, item) # Check for cancellation between each response if cancel_event.is_set(): logger.info("Client disconnected during result processing") break - if isinstance(response, ChatResponse): - tool_call = response.additional_kwargs.get("chat_event") - event_json = json.dumps({"event": tool_call.model_dump()}) + if "chat_event" in response.additional_kwargs: + chat_event = response.additional_kwargs.get("chat_event") + event_json = json.dumps({"event": chat_event.model_dump()}) yield f"data: {event_json}\n\n" continue # send an initial message to let the client know the response stream is starting diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index ed014867..c32e12b3 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -212,12 +212,16 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: yield ChatResponse( message=ChatMessage( role=MessageRole.FUNCTION, - content=data, + content="", ), delta="", raw="", additional_kwargs={ - "tool_calls": [], + "chat_event": ChatEvent( + type="agent_setup", + name=event.current_agent_name, + data=data, + ), }, ) if isinstance(event, AgentInput): @@ -234,7 +238,11 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta="", raw="", additional_kwargs={ - "tool_calls": [], + "chat_event": ChatEvent( + 
type="agent_input", + name=event.current_agent_name, + data=data, + ), }, ) if isinstance(event, ToolCall) and not isinstance(event, ToolCallResult): @@ -250,14 +258,13 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta="", raw="", additional_kwargs={ - "chat_event": ChatEvent(type="tool_call", name=event.tool_name, data=data), + "chat_event": ChatEvent( + type="tool_call", name=event.tool_name, data=data + ), }, ) if isinstance(event, ToolCallResult): data = f"Got output: {event.tool_output!s}" - chat_event_queue.put( - ChatEvent(type="tool_result", name=event.tool_name, data=data) - ) if verbose: logger.info(data) logger.info("========================") @@ -278,7 +285,11 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta="", raw="", additional_kwargs={ - "tool_calls": [event], + "chat_event": ChatEvent( + type="tool_result", + name=event.tool_name, + data=data, + ), }, ) if isinstance(event, AgentOutput): @@ -291,7 +302,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: logger.info("========================") yield ChatResponse( message=ChatMessage( - role=MessageRole.DEVELOPER, + role=MessageRole.TOOL, content=( event.response.content if event.response.content else "" ), @@ -299,7 +310,11 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta="", raw=event.raw, additional_kwargs={ - "tool_calls": event.tool_calls, + "chat_event": ChatEvent( + type="agent_response", + name=event.current_agent_name, + data=data, + ), }, ) if isinstance(event, AgentStream): From 5c15d97104d4669b35a83e7e1d9d74cfc471ca29 Mon Sep 17 00:00:00 2001 From: Michael Liu Date: Fri, 13 Jun 2025 10:36:57 -0700 Subject: [PATCH 15/29] drop databases lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- llm-service/app/services/query/agents/tool_calling_querier.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index c32e12b3..86bad008 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -204,7 +204,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: handler = agent.run(user_msg=enhanced_query, chat_history=chat_messages) async for event in handler.stream_events(): if isinstance(event, AgentSetup): - data = f"Agent {event.current_agent_name} setup with input: {event.input!s}" + data = f"Agent {event.current_agent_name} setup with input: {event.input[-1].content!s}" if verbose: logger.info("=== Agent Setup ===") logger.info(data) @@ -225,7 +225,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: }, ) if isinstance(event, AgentInput): - data = f"Agent {event.current_agent_name} started with input: {event.input!s}" + data = f"Agent {event.current_agent_name} started with input: {event.input[-1].content!s}" if verbose: logger.info("=== Agent Input ===") logger.info(data) From d1ea0c9f407c88c135e125046f7a57b3ddcc1644 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 11:50:39 -0600 Subject: [PATCH 16/29] wip on openai streaming events lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- llm-service/app/services/query/agents/tool_calling_querier.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 86bad008..4b7878e5 100644 --- 
a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -362,6 +362,9 @@ def _openai_agent_streamer( message=enhanced_query, chat_history=chat_messages ) + completed_tasks = agent.get_completed_tasks() + print(f"Completed tasks: {completed_tasks}") + def gen() -> Generator[ChatResponse, None, None]: response = "" res = stream_chat_response.response_gen From 208390e2243f5cafda17f706503b1bad273aaa89 Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 11:06:33 -0700 Subject: [PATCH 17/29] send additional done after we're really done lastFile:llm-service/app/routers/index/sessions/__init__.py --- llm-service/app/routers/index/sessions/__init__.py | 5 ++++- .../app/services/query/agents/tool_calling_querier.py | 3 --- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index a327e2d8..4596b4ac 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -328,7 +328,7 @@ def generate_stream() -> Generator[str, None, None]: continue # send an initial message to let the client know the response stream is starting if first_message: - done = ChatEvent(type="done", name="done", timestamp=time.time()) + done = ChatEvent(type="done", name="agent_done", timestamp=time.time()) event_json = json.dumps({"event": done.model_dump()}) yield f"data: {event_json}\n\n" first_message = False @@ -337,6 +337,9 @@ def generate_stream() -> Generator[str, None, None]: yield f"data: {json_delta}\n\n" if not cancel_event.is_set() and response_id: + done = ChatEvent(type="done", name="chat_done", timestamp=time.time()) + event_json = json.dumps({"event": done.model_dump()}) + yield f"data: {event_json}\n\n" yield f'data: {{"response_id" : "{response_id}"}}\n\n' except TimeoutError: diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 4b7878e5..e64cfbb5 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -327,9 +327,6 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ), delta=event.delta, raw=event.raw, - additional_kwargs={ - "tool_calls": event.tool_calls, - }, ) def gen() -> Generator[ChatResponse, None, None]: From e111981c6a2ed0a4cedfee61379b531970e5dcee Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 11:35:06 -0700 Subject: [PATCH 18/29] getting close to streaming events on non openai agents lastFile:llm-service/app/services/query/agents/tool_calling_querier.py --- .../query/agents/tool_calling_querier.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index e64cfbb5..eb680f53 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -330,14 +330,17 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ) def gen() -> Generator[ChatResponse, None, None]: - async def collect() -> list[ChatResponse]: - results: list[ChatResponse] = [] - async for chunk in agen(): - results.append(chunk) - return results - - for item in asyncio.run(collect()): - yield item + loop = asyncio.new_event_loop() + 
astream = agen() + try: + while True: + item = loop.run_until_complete(anext(astream)) + yield item + except StopAsyncIteration: + pass + finally: + loop.stop() + loop.close() return gen(), source_nodes From 155a37e797f903f90cd271050df64b392e91a39b Mon Sep 17 00:00:00 2001 From: Baasit Sharief Date: Fri, 13 Jun 2025 12:22:08 -0700 Subject: [PATCH 19/29] gracefully shutdown handler and close loop --- .../services/query/agents/tool_calling_querier.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index eb680f53..1c2c9acd 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -328,6 +328,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta=event.delta, raw=event.raw, ) + await handler.ctx.shutdown() def gen() -> Generator[ChatResponse, None, None]: loop = asyncio.new_event_loop() @@ -336,11 +337,16 @@ def gen() -> Generator[ChatResponse, None, None]: while True: item = loop.run_until_complete(anext(astream)) yield item - except StopAsyncIteration: + except (StopAsyncIteration, GeneratorExit): pass finally: - loop.stop() - loop.close() + try: + loop.run_until_complete(astream.aclose()) + except Exception as e: + logger.warning(f"Exception during async generator close: {e}") + if not loop.is_closed(): + loop.stop() + loop.close() return gen(), source_nodes From 6ab4d43aac9f7cb2e495c8d02a2db23f15ae55c0 Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 12:46:09 -0700 Subject: [PATCH 20/29] python cleanup --- llm-service/app/routers/index/sessions/__init__.py | 6 +++--- .../app/services/query/agents/tool_calling_querier.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index 4596b4ac..e06b9980 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -46,7 +46,7 @@ from fastapi import APIRouter, Header, HTTPException from fastapi.responses import StreamingResponse -from llama_index.core.base.llms.types import ChatResponse, MessageRole +from llama_index.core.base.llms.types import ChatResponse from pydantic import BaseModel from starlette.responses import ContentStream from starlette.types import Receive @@ -316,13 +316,13 @@ def generate_stream() -> Generator[str, None, None]: first_message = True stream = future.result() for item in stream: - response = cast(ChatResponse, item) + response: ChatResponse = item # Check for cancellation between each response if cancel_event.is_set(): logger.info("Client disconnected during result processing") break if "chat_event" in response.additional_kwargs: - chat_event = response.additional_kwargs.get("chat_event") + chat_event: ChatEvent = response.additional_kwargs.get("chat_event") event_json = json.dumps({"event": chat_event.model_dump()}) yield f"data: {event_json}\n\n" continue diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 1c2c9acd..5df9fe81 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -328,7 +328,8 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta=event.delta, raw=event.raw, ) - await 
handler.ctx.shutdown() + if handler.ctx: + await handler.ctx.shutdown() def gen() -> Generator[ChatResponse, None, None]: loop = asyncio.new_event_loop() From 2a949063e6bc7ba77508b7b161a16af6ad9e796a Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 13:17:10 -0700 Subject: [PATCH 21/29] error handling in the non-openai streaming --- .../query/agents/tool_calling_querier.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 5df9fe81..21eb2cd0 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -57,6 +57,7 @@ from llama_index.core.llms.function_calling import FunctionCallingLLM from llama_index.core.schema import NodeWithScore from llama_index.core.tools import BaseTool, ToolOutput +from llama_index.core.workflow import StopEvent from llama_index.llms.openai import OpenAI from app.ai.indexing.summary_indexer import SummaryIndexer @@ -178,7 +179,7 @@ def stream_chat( ) else: gen, source_nodes = _run_non_openai_streamer( - chat_messages, enhanced_query, llm, tools, chat_event_queue + chat_messages, enhanced_query, llm, tools ) return StreamingAgentChatResponse(chat_stream=gen, source_nodes=source_nodes) @@ -189,7 +190,6 @@ def _run_non_openai_streamer( enhanced_query: str, llm: FunctionCallingLLM, tools: list[BaseTool], - chat_event_queue: Queue[ChatEvent], verbose: bool = True, ) -> tuple[Generator[ChatResponse, None, None], list[NodeWithScore]]: agent = FunctionAgent( @@ -202,6 +202,7 @@ def _run_non_openai_streamer( async def agen() -> AsyncGenerator[ChatResponse, None]: handler = agent.run(user_msg=enhanced_query, chat_history=chat_messages) + async for event in handler.stream_events(): if isinstance(event, AgentSetup): data = f"Agent {event.current_agent_name} setup with input: {event.input[-1].content!s}" @@ -224,7 +225,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ), }, ) - if isinstance(event, AgentInput): + elif isinstance(event, AgentInput): data = f"Agent {event.current_agent_name} started with input: {event.input[-1].content!s}" if verbose: logger.info("=== Agent Input ===") @@ -245,7 +246,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ), }, ) - if isinstance(event, ToolCall) and not isinstance(event, ToolCallResult): + elif isinstance(event, ToolCall) and not isinstance(event, ToolCallResult): data = f"Calling function: {event.tool_name} with args: {event.tool_kwargs}" if verbose: logger.info("=== Calling Function ===") @@ -263,7 +264,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ), }, ) - if isinstance(event, ToolCallResult): + elif isinstance(event, ToolCallResult): data = f"Got output: {event.tool_output!s}" if verbose: logger.info(data) @@ -292,7 +293,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ), }, ) - if isinstance(event, AgentOutput): + elif isinstance(event, AgentOutput): data = f"Agent {event.current_agent_name} response: {event.response!s}" if verbose: logger.info("=== LLM Response ===") @@ -317,7 +318,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: ), }, ) - if isinstance(event, AgentStream): + elif isinstance(event, AgentStream): if event.response: # Yield the delta response as a ChatResponse yield ChatResponse( @@ -328,6 +329,11 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: delta=event.delta, raw=event.raw, ) + 
else: + logger.info(f"Unhandled event of type: {type(event)}: {event}") + await handler + if e := handler.exception(): + raise e if handler.ctx: await handler.ctx.shutdown() From b2458c4d04e2bd2a01a63d3aaedfe3c7ee36d796 Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 13:28:42 -0700 Subject: [PATCH 22/29] cleanup --- .../app/routers/index/sessions/__init__.py | 29 ------------------- .../query/agents/tool_calling_querier.py | 2 +- 2 files changed, 1 insertion(+), 30 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index e06b9980..93669a2c 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -263,35 +263,6 @@ def stream_chat_completion( # Create a cancellation event to signal when the client disconnects cancel_event = threading.Event() - def tools_callback(chat_future: Future[Any]) -> Generator[str, None, None]: - while True: - # Check if client has disconnected - if cancel_event.is_set(): - logger.info("Client disconnected, stopping tool callback") - # Try to cancel the future if it's still running - if not chat_future.done(): - chat_future.cancel() - break - - if chat_future.done() and (e := chat_future.exception()): - raise e - - try: - event_data = chat_event_queue.get(block=True, timeout=1.0) - print(event_data) - if event_data.type == poison_pill: - break - event_json = json.dumps({"event": event_data.model_dump()}) - yield f"data: {event_json}\n\n" - except queue.Empty: - # Send a heartbeat event every second to keep the connection alive - heartbeat = ChatEvent( - type="event", name="Processing", timestamp=time.time() - ) - event_json = json.dumps({"event": heartbeat.model_dump()}) - yield f"data: {event_json}\n\n" - time.sleep(1) - def generate_stream() -> Generator[str, None, None]: response_id: str = "" executor = None diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 21eb2cd0..e6c524f5 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -234,7 +234,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: yield ChatResponse( message=ChatMessage( role=MessageRole.FUNCTION, - content=data, + content="", ), delta="", raw="", From 2ad8039adc27ede421aa308c190c19ef98837ed8 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 14:54:28 -0600 Subject: [PATCH 23/29] render contents of a tags and remove chat event queue --- llm-service/app/routers/index/sessions/__init__.py | 12 +++++------- llm-service/app/services/chat/streaming_chat.py | 7 +------ .../services/query/agents/tool_calling_querier.py | 7 ++----- llm-service/app/services/query/querier.py | 4 ---- .../ChatOutput/ChatMessages/MarkdownResponse.tsx | 13 ++++++++----- 5 files changed, 16 insertions(+), 27 deletions(-) diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py index 93669a2c..55827886 100644 --- a/llm-service/app/routers/index/sessions/__init__.py +++ b/llm-service/app/routers/index/sessions/__init__.py @@ -38,11 +38,10 @@ import base64 import json import logging -import queue import threading import time -from concurrent.futures import Future, ThreadPoolExecutor -from typing import Optional, Generator, Any, cast +from concurrent.futures import ThreadPoolExecutor +from typing import 
Optional, Generator, Any from fastapi import APIRouter, Header, HTTPException from fastapi.responses import StreamingResponse @@ -64,7 +63,6 @@ from ....services.chat_history.paginator import paginate from ....services.metadata_apis import session_metadata_api from ....services.mlflow import rating_mlflow_log_metric, feedback_mlflow_log_table -from ....services.query.agents.tool_calling_querier import poison_pill from ....services.query.chat_events import ChatEvent from ....services.session import rename_session @@ -259,7 +257,6 @@ def stream_chat_completion( session = session_metadata_api.get_session(session_id, user_name=origin_remote_user) configuration = request.configuration or RagPredictConfiguration() - chat_event_queue: queue.Queue[ChatEvent] = queue.Queue() # Create a cancellation event to signal when the client disconnects cancel_event = threading.Event() @@ -276,7 +273,6 @@ def generate_stream() -> Generator[str, None, None]: query=request.query, configuration=configuration, user_name=origin_remote_user, - chat_event_queue=chat_event_queue, ) # If we get here and the cancel_event is set, the client has disconnected @@ -299,7 +295,9 @@ def generate_stream() -> Generator[str, None, None]: continue # send an initial message to let the client know the response stream is starting if first_message: - done = ChatEvent(type="done", name="agent_done", timestamp=time.time()) + done = ChatEvent( + type="done", name="agent_done", timestamp=time.time() + ) event_json = json.dumps({"event": done.model_dump()}) yield f"data: {event_json}\n\n" first_message = False diff --git a/llm-service/app/services/chat/streaming_chat.py b/llm-service/app/services/chat/streaming_chat.py index 709fe95f..0a3902f8 100644 --- a/llm-service/app/services/chat/streaming_chat.py +++ b/llm-service/app/services/chat/streaming_chat.py @@ -37,7 +37,6 @@ # import time import uuid -from queue import Queue from typing import Optional, Generator from llama_index.core.base.llms.types import ChatResponse, ChatMessage @@ -63,7 +62,6 @@ FlexibleContextChatEngine, build_flexible_chat_engine, ) -from app.services.query.chat_events import ChatEvent from app.services.query.querier import ( build_retriever, ) @@ -75,7 +73,6 @@ def stream_chat( query: str, configuration: RagPredictConfiguration, user_name: Optional[str], - chat_event_queue: Queue[ChatEvent], ) -> Generator[ChatResponse, None, None]: query_configuration = QueryConfiguration( top_k=session.response_chunks, @@ -102,7 +99,7 @@ def stream_chat( return _stream_direct_llm_chat(session, response_id, query, user_name) condensed_question, streaming_chat_response = build_streamer( - chat_event_queue, query, query_configuration, session + query, query_configuration, session ) return _run_streaming_chat( @@ -149,7 +146,6 @@ def _run_streaming_chat( def build_streamer( - chat_events_queue: Queue[ChatEvent], query: str, query_configuration: QueryConfiguration, session: Session, @@ -178,7 +174,6 @@ def build_streamer( query, query_configuration, chat_messages, - chat_event_queue=chat_events_queue, session=session, ) return condensed_question, streaming_chat_response diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index e6c524f5..1033e357 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -38,7 +38,6 @@ import asyncio import logging import os -from queue import Queue from typing import Optional, 
Generator, AsyncGenerator, Callable, cast, Any import opik @@ -57,7 +56,6 @@ from llama_index.core.llms.function_calling import FunctionCallingLLM from llama_index.core.schema import NodeWithScore from llama_index.core.tools import BaseTool, ToolOutput -from llama_index.core.workflow import StopEvent from llama_index.llms.openai import OpenAI from app.ai.indexing.summary_indexer import SummaryIndexer @@ -114,7 +112,7 @@ def should_use_retrieval( * Cite from node_ids in the given format: the node_id \ should be in an html anchor tag () with an html 'class' of 'rag_citation'. \ Do not use filenames as citations. Only node ids should be used. \ -For example: 2. Do not make up node ids that are not present +For example: . Do not make up node ids that are not present in the context. * All citations should be either in-line citations or markdown links. @@ -152,7 +150,6 @@ def stream_chat( chat_messages: list[ChatMessage], session: Session, data_source_summaries: dict[int, str], - chat_event_queue: Queue[ChatEvent], ) -> StreamingAgentChatResponse: mcp_tools: list[BaseTool] = [] if session.query_configuration and session.query_configuration.selected_tools: @@ -333,7 +330,7 @@ async def agen() -> AsyncGenerator[ChatResponse, None]: logger.info(f"Unhandled event of type: {type(event)}: {event}") await handler if e := handler.exception(): - raise e + raise e if handler.ctx: await handler.ctx.shutdown() diff --git a/llm-service/app/services/query/querier.py b/llm-service/app/services/query/querier.py index d5f9ae9a..898120b7 100644 --- a/llm-service/app/services/query/querier.py +++ b/llm-service/app/services/query/querier.py @@ -30,7 +30,6 @@ from __future__ import annotations import re -from queue import Queue from typing import Optional, TYPE_CHECKING, cast from llama_index.core.base.base_retriever import BaseRetriever @@ -43,7 +42,6 @@ should_use_retrieval, stream_chat, ) -from .chat_events import ChatEvent from .flexible_retriever import FlexibleRetriever from .multi_retriever import MultiSourceRetriever from ..metadata_apis.session_metadata_api import Session @@ -75,7 +73,6 @@ def streaming_query( query_str: str, configuration: QueryConfiguration, chat_messages: list[ChatMessage], - chat_event_queue: Queue[ChatEvent], session: Session, ) -> StreamingAgentChatResponse: llm = models.LLM.get(model_name=configuration.model_name) @@ -94,7 +91,6 @@ def streaming_query( chat_messages, session, data_source_summaries, - chat_event_queue, ) return chat_response if not chat_engine: diff --git a/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/MarkdownResponse.tsx b/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/MarkdownResponse.tsx index d8488085..7da784db 100644 --- a/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/MarkdownResponse.tsx +++ b/ui/src/pages/RagChatTab/ChatOutput/ChatMessages/MarkdownResponse.tsx @@ -58,7 +58,7 @@ export const MarkdownResponse = ({ data }: { data: ChatMessageType }) => { const { href, className, children, ...other } = props; if (className === "rag_citation") { if (data.source_nodes.length === 0) { - return undefined; + return {props.children}; } const { source_nodes } = data; const sourceNodeIndex = source_nodes.findIndex( @@ -66,10 +66,13 @@ export const MarkdownResponse = ({ data }: { data: ChatMessageType }) => { ); if (sourceNodeIndex >= 0) { return ( - + + {props.children} + + ); } if (!href?.startsWith("http")) { From 0f5b9ac25706257f184359dd75353d35a91e20a0 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 16:20:12 -0600 Subject: [PATCH 24/29] 
input for date tool --- .../services/query/agents/agent_tools/date.py | 20 ++++++++++++++----- .../query/agents/tool_calling_querier.py | 4 ++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/llm-service/app/services/query/agents/agent_tools/date.py b/llm-service/app/services/query/agents/agent_tools/date.py index 6cf91c04..9c6987b2 100644 --- a/llm-service/app/services/query/agents/agent_tools/date.py +++ b/llm-service/app/services/query/agents/agent_tools/date.py @@ -36,7 +36,6 @@ # DATA. # from datetime import datetime -from typing import Any from llama_index.core.tools import BaseTool, ToolOutput, ToolMetadata from pydantic import BaseModel @@ -46,17 +45,28 @@ class DateToolInput(BaseModel): """ Input schema for the DateTool """ + input: None = None + class DateTool(BaseTool): """ A tool that provides the current date and time. """ + @property def metadata(self) -> ToolMetadata: - return ToolMetadata(name="date_tool", description="A tool that provides the current date and time.", fn_schema=DateToolInput) + return ToolMetadata( + name="date_tool", + description="A tool that provides the current date and time.", + fn_schema=DateToolInput, + ) - def __call__(self, input_: Any) -> ToolOutput: + def __call__(self, input) -> ToolOutput: now = datetime.now() - return ToolOutput(content=f"The current date is {now.strftime('%Y-%m-%d %H:%M:%S')}", tool_name="date_tool", raw_input={}, raw_output=now) - + return ToolOutput( + content=f"The current date is {now.strftime('%Y-%m-%d %H:%M:%S')}", + tool_name="date_tool", + raw_input={}, + raw_output=now, + ) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index 1033e357..a9451318 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -374,6 +374,10 @@ def _openai_agent_streamer( completed_tasks = agent.get_completed_tasks() print(f"Completed tasks: {completed_tasks}") + for task in completed_tasks: + print(f"Task {task.input} completed with result: {task.task_id}") + completed_steps = agent.get_completed_steps(task_id=task.task_id) + print(f"Completed steps: {completed_steps}") def gen() -> Generator[ChatResponse, None, None]: response = "" From 85ee90ad266421bc05f74d90798e2d5024508105 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 16:32:29 -0600 Subject: [PATCH 25/29] default input --- llm-service/app/services/query/agents/agent_tools/date.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llm-service/app/services/query/agents/agent_tools/date.py b/llm-service/app/services/query/agents/agent_tools/date.py index 9c6987b2..01ce3ca7 100644 --- a/llm-service/app/services/query/agents/agent_tools/date.py +++ b/llm-service/app/services/query/agents/agent_tools/date.py @@ -62,7 +62,7 @@ def metadata(self) -> ToolMetadata: fn_schema=DateToolInput, ) - def __call__(self, input) -> ToolOutput: + def __call__(self, input=None) -> ToolOutput: now = datetime.now() return ToolOutput( content=f"The current date is {now.strftime('%Y-%m-%d %H:%M:%S')}", From 1d88bac2946027dcba3b9997fc25c818b872431b Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 15:45:40 -0700 Subject: [PATCH 26/29] fix duplicated timestamp issue --- llm-service/app/services/query/chat_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/llm-service/app/services/query/chat_events.py 
b/llm-service/app/services/query/chat_events.py index dfbb6ecd..e8fe3936 100644 --- a/llm-service/app/services/query/chat_events.py +++ b/llm-service/app/services/query/chat_events.py @@ -40,14 +40,14 @@ from queue import Queue from typing import Optional, Any -from pydantic import BaseModel +from pydantic import BaseModel, Field class ChatEvent(BaseModel): type: str name: str data: Optional[str] = None - timestamp: float = time.time() + timestamp: float = Field(default_factory=lambda : time.time()) def step_callback(output: Any, agent: str, tool_events_queue: Queue[ChatEvent]) -> None: From 084409b072f19e60c9786cf91655317bd9d165ec Mon Sep 17 00:00:00 2001 From: jwatson Date: Fri, 13 Jun 2025 15:51:29 -0700 Subject: [PATCH 27/29] mypy --- llm-service/app/services/query/agents/agent_tools/date.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llm-service/app/services/query/agents/agent_tools/date.py b/llm-service/app/services/query/agents/agent_tools/date.py index 01ce3ca7..ac308196 100644 --- a/llm-service/app/services/query/agents/agent_tools/date.py +++ b/llm-service/app/services/query/agents/agent_tools/date.py @@ -62,7 +62,7 @@ def metadata(self) -> ToolMetadata: fn_schema=DateToolInput, ) - def __call__(self, input=None) -> ToolOutput: + def __call__(self, input: None=None) -> ToolOutput: now = datetime.now() return ToolOutput( content=f"The current date is {now.strftime('%Y-%m-%d %H:%M:%S')}", From 0fa1371c7e963ddf631257a5df56df17ba052df4 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 16:56:12 -0600 Subject: [PATCH 28/29] remove openaiagent --- .../query/agents/tool_calling_querier.py | 67 ++----------------- llm-service/pyproject.toml | 1 - llm-service/uv.lock | 16 ----- 3 files changed, 4 insertions(+), 80 deletions(-) diff --git a/llm-service/app/services/query/agents/tool_calling_querier.py b/llm-service/app/services/query/agents/tool_calling_querier.py index a9451318..053493c8 100644 --- a/llm-service/app/services/query/agents/tool_calling_querier.py +++ b/llm-service/app/services/query/agents/tool_calling_querier.py @@ -41,7 +41,6 @@ from typing import Optional, Generator, AsyncGenerator, Callable, cast, Any import opik -from llama_index.agent.openai import OpenAIAgent from llama_index.core.agent.workflow import ( FunctionAgent, AgentStream, @@ -55,8 +54,7 @@ from llama_index.core.chat_engine.types import StreamingAgentChatResponse from llama_index.core.llms.function_calling import FunctionCallingLLM from llama_index.core.schema import NodeWithScore -from llama_index.core.tools import BaseTool, ToolOutput -from llama_index.llms.openai import OpenAI +from llama_index.core.tools import BaseTool from app.ai.indexing.summary_indexer import SummaryIndexer from app.services.metadata_apis.session_metadata_api import Session @@ -170,19 +168,13 @@ def stream_chat( ) tools.append(retrieval_tool) tools.extend(mcp_tools) - if isinstance(llm, OpenAI): - gen, source_nodes = _openai_agent_streamer( - chat_messages, enhanced_query, llm, tools - ) - else: - gen, source_nodes = _run_non_openai_streamer( - chat_messages, enhanced_query, llm, tools - ) + + gen, source_nodes = _run_streamer(chat_messages, enhanced_query, llm, tools) return StreamingAgentChatResponse(chat_stream=gen, source_nodes=source_nodes) -def _run_non_openai_streamer( +def _run_streamer( chat_messages: list[ChatMessage], enhanced_query: str, llm: FunctionCallingLLM, @@ -353,54 +345,3 @@ def gen() -> Generator[ChatResponse, None, None]: loop.close() return gen(), source_nodes - - -def 
_openai_agent_streamer( - chat_messages: list[ChatMessage], - enhanced_query: str, - llm: OpenAI, - tools: list[BaseTool], - verbose: bool = True, -) -> tuple[Generator[ChatResponse, None, None], list[NodeWithScore]]: - agent = OpenAIAgent.from_tools( - tools=tools, - llm=llm, - verbose=verbose, - system_prompt=DEFAULT_AGENT_PROMPT, - ) - stream_chat_response: StreamingAgentChatResponse = agent.stream_chat( - message=enhanced_query, chat_history=chat_messages - ) - - completed_tasks = agent.get_completed_tasks() - print(f"Completed tasks: {completed_tasks}") - for task in completed_tasks: - print(f"Task {task.input} completed with result: {task.task_id}") - completed_steps = agent.get_completed_steps(task_id=task.task_id) - print(f"Completed steps: {completed_steps}") - - def gen() -> Generator[ChatResponse, None, None]: - response = "" - res = stream_chat_response.response_gen - for chunk in res: - response += chunk - finalize_response = ChatResponse( - message=ChatMessage(role="assistant", content=response), - delta=chunk, - ) - yield finalize_response - - source_nodes = [] - if stream_chat_response.sources: - for tool_output in stream_chat_response.sources: - if isinstance(tool_output, ToolOutput): - if ( - tool_output.raw_output - and isinstance(tool_output.raw_output, list) - and all( - isinstance(elem, NodeWithScore) - for elem in tool_output.raw_output - ) - ): - source_nodes.extend(tool_output.raw_output) - return gen(), source_nodes diff --git a/llm-service/pyproject.toml b/llm-service/pyproject.toml index 93845168..7aae639b 100644 --- a/llm-service/pyproject.toml +++ b/llm-service/pyproject.toml @@ -45,7 +45,6 @@ dependencies = [ "llama-index-callbacks-opik>=1.1.0", "mcp[cli]>=1.9.1", "pysqlite3-binary==0.5.4; platform_system == 'Linux'", - "llama-index-agent-openai>=0.4.8", "llama-index-tools-mcp>=0.2.5", ] requires-python = ">=3.10,<=3.12" diff --git a/llm-service/uv.lock b/llm-service/uv.lock index bbe65713..976acc6e 100644 --- a/llm-service/uv.lock +++ b/llm-service/uv.lock @@ -2044,20 +2044,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/10/af/1e344bc8aee41445272e677d802b774b1f8b34bdc3bb5697ba30f0fb5d52/litellm-1.68.0-py3-none-any.whl", hash = "sha256:3bca38848b1a5236b11aa6b70afa4393b60880198c939e582273f51a542d4759", size = 7684460 }, ] -[[package]] -name = "llama-index-agent-openai" -version = "0.4.8" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "llama-index-core" }, - { name = "llama-index-llms-openai" }, - { name = "openai" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/2c/10/34454bd6563ff7fb63dec264a34e2749486194f9b4fb1ea8c2e4b9f8e2e9/llama_index_agent_openai-0.4.8.tar.gz", hash = "sha256:ba76f21e1b7f0f66e326dc419c2cc403cbb614ae28f7904540b1103695965f68", size = 12230 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ce/b8/1d7f50b6471fd73ff7309e6abd808c935a8d9d8547b192ce56ed3a05c142/llama_index_agent_openai-0.4.8-py3-none-any.whl", hash = "sha256:a03e8609ada0355b408d4173cd7663708f826f23328f9719fba00ea20b6851b6", size = 14212 }, -] - [[package]] name = "llama-index-callbacks-opik" version = "1.1.0" @@ -2350,7 +2336,6 @@ dependencies = [ { name = "docx2txt" }, { name = "fastapi", extra = ["standard"] }, { name = "fastapi-utils" }, - { name = "llama-index-agent-openai" }, { name = "llama-index-callbacks-opik" }, { name = "llama-index-core" }, { name = "llama-index-embeddings-azure-openai" }, @@ -2407,7 +2392,6 @@ requires-dist = [ { name = "docx2txt", specifier = ">=0.8" }, { name = 
"fastapi", extras = ["standard"], specifier = ">=0.111.0" }, { name = "fastapi-utils", specifier = ">=0.8.0" }, - { name = "llama-index-agent-openai", specifier = ">=0.4.8" }, { name = "llama-index-callbacks-opik", specifier = ">=1.1.0" }, { name = "llama-index-core", specifier = ">=0.10.68" }, { name = "llama-index-embeddings-azure-openai", specifier = ">=0.3.0" }, From 28fc83c2d2b468f1af55343ee2f9e67e2f32f43d Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 13 Jun 2025 17:00:07 -0600 Subject: [PATCH 29/29] update lock file --- llm-service/uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/llm-service/uv.lock b/llm-service/uv.lock index 976acc6e..f7c0d7db 100644 --- a/llm-service/uv.lock +++ b/llm-service/uv.lock @@ -2363,7 +2363,7 @@ dependencies = [ { name = "presidio-anonymizer" }, { name = "pydantic" }, { name = "pydantic-settings" }, - { name = "pysqlite3-binary", marker = "platform_system == 'Linux'" }, + { name = "pysqlite3-binary", marker = "platform_machine != 'aarch64' and platform_system == 'Linux'" }, { name = "python-pptx" }, { name = "qdrant-client" }, { name = "torch" }, @@ -2419,7 +2419,7 @@ requires-dist = [ { name = "presidio-anonymizer", specifier = ">=2.2.355" }, { name = "pydantic", specifier = ">=2.8.2" }, { name = "pydantic-settings", specifier = ">=2.3.4" }, - { name = "pysqlite3-binary", marker = "platform_system == 'Linux'", specifier = "==0.5.4" }, + { name = "pysqlite3-binary", marker = "platform_machine != 'aarch64' and platform_system == 'Linux'", specifier = "==0.5.4" }, { name = "python-pptx", specifier = ">=1.0.2" }, { name = "qdrant-client", specifier = "<1.13.0" }, { name = "torch", specifier = ">=2.5.1" },