diff --git a/app/agents/voice/automatic/__init__.py b/app/agents/voice/automatic/__init__.py
index 5c31f9bc..9b69a1ce 100644
--- a/app/agents/voice/automatic/__init__.py
+++ b/app/agents/voice/automatic/__init__.py
@@ -21,6 +21,7 @@
     OutputAudioRawFrame,
     TTSSpeakFrame,
 )
+from pipecat.pipeline.pipeline import Pipeline
 from pipecat.pipeline.runner import PipelineRunner
 from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -54,10 +55,13 @@
 from .processors import LLMSpyProcessor
 from .processors.ptt_vad_filter import PTTVADFilter
+from .processors.user_speaking_audio import UserSpeakingAudioProcessor
 from .prompts import get_system_prompt
 from .stt import get_stt_service
 from .tools import initialize_tools
 from .tts import get_tts_service
+from .audio.audio_manager import initialize_audio_manager
+
 from .types import (
     Mode,
     TTSProvider,
@@ -212,6 +216,9 @@ async def main():
         enable_chart_text_filter=config.ENABLE_CHARTS,
     )
 
+    # Initialize the audio manager for looping waiting audio
+    audio_manager = initialize_audio_manager(tts)
+
     llm = LLMServiceWrapper(
         AzureLLMService(
             api_key=config.AZURE_OPENAI_API_KEY,
@@ -273,39 +280,24 @@ async def main():
     rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
 
-    # Simplified event handler for TTS feedback
-    @llm.event_handler("on_function_calls_started")
-    async def on_function_calls_started(service, function_calls):
-        # Only play the "checking" message if using Google TTS
-        if tts_provider == TTSProvider.GOOGLE:
-            for function_call in function_calls:
-                # Skip "checking" message for instant functions and chart tools
-                instant_functions = [
-                    "get_current_time",
-                    "utility__getCurrentTime",  # NeuroLink equivalent
-                    "utility__generateTimestamp",  # NeuroLink timestamp tool
-                    "generate_bar_chart",
-                    "generate_line_chart",
-                    "generate_donut_chart",
-                    "generate_single_stat_card",
-                ]
-                if function_call.function_name not in instant_functions:
-                    # Play tool call sound if enabled, otherwise use phrases
-                    if tool_call_sound:
-                        await transport.send_audio(tool_call_sound)
-                    else:
-                        phrases = [
-                            "Let me check on that.",
-                            "Give me a moment to do that.",
-                            "I'll get right on that.",
-                            "Working on that for you.",
-                            "One moment — I'm on it",
-                            "One second, boss.",
-                            "On it, boss!",
-                            "Just a second, captain.",
-                        ]
-                        await tts.queue_frame(TTSSpeakFrame(random.choice(phrases)))
-                break
+    # Function calls started - DO NOT start audio here.
+    # Audio should only start when the user stops speaking, not when the LLM
+    # starts processing.
+    @llm.event_handler("on_function_calls_started")
+    async def on_function_calls_started(service, function_calls):
+        logger.info(f"Function calls started event triggered with {len(function_calls)} calls")
+
+    # Only stop audio when function calls complete, and only if audio is
+    # actually playing
+    @llm.event_handler("on_function_calls_finished")
+    async def on_function_calls_finished(service):
+        # Pause the audio, but keep it enabled so it can resume if no LLM text
+        # follows the function calls
+        if audio_manager.is_playing:
+            audio_manager.is_playing = False
+            if audio_manager.loop_task and not audio_manager.loop_task.done():
+                audio_manager.loop_task.cancel()
+            logger.info("Function calls finished - stopped audio but kept it enabled for resumption")
+        else:
+            logger.debug("Function calls finished - audio not playing, no action needed")
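+
+    # Waiting-audio lifecycle, as wired across this module and the processors
+    # (a summary of the intended flow, not enforced in any single place):
+    #   user starts speaking  -> enable audio (UserSpeakingAudioProcessor)
+    #   user stops speaking   -> start looping audio once speech is confirmed
+    #   first LLM text frame  -> stop and disable audio (LLMSpyProcessor)
+    #   function calls finish -> pause audio, keep it enabled for resumption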
PTTVADFilter("PTTVADFilter") pipeline_components.append(ptt_vad_filter) # Filter VAD frames after STT + # Add user speaking audio processor - manages audio based on user speech + user_speaking_processor = UserSpeakingAudioProcessor("UserSpeakingAudioProcessor") + pipeline_components.append(user_speaking_processor) # Add after STT/PTT, before RTVI + pipeline_components.extend([rtvi, context_aggregator.user()]) if ( config.MEM0_ENABLED diff --git a/app/agents/voice/automatic/audio/audio_manager.py b/app/agents/voice/automatic/audio/audio_manager.py new file mode 100644 index 00000000..ed48704f --- /dev/null +++ b/app/agents/voice/automatic/audio/audio_manager.py @@ -0,0 +1,232 @@ +""" +Simplified Audio Manager - Minimal variables, seamless playback, immediate stop capability +""" + +import asyncio +from typing import Optional +from pydub import AudioSegment +from pipecat.frames.frames import OutputAudioRawFrame +from app.core.logger import logger + +# Configurable audio length constant +AUDIO_LENGTH_SECONDS = 6 # Default duration in seconds + + +class AudioManager: + """Simplified audio manager with minimal state and seamless chunked playback.""" + + def __init__(self, tts_service, transport=None): + self.tts_service = tts_service + self.transport = transport # Keep for compatibility + + # MINIMAL STATE - Only 4 variables needed + self.is_playing = False + self.loop_count = 0 # Track completed loops (max 3 = 18 seconds total) + self.audio_chunks = [] # 1-second audio chunks for seamless playback + self.user_has_input = False # Ensure audio only starts after user input + + # Internal task management + self.loop_task: Optional[asyncio.Task] = None + + self._load_waiting_audio() + + def _load_waiting_audio(self): + """Load waiting audio and split into 1-second chunks for seamless playback.""" + try: + wav_file_path = f"app/agents/voice/automatic/audio/waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav" + + audio = AudioSegment.from_wav(wav_file_path) + # Convert to pipeline format + audio = audio.set_frame_rate(16000).set_channels(1).set_sample_width(2) + + # Split into clean 1-second chunks + self.audio_chunks = [] + chunk_duration_ms = 1000 # 1 second in milliseconds + + for i in range(0, len(audio), chunk_duration_ms): + chunk = audio[i:i + chunk_duration_ms] + + # Ensure chunk is exactly 1 second (pad if necessary) + if len(chunk) < chunk_duration_ms: + silence_needed = chunk_duration_ms - len(chunk) + silence = AudioSegment.silent(duration=silence_needed, frame_rate=16000) + chunk = chunk + silence + + self.audio_chunks.append(chunk.raw_data) + + logger.info(f"Loaded {AUDIO_LENGTH_SECONDS}-second waiting audio: {len(self.audio_chunks)} chunks") + + except Exception as e: + logger.error(f"Failed to load waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav: {e}") + self.audio_chunks = [] + + def set_user_input(self): + """Mark that user has provided input - required for audio to start.""" + self.user_has_input = True + logger.info("User input detected - audio enabled") + + async def start_audio(self): + """Start seamless chunked audio playback (only if user has provided input).""" + + # Check prerequisites + if not self.user_has_input: + return + + if not self.audio_chunks: + return + + if self.is_playing: + return + + # Start audio + self.is_playing = True + self.loop_count = 0 + + # Cancel any existing task + if self.loop_task and not self.loop_task.done(): + self.loop_task.cancel() + + # Start seamless audio streaming + self.loop_task = asyncio.create_task(self._stream_seamless_audio()) + # 
logger.info("Started audio playback") + + async def stop_and_disable_audio(self): + """Stop audio immediately and disable until next user input.""" + # Set stop flags + self.is_playing = False + self.user_has_input = False # Require new user input for next audio + self.loop_count = 0 + + # Cancel audio task + if self.loop_task and not self.loop_task.done(): + self.loop_task.cancel() + try: + await self.loop_task + except asyncio.CancelledError: + pass + + # Clear audio queue aggressively + await self._clear_audio_queue() + + def reset(self): + """Reset for new conversation cycle.""" + self.is_playing = False + self.user_has_input = False + self.loop_count = 0 + + if self.loop_task and not self.loop_task.done(): + self.loop_task.cancel() + + def set_bot_speaking(self, speaking: bool): + """Track when bot starts/stops speaking to prevent audio during speech.""" + if speaking and self.is_playing: + logger.info("Bot started speaking - stopping audio") + asyncio.create_task(self.stop_and_disable_audio()) + + async def _stream_seamless_audio(self): + """Stream audio chunks seamlessly with minimal buffer for immediate stopping.""" + try: + if not self.audio_chunks: + return + + total_chunks = len(self.audio_chunks) # 6 chunks = 6 seconds + max_loops = 3 # Maximum 3 loops = 18 seconds total + chunk_index = 0 + + # Pre-queue 1 chunk for seamless start (minimal buffer) + if self.is_playing: + audio_frame = OutputAudioRawFrame( + audio=self.audio_chunks[chunk_index], + sample_rate=16000, + num_channels=1 + ) + await self.tts_service.queue_frame(audio_frame) + chunk_index = 1 + + # Continue streaming with optimal timing for seamless playback + while self.is_playing and self.loop_count < max_loops: + + # Wait 0.8 seconds before queuing next chunk (seamless timing) + await asyncio.sleep(0.8) + + # Check if we should stop + if not self.is_playing: + break + + # Queue next chunk + current_chunk = self.audio_chunks[chunk_index % total_chunks] + audio_frame = OutputAudioRawFrame( + audio=current_chunk, + sample_rate=16000, + num_channels=1 + ) + await self.tts_service.queue_frame(audio_frame) + + chunk_index += 1 + + # Update loop count when we complete a full cycle + if chunk_index % total_chunks == 0: + self.loop_count += 1 + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error in audio streaming: {e}") + finally: + self.is_playing = False + + async def _clear_audio_queue(self): + """Clear audio queue to prevent delayed audio after stop.""" + try: + # Try to interrupt TTS service + if hasattr(self.tts_service, 'interrupt'): + if asyncio.iscoroutinefunction(self.tts_service.interrupt): + await self.tts_service.interrupt() + else: + self.tts_service.interrupt() + + # Send brief silence to flush pipeline + silence_duration = 0.01 # 10ms silence + sample_rate = 16000 + silence_samples = int(silence_duration * sample_rate) + silence_data = b'\x00' * (silence_samples * 2) # 16-bit silence + + silence_frame = OutputAudioRawFrame( + audio=silence_data, + sample_rate=16000, + num_channels=1 + ) + await self.tts_service.queue_frame(silence_frame) + + except Exception as e: + logger.debug(f"Audio queue clearing failed: {e}") + + +# Global audio manager instance +_audio_manager: Optional[AudioManager] = None + + +def get_audio_manager() -> Optional[AudioManager]: + """Get the global audio manager instance.""" + return _audio_manager + + +def set_audio_manager(audio_manager: AudioManager): + """Set the global audio manager instance.""" + global _audio_manager + _audio_manager = 
+
+    async def _clear_audio_queue(self):
+        """Clear the audio queue to prevent delayed audio after a stop."""
+        try:
+            # Try to interrupt the TTS service
+            if hasattr(self.tts_service, 'interrupt'):
+                if asyncio.iscoroutinefunction(self.tts_service.interrupt):
+                    await self.tts_service.interrupt()
+                else:
+                    self.tts_service.interrupt()
+
+            # Send a brief silence to flush the pipeline
+            silence_duration = 0.01  # 10 ms of silence
+            sample_rate = 16000
+            silence_samples = int(silence_duration * sample_rate)
+            silence_data = b'\x00' * (silence_samples * 2)  # 16-bit silence
+
+            silence_frame = OutputAudioRawFrame(
+                audio=silence_data,
+                sample_rate=16000,
+                num_channels=1,
+            )
+            await self.tts_service.queue_frame(silence_frame)
+
+        except Exception as e:
+            logger.debug(f"Audio queue clearing failed: {e}")
+
+
+# Global audio manager instance
+_audio_manager: Optional[AudioManager] = None
+
+
+def get_audio_manager() -> Optional[AudioManager]:
+    """Get the global audio manager instance."""
+    return _audio_manager
+
+
+def set_audio_manager(audio_manager: AudioManager):
+    """Set the global audio manager instance."""
+    global _audio_manager
+    _audio_manager = audio_manager
+
+
+def initialize_audio_manager(tts_service, transport=None) -> AudioManager:
+    """Initialize and return the global audio manager."""
+    audio_manager = AudioManager(tts_service, transport)
+    set_audio_manager(audio_manager)
+    return audio_manager
+
+
+# Simple helper function for external use
+def reset_for_new_input():
+    """Legacy helper - reset the audio manager for new input."""
+    audio_manager = get_audio_manager()
+    if audio_manager:
+        audio_manager.reset()
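+
+
+if __name__ == "__main__":
+    # Minimal smoke test, assuming only this module (no pipecat pipeline).
+    # _FakeTTS is a hypothetical stand-in for the real TTS service: the
+    # manager only needs an object with an async queue_frame() method.
+    class _FakeTTS:
+        async def queue_frame(self, frame):
+            print(f"queued {len(frame.audio)} bytes")
+
+    async def _demo():
+        manager = initialize_audio_manager(_FakeTTS())
+        manager.set_user_input()              # simulate user speech
+        await manager.start_audio()           # begins looping if the wav loaded
+        await asyncio.sleep(2)                # let a couple of chunks queue
+        await manager.stop_and_disable_audio()
+
+    asyncio.run(_demo())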
diff --git a/app/agents/voice/automatic/audio/waiting_6sec.wav b/app/agents/voice/automatic/audio/waiting_6sec.wav
new file mode 100644
index 00000000..b2422bf7
Binary files /dev/null and b/app/agents/voice/automatic/audio/waiting_6sec.wav differ
diff --git a/app/agents/voice/automatic/processors/llm_spy.py b/app/agents/voice/automatic/processors/llm_spy.py
index d10a5c84..f6564ddb 100644
--- a/app/agents/voice/automatic/processors/llm_spy.py
+++ b/app/agents/voice/automatic/processors/llm_spy.py
@@ -18,6 +18,7 @@
     LLMTextFrame,
     TextFrame,
     UserStartedSpeakingFrame,
+    TTSSpeakFrame,
 )
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.processors.frameworks.rtvi import RTVIProcessor, RTVIServerMessageFrame
@@ -34,6 +35,18 @@
 from app.core.logger import logger
 from ..features.text_sanitizer.tts_sanitizer import sanitize_markdown
+from app.agents.voice.automatic.audio.audio_manager import get_audio_manager
+
+
+def _stop_audio_immediately(context: str = "unknown") -> bool:
+    """INSTANT audio stopping using the simplified AudioManager API."""
+    audio_manager = get_audio_manager()
+    if audio_manager and (audio_manager.user_has_input or audio_manager.is_playing):
+        # Use the simplified stop method (fire-and-forget)
+        asyncio.create_task(audio_manager.stop_and_disable_audio())
+
+        logger.info(f"INSTANT AUDIO STOP: {context}")
+        return True
+    return False
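+
+# Usage note: _stop_audio_immediately() is fire-and-forget - it schedules
+# stop_and_disable_audio() via asyncio.create_task() so frame processing is
+# never blocked. A caller that must know the audio is fully stopped before
+# continuing should await the coroutine directly instead, e.g.:
+#
+#     audio_manager = get_audio_manager()
+#     if audio_manager:
+#         await audio_manager.stop_and_disable_audio()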
 
 # Global RTVI processor reference for function confirmations
 _rtvi_processor = None
@@ -161,7 +174,16 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
         """Process frames and delegate conversation logic to ConversationManager."""
         await super().process_frame(frame, direction)
 
+        # TextFrame - instant audio interruption (highest priority)
         if isinstance(frame, TextFrame):
+            # INSTANT STOP: any non-empty text output interrupts audio immediately
+            if frame.text.strip():
+                if _stop_audio_immediately("TextFrame - Bot Speaking"):
+                    audio_manager = get_audio_manager()
+                    if audio_manager:
+                        audio_manager.set_bot_speaking(True)
+                    logger.info("INSTANT STOP: TextFrame detected - audio interrupted immediately")
+
             if config.SANITIZE_TEXT_FOR_TTS:
                 await self.push_frame(
                     TextFrame(text=sanitize_markdown(frame.text)), direction
@@ -173,43 +195,62 @@
                 reset_chart_turn_count(self._session_id)
             await self.push_frame(frame, direction)
 
-        # LLM Response Start - begin collecting text and start conversation turn
-        elif isinstance(frame, LLMFullResponseStartFrame) and self._enable_charts:
+        # LLM Response Start - begin collecting text, start the conversation
+        # turn, and prepare for an instant stop (audio keeps playing until the
+        # first text frame arrives)
+        elif isinstance(frame, LLMFullResponseStartFrame):
+            # logger.debug(f"🤖 LLM processing started - audio continues, preparing for instant stop on text output")
             self._is_collecting_response = True
             self._accumulated_text = ""
             # Start conversation turn via ConversationManager
             event = await self._conversation_manager.start_turn_with_events(
                 self._session_id
             )
             if event:
                 await emit_rtvi_event(self._rtvi, event, self._session_id)
             await self.push_frame(frame, direction)
 
-        # LLM Output - accumulate streaming text
-        elif (
-            isinstance(frame, LLMTextFrame)
-            and self._is_collecting_response
-            and self._enable_charts
-        ):
-            self._accumulated_text += frame.text
+        # LLM Output - accumulate streaming text and stop audio with zero delay
+        elif isinstance(frame, LLMTextFrame):
+            # INSTANT STOP: any non-empty LLM text output stops audio immediately
+            if frame.text.strip():
+                _stop_audio_immediately("LLMTextFrame - LLM Output")
+
+            if self._is_collecting_response and self._enable_charts:
+                self._accumulated_text += frame.text
+
             await self.push_frame(frame, direction)
 
-        # LLM Response Complete - send to ConversationManager
-        elif isinstance(frame, LLMFullResponseEndFrame) and self._enable_charts:
-            if self._accumulated_text.strip():
+        # LLM Response Complete - send to ConversationManager and confirm the
+        # final audio stop
+        elif isinstance(frame, LLMFullResponseEndFrame):
+            # Check whether there was actual text output BEFORE clearing it
+            has_text_output = self._accumulated_text.strip()
+
+            if has_text_output:
+                _stop_audio_immediately("Response Complete - Final Stop")
+                # logger.info("FINAL STOP: LLM response complete with text output - audio fully stopped")
+
+                # Send the response to the conversation manager
                 event = await self._conversation_manager.add_llm_response_with_events(
-                    self._session_id, self._accumulated_text.strip()
+                    self._session_id, has_text_output
                 )
                 if event:
                     await emit_rtvi_event(self._rtvi, event, self._session_id)
             self._accumulated_text = ""
             self._is_collecting_response = False
+
+            # Handle the case where no text was produced (e.g., function calls only)
+            audio_manager = get_audio_manager()
+            if audio_manager:
+                audio_manager.set_bot_speaking(False)
+                # If there was no text output and the user has input, resume audio
+                if not has_text_output and audio_manager.user_has_input:
+                    await audio_manager.start_audio()
+                    logger.info("LLM response ended with no text - resuming audio")
+
             await self.push_frame(frame, direction)
 
-        # Function Call Start - emit RTVI event and track in conversation
+        # Function Call Start - emit RTVI event and track in conversation; no
+        # audio action here (let the waiting audio continue)
         elif isinstance(frame, FunctionCallInProgressFrame):
             if self._tracer:
                 span = self._tracer.start_span(
@@ -254,7 +295,7 @@
             if event:
                 await emit_rtvi_event(self._rtvi, event, self._session_id)
 
-        # Function Call Result - emit RTVI event and track in conversation
+        # Function Call Result - emit RTVI event and track in conversation; no audio action (audio continues)
         elif isinstance(frame, FunctionCallResultFrame):
             # Emit tool-call-result event
             if self._tracer and frame.tool_call_id in self._active_spans:
diff --git a/app/agents/voice/automatic/processors/user_speaking_audio.py b/app/agents/voice/automatic/processors/user_speaking_audio.py
new file mode 100644
index 00000000..c13021ee
--- /dev/null
+++ b/app/agents/voice/automatic/processors/user_speaking_audio.py
@@ -0,0 +1,131 @@
+"""
+User Speaking Audio Processor
+
+Handles audio management based on user speaking events:
+- User starts speaking → enable audio
+- User stops speaking → start playing audio once speech is confirmed
+
+A clean, simple flow tied to user speech patterns.
+"""
+
+import asyncio
+import time
+from typing import Optional
+
+from pipecat.frames.frames import (
+    Frame,
+    UserStartedSpeakingFrame,
+    UserStoppedSpeakingFrame,
+    VADUserStartedSpeakingFrame,
+    VADUserStoppedSpeakingFrame,
+    EmulateUserStartedSpeakingFrame,
+    EmulateUserStoppedSpeakingFrame,
+    TranscriptionFrame,
+    InterimTranscriptionFrame,
+)
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+
+from app.core.logger import logger
+from app.agents.voice.automatic.audio.audio_manager import get_audio_manager
+
+
+class UserSpeakingAudioProcessor(FrameProcessor):
+    """
+    Processor that manages audio based on user speaking events.
+
+    Flow:
+    1. User starts speaking → enable audio and reset the speech session state
+    2. User stops speaking → start audio if speech was transcribed, otherwise
+       wait briefly for a delayed transcription
+    3. Transcription received → mark the session as containing real speech
+    4. Bot starts speaking → audio stops (handled in LLMSpyProcessor)
+    """
+
+    def __init__(self, name: str = "UserSpeakingAudioProcessor"):
+        super().__init__(name=name)
+        self._user_currently_speaking = False
+        self._actual_speech_detected = False  # Set once a transcription arrives
+        self._speech_start_time = None  # When speech started (informational)
+        self._pending_audio_task: Optional[asyncio.Task] = None  # Waits for delayed transcription
+        self._transcription_timeout = 3.0  # Seconds to wait for transcription after PTT release
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        """Process frames and manage audio based on user speaking events."""
+        await super().process_frame(frame, direction)
+
+        # Transcription frames - detect actual speech content
+        if isinstance(frame, (TranscriptionFrame, InterimTranscriptionFrame)):
+            # Accept transcription even after PTT release (delayed transcription)
+            if frame.text.strip():
+                if not self._actual_speech_detected:
+                    self._actual_speech_detected = True
+
+                # If a pending task is still waiting for transcription, start
+                # audio now and cancel the wait to avoid a duplicate start
+                if self._pending_audio_task and not self._pending_audio_task.done():
+                    self._pending_audio_task.cancel()
+                    # logger.info("Starting audio due to delayed transcription")
+                    audio_manager = get_audio_manager()
+                    if audio_manager:
+                        await audio_manager.start_audio()
+
+        # Handle user started speaking events
+        elif isinstance(frame, (
+            UserStartedSpeakingFrame,
+            VADUserStartedSpeakingFrame,
+            EmulateUserStartedSpeakingFrame,
+        )):
+            if not self._user_currently_speaking:
+                self._user_currently_speaking = True
+                self._actual_speech_detected = False  # Reset detection for the new session
+                self._speech_start_time = time.time()  # Record when speech started
+                audio_manager = get_audio_manager()
+                if audio_manager:
+                    # First stop any currently playing audio immediately
+                    if audio_manager.is_playing:
+                        await audio_manager.stop_and_disable_audio()
+                        # logger.info("Stopped playing audio - user started speaking")
+
+                    # Then enable audio for the new input
+                    audio_manager.set_user_input()
+                    # logger.info(f"Audio enabled - user started speaking ({type(frame).__name__})")
+                else:
+                    logger.error("No audio manager found!")
+
+        # Handle user stopped speaking events
+        elif isinstance(frame, (
+            UserStoppedSpeakingFrame,
+            VADUserStoppedSpeakingFrame,
+            EmulateUserStoppedSpeakingFrame,
+        )):
+            if self._user_currently_speaking:
+                self._user_currently_speaking = False
+                audio_manager = get_audio_manager()
+                if audio_manager:
+                    # If transcription already arrived while the user was
+                    # speaking, start audio right away
+                    if self._actual_speech_detected:
+                        await audio_manager.start_audio()
+                    else:
+                        # Wait for a delayed transcription (the common case)
+                        self._pending_audio_task = asyncio.create_task(self._wait_for_transcription())
+                else:
+                    logger.error("No audio manager found!")
+
+        # Pass the frame through to the next processor
+        await self.push_frame(frame, direction)
+
+    async def _wait_for_transcription(self):
+        """Wait for a delayed transcription after PTT release."""
+        try:
+            # Wait for the transcription timeout
+            await asyncio.sleep(self._transcription_timeout)
+
+            # Check if a transcription arrived during the wait
+            if self._actual_speech_detected:
+                logger.info("Speech detected during transcription wait - starting audio")
+                audio_manager = get_audio_manager()
+                if audio_manager:
+                    await audio_manager.start_audio()
+            else:
+                logger.info("Transcription timeout reached without speech - audio not started")
+
+        except asyncio.CancelledError:
+            logger.debug("Transcription wait cancelled")
+        except Exception as e:
+            logger.error(f"Error in transcription wait: {e}")
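+
+
+# Wiring sketch (illustrative, assuming the pipeline assembly in
+# app/agents/voice/automatic/__init__.py): the processor sits after STT and
+# the PTT/VAD filter so it sees speaking and transcription frames, and before
+# RTVI and the context aggregator:
+#
+#     pipeline_components = [
+#         transport.input(),
+#         stt,
+#         ptt_vad_filter,
+#         UserSpeakingAudioProcessor("UserSpeakingAudioProcessor"),
+#         rtvi,
+#         context_aggregator.user(),
+#         ...
+#     ]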
diff --git a/app/agents/voice/automatic/utils/conversation_manager.py b/app/agents/voice/automatic/utils/conversation_manager.py
index 5eba5f9e..7497dd76 100644
--- a/app/agents/voice/automatic/utils/conversation_manager.py
+++ b/app/agents/voice/automatic/utils/conversation_manager.py
@@ -127,6 +127,19 @@ def add_user_message(
         self, session_id: str, content: str, message_id: Optional[str] = None
     ) -> ConversationMessage:
         """Add a user message and start a new turn"""
+        # Only reset audio for ACTUAL user input, not function-call processing.
+        # Local import keeps this module free of a load-time dependency on the
+        # audio package.
+        from app.agents.voice.automatic.audio.audio_manager import get_audio_manager
+
+        audio_manager = get_audio_manager()
+        if audio_manager:
+            # Heuristic: treat content as real user input unless it is inferred
+            # from voice or mentions function processing
+            is_real_user_input = not content.startswith("[Inferred from voice]") and "function" not in content.lower()
+
+            if is_real_user_input:
+                audio_manager.reset()
+                logger.debug(f"Reset audio for real user input: {content[:50]}...")
+            else:
+                logger.debug(f"Skipping audio reset for function/inferred content: {content[:50]}...")
+
         user_message = ConversationMessage.create_user_message(content, message_id)
         turn = self.start_turn(session_id, user_message)
         return user_message