Skip to content

Commit 6071b34

Browse files
hangfei authored and copybara-github committed
feat: Implement Activity Start and End signals in LiveRequestQueue and BaseLLMConnection
This change adds activity start and end signals to the LiveRequestQueue, allowing clients to manually control the start and end of user input in streaming sessions when automatic voice activity detection is disabled. The LiveRequestQueue allows users to send messages to the model with the following semantics: - `content`: sends turn-by-turn content. - `blob`: sends a media blob for realtime streaming (e.g., audio). - `activity_start`: indicates the beginning of an activity. - `activity_end`: indicates the end of an activity. - `close`: closes the connection. GeminiLLMConnection has been updated to send the new activity signals to the backend. This change is a necessary to support clients (e.g. voice assistants) that do not want to use automatic voice activity detection. In this case, the client will be responsible to send the `activity_start` signal when the user starts talking, and `activity_end` when the user finishes talking. To test the change: run_config = RunConfig( realtime_input_config=types.RealtimeInputConfig( automatic_activity_detection=types.AutomaticActivityDetection( disabled=True, ), ) ) import threading # Add this import def thread_target(): # Define the async operations to run in the background. async def background_task(): live_request_queue.send_activity_start() # live_request_queue.send_content( # content=types.Content( # role='user', # parts=[types.Part.from_text(text="hi, what's the time?")], # ) # ) await asyncio.sleep(3) live_request_queue.send_activity_end() PiperOrigin-RevId: 783882447
1 parent b89aac9 commit 6071b34

File tree

4 files changed

+41
-7
lines changed

4 files changed

+41
-7
lines changed

src/google/adk/agents/live_request_queue.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from __future__ import annotations
16+
1517
import asyncio
1618
from typing import Optional
1719

1820
from google.genai import types
1921
from pydantic import BaseModel
2022
from pydantic import ConfigDict
23+
from pydantic import field_validator
2124

2225

2326
class LiveRequest(BaseModel):
@@ -30,6 +33,10 @@ class LiveRequest(BaseModel):
3033
"""If set, send the content to the model in turn-by-turn mode."""
3134
blob: Optional[types.Blob] = None
3235
"""If set, send the blob to the model in realtime mode."""
36+
activity_start: Optional[types.ActivityStart] = None
37+
"""If set, signal the start of user activity to the model."""
38+
activity_end: Optional[types.ActivityEnd] = None
39+
"""If set, signal the end of user activity to the model."""
3340
close: bool = False
3441
"""If set, close the queue. queue.shutdown() is only supported in Python 3.13+."""
3542

@@ -58,6 +65,14 @@ def send_content(self, content: types.Content):
5865
def send_realtime(self, blob: types.Blob):
5966
self._queue.put_nowait(LiveRequest(blob=blob))
6067

68+
def send_activity_start(self):
69+
"""Sends an activity start signal to mark the beginning of user input."""
70+
self._queue.put_nowait(LiveRequest(activity_start=types.ActivityStart()))
71+
72+
def send_activity_end(self):
73+
"""Sends an activity end signal to mark the end of user input."""
74+
self._queue.put_nowait(LiveRequest(activity_end=types.ActivityEnd()))
75+
6176
def send(self, req: LiveRequest):
6277
self._queue.put_nowait(req)
6378

src/google/adk/flows/llm_flows/base_llm_flow.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,12 @@ async def _send_to_model(
194194
if live_request.close:
195195
await llm_connection.close()
196196
return
197-
if live_request.blob:
197+
198+
if live_request.activity_start:
199+
await llm_connection.send_realtime(types.ActivityStart())
200+
elif live_request.activity_end:
201+
await llm_connection.send_realtime(types.ActivityEnd())
202+
elif live_request.blob:
198203
# Cache audio data here for transcription
199204
if not invocation_context.transcription_cache:
200205
invocation_context.transcription_cache = []
@@ -205,6 +210,7 @@ async def _send_to_model(
205210
TranscriptionEntry(role='user', data=live_request.blob)
206211
)
207212
await llm_connection.send_realtime(live_request.blob)
213+
208214
if live_request.content:
209215
await llm_connection.send_content(live_request.content)
210216

src/google/adk/models/base_llm_connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from __future__ import annotations
16+
1517
from abc import abstractmethod
1618
from typing import AsyncGenerator
1719

src/google/adk/models/gemini_llm_connection.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import logging
1818
from typing import AsyncGenerator
19+
from typing import Union
1920

2021
from google.genai import live
2122
from google.genai import types
@@ -25,6 +26,8 @@
2526

2627
logger = logging.getLogger('google_adk.' + __name__)
2728

29+
RealtimeInput = Union[types.Blob, types.ActivityStart, types.ActivityEnd]
30+
2831

2932
class GeminiLlmConnection(BaseLlmConnection):
3033
"""The Gemini model connection."""
@@ -93,16 +96,24 @@ async def send_content(self, content: types.Content):
9396
)
9497
)
9598

96-
async def send_realtime(self, blob: types.Blob):
99+
async def send_realtime(self, input: RealtimeInput):
97100
"""Sends a chunk of audio or a frame of video to the model in realtime.
98101
99102
Args:
100-
blob: The blob to send to the model.
103+
input: The input to send to the model.
101104
"""
102-
103-
input_blob = blob.model_dump()
104-
logger.debug('Sending LLM Blob: %s', input_blob)
105-
await self._gemini_session.send(input=input_blob)
105+
if isinstance(input, types.Blob):
106+
input_blob = input.model_dump()
107+
logger.debug('Sending LLM Blob: %s', input_blob)
108+
await self._gemini_session.send(input=input_blob)
109+
elif isinstance(input, types.ActivityStart):
110+
logger.debug('Sending LLM activity start signal')
111+
await self._gemini_session.send_realtime_input(activity_start=input)
112+
elif isinstance(input, types.ActivityEnd):
113+
logger.debug('Sending LLM activity end signal')
114+
await self._gemini_session.send_realtime_input(activity_end=input)
115+
else:
116+
raise ValueError('Unsupported input type: %s' % type(input))
106117

107118
def __build_full_text_response(self, text: str):
108119
"""Builds a full text response.

0 commit comments

Comments
 (0)