Skip to content

Commit 9a5e34d

Browse files
committed
Support authoring ADK agents using processors.
PiperOrigin-RevId: 816684684
1 parent be6e32a commit 9a5e34d

File tree

4 files changed

+481
-0
lines changed

4 files changed

+481
-0
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
# ==============================================================================
15+
16+
r"""Live commentator ADK agent based on GenAI Processors.
17+
18+
## Setup
19+
20+
To install the dependencies for this script, run:
21+
22+
```
23+
pip install genai-processors google-adk
24+
```
25+
26+
Before running this script, ensure the `GOOGLE_API_KEY` environment
27+
variable is set to the api-key you obtained from Google AI Studio.
28+
29+
## Run
30+
31+
Change directory to the parent folder (genai-processors/examples/live) and run
32+
`adk web`. Then navigate to http://localhost:8000/, select the "commentator_adk"
33+
agent and click on the "Use camera" button.
34+
35+
To restart a session click on the "New session" button and reload the page.
36+
"""
37+
38+
import os
39+
40+
from genai_processors.core import adk
41+
import commentator
42+
43+
44+
# The Gemini API key is taken from the environment, so it has to be exported
# before this module is imported:
# export GOOGLE_API_KEY=...
API_KEY = os.environ['GOOGLE_API_KEY']


def _create_commentator():
  """Returns the processor built by `commentator.create_live_commentator`."""
  return commentator.create_live_commentator(API_KEY)


# A factory (rather than a processor instance) is handed to the agent, which
# calls it whenever it needs a processor.
root_agent = adk.ProcessorAgent(_create_commentator, name='commentator_adk')

examples/trip_request_adk/agent.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
# ==============================================================================
15+
16+
r"""ADK agent version of the trip request planner.
17+
18+
We use Gemini flash-lite to formalize freeform trip request into the dates and
19+
destination. Then we use a second model to compose the trip itinerary.
20+
21+
This simple example shows how we can reduce perceived latency by running a fast
22+
model to validate and acknowledge user request while the good but slow model is
23+
handling it.
24+
25+
The approach from this example also can be used as a defense mechanism against
26+
prompt injections. The first model without tool access formalizes the request
27+
into the TripRequest dataclass. The attack surface is significantly reduced by
28+
the narrowness of the output format and lack of tools. Then a second model is
29+
run on this cleaned-up input.
30+
31+
## Setup
32+
33+
To install the dependencies for this script, run:
34+
35+
```
36+
pip install genai-processors google-adk
37+
```
38+
39+
Before running this script, ensure the `GOOGLE_API_KEY` environment
40+
variable is set to the api-key you obtained from Google AI Studio.
41+
42+
## Run
43+
44+
Change directory to `genai-processors/examples` and run `adk web`.
45+
Then navigate to http://localhost:8000/, select the "trip_request_adk"
46+
agent and enter your trip request.
47+
"""
48+
49+
from collections.abc import AsyncIterable
50+
import datetime
51+
import os
52+
53+
import dataclasses_json
54+
from genai_processors import content_api
55+
from genai_processors import processor
56+
from genai_processors import switch
57+
from genai_processors.core import adk
58+
from genai_processors.core import genai_model
59+
from genai_processors.core import preamble
60+
from google.genai import types as genai_types
61+
from pydantic import dataclasses
62+
63+
# You need to define the API key in the environment variables.
64+
# Looked up when the module is imported: a missing GOOGLE_API_KEY raises
# KeyError right away instead of failing later on the first model call.
API_KEY = os.environ['GOOGLE_API_KEY']
65+
66+
67+
@dataclasses_json.dataclass_json
@dataclasses.dataclass(frozen=True)
class TripRequest:
  """Structured trip request that the extraction model is asked to produce.

  The class is passed as `response_schema` to the first model and is parsed
  back from the model's JSON output.
  """

  # First day of the trip, as returned by the model.
  start_date: str
  # Last day of the trip, as returned by the model.
  end_date: str
  # Where the user wants to go.
  destination: str
  # Non-empty when the model reports a problem with the request; consumers
  # treat an empty value as "request is valid".
  error: str

  def info(self) -> str:
    """Returns the dates and destination as text to be used in prompts."""
    lines = (
        '',
        'Trip information:',
        f'Start date: {self.start_date}',
        f'End date: {self.end_date}',
        f'Destination: {self.destination}',
        '',
    )
    # The empty first and last entries yield the leading and trailing newline.
    return '\n'.join(lines)
85+
86+
87+
# Decorating a plain function is the recommended way to define a stateless
# processor.
@processor.part_processor_function
async def process_json_output(
    part: content_api.ProcessorPart,
) -> AsyncIterable[content_api.ProcessorPart]:
  """Turns the model's JSON output into either a prompt part or an error part.

  Args:
    part: A part holding a `TripRequest` serialized by the model.

  Yields:
    A single part: the textual trip information on the default substream when
    the request has no error, otherwise the error message on the 'error'
    substream.
  """
  request = part.get_dataclass(TripRequest)
  if not request.error:
    yield content_api.ProcessorPart(request.info())
    return
  yield content_api.ProcessorPart(request.error, substream_name='error')
102+
103+
104+
def create_trip_request_processor() -> processor.Processor:
  """Builds the two-stage trip planning processor.

  Stage one asks a model to turn the free-form user input into a JSON
  `TripRequest`. Stage two either plans the trip (valid request) or forwards
  the error message unchanged (invalid request).

  Returns:
    The chained processor: extraction, JSON post-processing and a switch on the
    substream name.
  """

  def _today_note() -> str:
    # Evaluated lazily by the Suffix so that the date is current for each
    # request rather than fixed when the processor is created.
    return f'Today is: {datetime.date.today()}'

  # --- Stage 1: extract a structured TripRequest from the user input. -------
  today_suffix = preamble.Suffix(content_factory=_today_note)
  extraction_instruction = (
      'You are a travel agent. You are given a trip request from a'
      ' user. You need to check if the user provided all necessary'
      ' information. If the user request is missing any'
      ' information, you need to return an error message. If the'
      ' user request is complete, you need to return the user'
      ' request with the start date, end date and the destination.'
  )
  # The response schema plus JSON mime type request structured output that
  # `process_json_output` can parse back into a TripRequest.
  extraction_config = genai_types.GenerateContentConfig(
      system_instruction=extraction_instruction,
      response_schema=TripRequest,
      response_mime_type='application/json',
  )
  extraction_model = genai_model.GenaiModel(
      api_key=API_KEY,
      model_name='gemini-2.0-flash-lite',
      generate_content_config=extraction_config,
  )
  extract_trip_request = today_suffix + extraction_model

  # --- Stage 2: plan the itinerary for a valid request. ---------------------
  planning_instruction = (
      'You are a travel agent. You are given a trip request from a user'
      ' with dates and destination. Plan a trip with hotels and'
      ' activities. Split the plan into daily section. Plan one'
      ' activity per 1/2 day max.'
  )
  planning_config = genai_types.GenerateContentConfig(
      system_instruction=planning_instruction,
      # Ground with Google Search.
      # NOTE(review): confirm that the model chosen below supports Google
      # Search grounding.
      tools=[genai_types.Tool(google_search=genai_types.GoogleSearch())],
  )
  generate_trip = genai_model.GenaiModel(
      api_key=API_KEY,
      # NOTE: To reduce the cost of running the demo this uses the same
      # flash-lite model as the extraction step. A real application would use
      # a better but slower thinking model; its perceived latency would be
      # hidden by the fast answer from extract_trip_request and by
      # acknowledging to the user that planning has started.
      model_name='gemini-2.0-flash-lite',
      generate_content_config=planning_config,
  )

  # Acknowledgement that is emitted ahead of the generated itinerary.
  msg_to_user = preamble.Preamble(
      content='OK, preparing a trip for the following request:\n',
  )

  # --- Plumbing: route on the substream name to separate errors. ------------
  validated_request = extract_trip_request + process_json_output
  route_by_substream = (
      switch.Switch(content_api.get_substream_name)
      # The default (empty) substream name means there was no error.
      # `parallel_concat` runs the processors concurrently and concatenates
      # their results in the listed order.
      .case('', processor.parallel_concat([msg_to_user, generate_trip]))
      # Anything else (i.e. the 'error' substream) is returned unchanged.
      .default(processor.passthrough())
  )
  return validated_request + route_by_substream
170+
171+
172+
# The factory function itself (not a processor instance) is handed to the
# agent, which calls it whenever it needs a processor.
root_agent = adk.ProcessorAgent(
    processor_factory=create_trip_request_processor,
    name='trip_request_adk',
)

genai_processors/core/adk.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# Copyright 2025 DeepMind Technologies Limited. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
# ==============================================================================
15+
"""ADK - GenAI Processors integration."""
16+
17+
from typing import AsyncGenerator, AsyncIterable, Callable, override
18+
19+
from genai_processors import content_api
20+
from genai_processors import processor
21+
from google.adk.agents import base_agent
22+
from google.adk.agents import invocation_context
23+
from google.adk.events import event as adk_event
24+
from google.adk.events import event_actions
25+
from google.genai import types as genai_types
26+
27+
28+
class ProcessorAgent(base_agent.BaseAgent):
  """ADK custom agent that delegates processing of its input to a Processor.

  Works both for turn-based and live modes. In turn-based mode the Processor is
  called for each user turn with the whole conversation history up to that
  turn.

  This agent does use ADK token streaming. The "Token streaming" checkbox in
  the ADK UI must be enabled for the response to be rendered correctly. If used
  programmatically, the consumer must use either only the events marked as
  partial=True or only the events marked as turn_complete=True, but not both.
  """

  def __init__(
      self, processor_factory: Callable[[], processor.Processor], *, name: str
  ):
    """Initializes the ProcessorAgent.

    Args:
      processor_factory: A function that returns a Processor to be applied to
        the incoming content. It will be called on each turn and each request.
        Unless the returned processor is stateless, it must return a new
        instance every time to avoid state sharing between users.
      name: The agent's name. It must be a valid Python identifier and unique
        within the agent tree. It can't be "user", since it's reserved for
        end-user's input.
    """
    super().__init__(name=name)
    self._processor_factory = processor_factory

  def _history_key(self) -> str:
    """Returns the session state key under which this agent keeps its history."""
    return f'history_{self.name}'

  def _append_to_history(
      self,
      ctx: invocation_context.InvocationContext,
      content: genai_types.Content,
  ) -> adk_event.Event:
    """Creates an event whose state delta extends the history with `content`.

    Args:
      ctx: The current invocation context; its session state holds the history
        accumulated so far.
      content: The content to add at the end of the history.

    Returns:
      An event authored by this agent that carries the extended history as a
      state delta. The session state itself is not modified here.
    """
    # Parsing conversation history from the Event log requires handling many
    # edge cases which ADK considers to be implementation details. Currently
    # only adk.LlmAgent is privileged to do that. As a temporary workaround we
    # will accumulate the history in the state. Downside is that event log
    # (stored in memory) will grow as n^2.
    key = self._history_key()
    # Build a new list instead of appending in place: the list found in the
    # state is the very object referenced by the state delta of the previous
    # event, so mutating it would retroactively change that event.
    history = [*ctx.session.state.get(key, []), content]
    return adk_event.Event(
        actions=event_actions.EventActions(state_delta={key: history}),
        author=self.name,
    )

  async def _stream_history(
      self, ctx: invocation_context.InvocationContext
  ) -> AsyncIterable[content_api.ProcessorPart]:
    """Yields every part of the stored history, tagged with its content's role.

    Args:
      ctx: The current invocation context whose session state is read.

    Yields:
      One ProcessorPart per part of each history entry, in order.
    """
    for content in ctx.session.state.get(self._history_key(), []):
      # `parts` is optional on genai Content; treat a missing list as empty.
      for part in content.parts or []:
        yield content_api.ProcessorPart(part, role=content.role)

  @override
  async def _run_async_impl(
      self, ctx: invocation_context.InvocationContext
  ) -> AsyncGenerator[adk_event.Event, None]:
    """Runs one turn: feeds the history to a processor and streams its output.

    Args:
      ctx: The invocation context of the current turn.

    Yields:
      First an event recording the user content in the history (when there is
      user content), then one partial event per part produced by the processor,
      and finally a turn_complete event that carries the whole response and
      records it in the history.
    """
    p = self._processor_factory()

    # `user_content` is optional on the invocation context; recording None
    # would break reading the history back.
    if ctx.user_content is not None:
      yield self._append_to_history(ctx, ctx.user_content)
    response = genai_types.Content(parts=[], role='model')
    async for part in p(self._stream_history(ctx)):
      yield adk_event.Event(
          content=genai_types.Content(parts=[part.part], role='model'),
          author=self.name,
          partial=True,
          invocation_id=ctx.invocation_id,
      )
      response.parts.append(part.part)

    final_event = self._append_to_history(ctx, response)
    final_event.content = response
    final_event.turn_complete = True
    yield final_event

  @override
  async def _run_live_impl(
      self, ctx: invocation_context.InvocationContext
  ) -> AsyncGenerator[adk_event.Event, None]:
    """Runs a live session: streams realtime blobs through a processor.

    Args:
      ctx: The invocation context whose live request queue is consumed.

    Yields:
      One event per part produced by the processor.
    """

    async def stream_content():
      """Converts queued live requests into 'realtime' user parts."""
      while True:
        request = await ctx.live_request_queue.get()
        if request.blob:
          yield content_api.ProcessorPart(
              request.blob.data,
              mimetype=request.blob.mime_type,
              substream_name='realtime',
              role='user',
          )
        if request.close:
          # NOTE: Currently ADK Web doesn't close connection and this case is
          # unreachable. _run_live_impl handlers will leak. Fixing this is in
          # ADK backlog.
          break

    p = self._processor_factory()
    async for part in p(stream_content()):
      # Parts without a role are attributed to the model. The role is passed on
      # to the emitted content; previously it was set on the part and dropped.
      role = part.role or 'model'
      yield adk_event.Event(
          content=genai_types.Content(parts=[part.part], role=role),
          author=self.name,
      )

0 commit comments

Comments
 (0)