Skip to content

Commit 4ca77bc

Browse files
hangfeicopybara-github
authored andcommitted
fix: Use inspect.signature() instead of typing.get_type_hints
fix: Use `inspect.signature()` instead of `typing.get_type_hints()` to find the LiveRequestQueue. Motivation: `typing.get_type_hints()` may raise errors in complex scenarios where type annotation is not available. Add live_bidi_streaming_tools_agent example to show how to use the live streaming feature. PiperOrigin-RevId: 780160921
1 parent dc414cb commit 4ca77bc

File tree

4 files changed

+201
-10
lines changed

4 files changed

+201
-10
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from . import agent
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
from typing import AsyncGenerator
17+
18+
from google.adk.agents import LiveRequestQueue
19+
from google.adk.agents.llm_agent import Agent
20+
from google.adk.tools.function_tool import FunctionTool
21+
from google.genai import Client
22+
from google.genai import types as genai_types
23+
24+
25+
async def monitor_stock_price(stock_symbol: str) -> AsyncGenerator[str, None]:
26+
"""This function will monitor the price for the given stock_symbol in a continuous, streaming and asynchronously way."""
27+
print(f"Start monitor stock price for {stock_symbol}!")
28+
29+
# Let's mock stock price change.
30+
await asyncio.sleep(4)
31+
price_alert1 = f"the price for {stock_symbol} is 300"
32+
yield price_alert1
33+
print(price_alert1)
34+
35+
await asyncio.sleep(4)
36+
price_alert1 = f"the price for {stock_symbol} is 400"
37+
yield price_alert1
38+
print(price_alert1)
39+
40+
await asyncio.sleep(20)
41+
price_alert1 = f"the price for {stock_symbol} is 900"
42+
yield price_alert1
43+
print(price_alert1)
44+
45+
await asyncio.sleep(20)
46+
price_alert1 = f"the price for {stock_symbol} is 500"
47+
yield price_alert1
48+
print(price_alert1)
49+
50+
51+
# for video streaming, `input_stream: LiveRequestQueue` is required and reserved key parameter for ADK to pass the video streams in.
52+
async def monitor_video_stream(
53+
input_stream: LiveRequestQueue,
54+
) -> AsyncGenerator[str, None]:
55+
"""Monitor how many people are in the video streams."""
56+
print("start monitor_video_stream!")
57+
client = Client(vertexai=False)
58+
prompt_text = (
59+
"Count the number of people in this image. Just respond with a numeric"
60+
" number."
61+
)
62+
last_count = None
63+
while True:
64+
last_valid_req = None
65+
print("Start monitoring loop")
66+
67+
# use this loop to pull the latest images and discard the old ones
68+
while input_stream._queue.qsize() != 0:
69+
live_req = await input_stream.get()
70+
71+
if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
72+
last_valid_req = live_req
73+
74+
# If we found a valid image, process it
75+
if last_valid_req is not None:
76+
print("Processing the most recent frame from the queue")
77+
78+
# Create an image part using the blob's data and mime type
79+
image_part = genai_types.Part.from_bytes(
80+
data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
81+
)
82+
83+
contents = genai_types.Content(
84+
role="user",
85+
parts=[image_part, genai_types.Part.from_text(text=prompt_text)],
86+
)
87+
88+
# Call the model to generate content based on the provided image and prompt
89+
response = client.models.generate_content(
90+
model="gemini-2.0-flash-exp",
91+
contents=contents,
92+
config=genai_types.GenerateContentConfig(
93+
system_instruction=(
94+
"You are a helpful video analysis assistant. You can count"
95+
" the number of people in this image or video. Just respond"
96+
" with a numeric number."
97+
)
98+
),
99+
)
100+
if not last_count:
101+
last_count = response.candidates[0].content.parts[0].text
102+
elif last_count != response.candidates[0].content.parts[0].text:
103+
last_count = response.candidates[0].content.parts[0].text
104+
yield response
105+
print("response:", response)
106+
107+
# Wait before checking for new images
108+
await asyncio.sleep(0.5)
109+
110+
111+
# Use this exact function to help ADK stop your streaming tools when requested.
112+
# for example, if we want to stop `monitor_stock_price`, then the agent will
113+
# invoke this function with stop_streaming(function_name=monitor_stock_price).
114+
def stop_streaming(function_name: str):
115+
"""Stop the streaming
116+
117+
Args:
118+
function_name: The name of the streaming function to stop.
119+
"""
120+
pass
121+
122+
123+
root_agent = Agent(
124+
model="gemini-live-2.5-flash-preview",
125+
name="video_streaming_agent",
126+
instruction="""
127+
You are a monitoring agent. You can do video monitoring and stock price monitoring
128+
using the provided tools/functions.
129+
When users want to monitor a video stream,
130+
You can use monitor_video_stream function to do that. When monitor_video_stream
131+
returns the alert, you should tell the users.
132+
When users want to monitor a stock price, you can use monitor_stock_price.
133+
Don't ask too many questions. Don't be too talkative.
134+
""",
135+
tools=[
136+
monitor_video_stream,
137+
monitor_stock_price,
138+
FunctionTool(stop_streaming),
139+
],
140+
)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
This is only supported in streaming(live) agents/api.
2+
3+
Streaming tools allows tools(functions) to stream intermediate results back to agents and agents can respond to those intermediate results.
4+
For example, we can use streaming tools to monitor the changes of the stock price and have the agent react to it. Another example is we can have the agent monitor the video stream, and when there is changes in video stream, the agent can report the changes.
5+
6+
To define a streaming tool, you must adhere to the following:
7+
8+
1. **Asynchronous Function:** The tool must be an `async` Python function.
9+
2. **AsyncGenerator Return Type:** The function must be typed to return an `AsyncGenerator`. The first type parameter to `AsyncGenerator` is the type of the data you `yield` (e.g., `str` for text messages, or a custom object for structured data). The second type parameter is typically `None` if the generator doesn't receive values via `send()`.
10+
11+
12+
We support two types of streaming tools:
13+
- Simple type. This is a one type of streaming tools that only take non video/audio streams(the streams that you feed to adk web or adk runner) as input.
14+
- Video streaming tools. This only works in video streaming and the video stream(the streams that you feed to adk web or adk runner) will be passed into this function.
15+
16+
17+
Here are some sample queries to test:
18+
- Help me monitor the stock price for $XYZ stock.
19+
- Help me monitor how many people are there in the video stream.

src/google/adk/runners.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -307,27 +307,44 @@ async def run_live(
307307
root_agent = self.agent
308308
invocation_context.agent = self._find_agent_to_run(session, root_agent)
309309

310+
# Pre-processing for live streaming tools
311+
# Inspect the tool's parameters to find if it uses LiveRequestQueue
310312
invocation_context.active_streaming_tools = {}
311313
# TODO(hangfei): switch to use canonical_tools.
312314
# for shell agents, there is no tools associated with it so we should skip.
313315
if hasattr(invocation_context.agent, 'tools'):
314-
for tool in invocation_context.agent.tools:
315-
# replicate a LiveRequestQueue for streaming tools that relis on
316-
# LiveRequestQueue
317-
from typing import get_type_hints
316+
import inspect
318317

319-
type_hints = get_type_hints(tool)
320-
for arg_type in type_hints.values():
321-
if arg_type is LiveRequestQueue:
318+
for tool in invocation_context.agent.tools:
319+
# We use `inspect.signature()` to examine the tool's underlying function (`tool.func`).
320+
# This approach is deliberately chosen over `typing.get_type_hints()` for robustness.
321+
#
322+
# The Problem with `get_type_hints()`:
323+
# `get_type_hints()` attempts to resolve forward-referenced (string-based) type
324+
# annotations. This resolution can easily fail with a `NameError` (e.g., "Union not found")
325+
# if the type isn't available in the scope where `get_type_hints()` is called.
326+
# This is a common and brittle issue in framework code that inspects functions
327+
# defined in separate user modules.
328+
#
329+
# Why `inspect.signature()` is Better Here:
330+
# `inspect.signature()` does NOT resolve the annotations; it retrieves the raw
331+
# annotation object as it was defined on the function. This allows us to
332+
# perform a direct and reliable identity check (`param.annotation is LiveRequestQueue`)
333+
# without risking a `NameError`.
334+
callable_to_inspect = tool.func if hasattr(tool, 'func') else tool
335+
# Ensure the target is actually callable before inspecting to avoid errors.
336+
if not callable(callable_to_inspect):
337+
continue
338+
for param in inspect.signature(callable_to_inspect).parameters.values():
339+
if param.annotation is LiveRequestQueue:
322340
if not invocation_context.active_streaming_tools:
323341
invocation_context.active_streaming_tools = {}
324-
active_streaming_tools = ActiveStreamingTool(
342+
active_streaming_tool = ActiveStreamingTool(
325343
stream=LiveRequestQueue()
326344
)
327345
invocation_context.active_streaming_tools[tool.__name__] = (
328-
active_streaming_tools
346+
active_streaming_tool
329347
)
330-
331348
async for event in invocation_context.agent.run_live(invocation_context):
332349
await self.session_service.append_event(session=session, event=event)
333350
yield event

0 commit comments

Comments
 (0)