Skip to content
76 changes: 42 additions & 34 deletions app/agents/voice/automatic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
OutputAudioRawFrame,
TTSSpeakFrame,
)

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -56,10 +57,12 @@

from .processors import LLMSpyProcessor
from .processors.ptt_vad_filter import PTTVADFilter
from .processors.user_speaking_audio import UserSpeakingAudioProcessor
from .prompts import get_system_prompt
from .stt import get_stt_service
from .tools import initialize_tools
from .tts import get_tts_service
from .audio.audio_manager import initialize_audio_manager

# Load tool call sound
tool_call_sound = None
Expand Down Expand Up @@ -361,6 +364,9 @@ async def run_normal_mode(args):
enable_chart_text_filter=config.ENABLE_CHARTS,
)

# Initialize audio manager for looping waiting audio
audio_manager = initialize_audio_manager(tts)

llm = LLMServiceWrapper(
AzureLLMService(
api_key=config.AZURE_OPENAI_API_KEY,
Expand Down Expand Up @@ -423,39 +429,28 @@ async def run_normal_mode(args):

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

# Simplified event handler for TTS feedback
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
    """Give the user audible feedback when a slow tool call begins.

    Feedback is a pre-recorded sound when available, otherwise a random
    spoken phrase. Only one cue is emitted per batch of function calls.
    """
    # Only play the "checking" message if using Google TTS
    if tts_provider != TTSProvider.GOOGLE:
        return

    # Calls fast enough that a "working on it" cue would just be noise.
    instant_functions = (
        "get_current_time",
        "utility__getCurrentTime",  # NeuroLink equivalent
        "utility__generateTimestamp",  # NeuroLink timestamp tool
        "generate_bar_chart",
        "generate_line_chart",
        "generate_donut_chart",
        "generate_single_stat_card",
    )

    # First call in the batch that warrants feedback (None if all instant).
    slow_call = next(
        (fc for fc in function_calls if fc.function_name not in instant_functions),
        None,
    )
    if slow_call is None:
        return

    # Play tool call sound if enabled, otherwise use phrases
    if tool_call_sound:
        await transport.send_audio(tool_call_sound)
    else:
        phrases = (
            "Let me check on that.",
            "Give me a moment to do that.",
            "I'll get right on that.",
            "Working on that for you.",
            "One moment — I'm on it",
            "One second, boss.",
            "On it, boss!",
            "Just a second, captain.",
        )
        await tts.queue_frame(TTSSpeakFrame(random.choice(phrases)))
# LLM response started handler - DO NOT start audio here
# Audio should only start when user stops speaking, not when LLM processes
@llm.event_handler("on_llm_response_started")
async def on_llm_response_started(service, function_calls):
    """Log that the LLM started responding; deliberately touches no audio.

    NOTE(review): the second parameter is named ``function_calls`` — assumes
    this event delivers the pending calls; verify against pipecat's event
    signature for ``on_llm_response_started``.
    """
    # Fixed: the previous message said "Function calls started event",
    # misattributing which event this handler is registered for.
    logger.info(f"LLM response started event triggered with {len(function_calls)} calls")

# Only stop audio when function calls complete if audio is actually playing
@llm.event_handler("on_function_calls_finished")
async def on_function_calls_finished(_service):
    """Stop any in-flight waiting audio once tool calls wrap up.

    Leaves the audio manager enabled so playback can resume later if no
    LLM text follows.
    """
    if not audio_manager:
        logger.warning("Audio manager not available, skipping audio control")
        return

    if not audio_manager.is_playing:
        logger.debug("Function calls finished - audio not playing, no action needed")
        return

    # Stop current audio but don't disable - allow resumption if no LLM text follows
    audio_manager.is_playing = False
    loop_task = audio_manager.loop_task
    if loop_task and not loop_task.done():
        loop_task.cancel()
    logger.info("Function calls finished - stopped audio but kept enabled for resumption")

messages = [
{"role": "system", "content": system_prompt},
Expand Down Expand Up @@ -484,6 +479,10 @@ async def on_function_calls_started(service, function_calls):
ptt_vad_filter = PTTVADFilter("PTTVADFilter")
pipeline_components.append(ptt_vad_filter) # Filter VAD frames after STT

# Add user speaking audio processor - manages audio based on user speech
user_speaking_processor = UserSpeakingAudioProcessor("UserSpeakingAudioProcessor")
pipeline_components.append(user_speaking_processor) # Add after STT/PTT, before RTVI

pipeline_components.extend([rtvi, context_aggregator.user()])
if (
config.MEM0_ENABLED
Expand Down Expand Up @@ -657,10 +656,19 @@ async def on_app_message(transport, message, sender):

@task.event_handler("on_pipeline_cancelled")
async def on_pipeline_cancelled(task, frame):
    """Clean up processors on pipeline cancellation, then cancel the main task.

    Each cleanup step is isolated so a failure in one cannot prevent the
    others — or the final ``main_task.cancel()`` — from running.
    """
    logger.info("Pipeline task cancelled. Cleaning up processors.")

    # Clean up user speaking processor
    try:
        await user_speaking_processor.cleanup()
        logger.info("User speaking processor cleaned up successfully")
    except Exception as e:
        logger.error(f"Error cleaning up user speaking processor: {e}")

    # Clean up Fal.ai Smart Turn session.
    # Fixed: this await was previously unguarded — an exception here skipped
    # main_task.cancel(), leaving the main task alive after cancellation.
    if fal_smart_turn_service:
        try:
            await fal_smart_turn_service.cleanup(fal_session)
        except Exception as e:
            logger.error(f"Error cleaning up Fal.ai Smart Turn session: {e}")

    # current_task() can return None if called outside a running task; guard.
    main_task = asyncio.current_task()
    if main_task is not None:
        main_task.cancel()

Expand Down
Loading