-
Notifications
You must be signed in to change notification settings - Fork 46
Origin/audio for waiting #256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release
Are you sure you want to change the base?
Changes from 5 commits
cf9afad
702ece2
bcc5a10
3a1de70
2c7ed25
e55280a
bd96a7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,9 +2,13 @@ | |||||
| import asyncio | ||||||
| import os | ||||||
| import random | ||||||
| import argparse | ||||||
| from dotenv import load_dotenv | ||||||
| import wave | ||||||
| from datetime import datetime | ||||||
| from zoneinfo import ZoneInfo | ||||||
| from pydub import AudioSegment | ||||||
| import audioop | ||||||
|
|
||||||
| from dotenv import load_dotenv | ||||||
| from langfuse import get_client | ||||||
|
|
@@ -20,7 +24,12 @@ | |||||
| LLMFullResponseEndFrame, | ||||||
| OutputAudioRawFrame, | ||||||
| TTSSpeakFrame, | ||||||
| OutputAudioRawFrame, | ||||||
| LLMFullResponseStartFrame, | ||||||
| FunctionCallInProgressFrame, | ||||||
| FunctionCallResultFrame | ||||||
| ) | ||||||
|
|
||||||
| from pipecat.pipeline.pipeline import Pipeline | ||||||
| from pipecat.pipeline.runner import PipelineRunner | ||||||
| from pipecat.pipeline.task import PipelineParams, PipelineTask | ||||||
|
|
@@ -54,10 +63,23 @@ | |||||
|
|
||||||
| from .processors import LLMSpyProcessor | ||||||
| from .processors.ptt_vad_filter import PTTVADFilter | ||||||
| from .processors.user_speaking_audio import UserSpeakingAudioProcessor | ||||||
| from .prompts import get_system_prompt | ||||||
| from .stt import get_stt_service | ||||||
| from .tools import initialize_tools | ||||||
| from .tts import get_tts_service | ||||||
| from .stt import get_stt_service | ||||||
| from .audio.audio_manager import initialize_audio_manager, get_audio_manager | ||||||
| from app.agents.voice.automatic.processors.llm_spy import handle_confirmation_response | ||||||
| from app.agents.voice.automatic.types import ( | ||||||
| TTSProvider, | ||||||
| Mode, | ||||||
| decode_tts_provider, | ||||||
| decode_voice_name, | ||||||
| decode_mode, | ||||||
| ) | ||||||
| from opentelemetry import trace | ||||||
| from langfuse import get_client | ||||||
| from .types import ( | ||||||
| Mode, | ||||||
| TTSProvider, | ||||||
|
|
@@ -212,6 +234,9 @@ async def main(): | |||||
| enable_chart_text_filter=config.ENABLE_CHARTS, | ||||||
| ) | ||||||
|
|
||||||
| # Initialize audio manager for looping waiting audio | ||||||
| audio_manager = initialize_audio_manager(tts) | ||||||
|
|
||||||
| llm = LLMServiceWrapper( | ||||||
| AzureLLMService( | ||||||
| api_key=config.AZURE_OPENAI_API_KEY, | ||||||
|
|
@@ -273,39 +298,23 @@ async def main(): | |||||
|
|
||||||
| rtvi = RTVIProcessor(config=RTVIConfig(config=[])) | ||||||
|
|
||||||
| # Simplified event handler for TTS feedback | ||||||
| @llm.event_handler("on_function_calls_started") | ||||||
| async def on_function_calls_started(service, function_calls): | ||||||
| # Only play the "checking" message if using Google TTS | ||||||
| if tts_provider == TTSProvider.GOOGLE: | ||||||
| for function_call in function_calls: | ||||||
| # Skip "checking" message for instant functions and chart tools | ||||||
| instant_functions = [ | ||||||
| "get_current_time", | ||||||
| "utility__getCurrentTime", # NeuroLink equivalent | ||||||
| "utility__generateTimestamp", # NeuroLink timestamp tool | ||||||
| "generate_bar_chart", | ||||||
| "generate_line_chart", | ||||||
| "generate_donut_chart", | ||||||
| "generate_single_stat_card", | ||||||
| ] | ||||||
| if function_call.function_name not in instant_functions: | ||||||
| # Play tool call sound if enabled, otherwise use phrases | ||||||
| if tool_call_sound: | ||||||
| await transport.send_audio(tool_call_sound) | ||||||
| else: | ||||||
| phrases = [ | ||||||
| "Let me check on that.", | ||||||
| "Give me a moment to do that.", | ||||||
| "I'll get right on that.", | ||||||
| "Working on that for you.", | ||||||
| "One moment — I'm on it", | ||||||
| "One second, boss.", | ||||||
| "On it, boss!", | ||||||
| "Just a second, captain.", | ||||||
| ] | ||||||
| await tts.queue_frame(TTSSpeakFrame(random.choice(phrases))) | ||||||
| break | ||||||
| # LLM response started handler - DO NOT start audio here | ||||||
| # Audio should only start when user stops speaking, not when LLM processes | ||||||
| @llm.event_handler("on_llm_response_started") | ||||||
| async def on_llm_response_started(service,function_calls): | ||||||
|
||||||
| async def on_llm_response_started(service,function_calls): | |
| async def on_llm_response_started(service, function_calls): |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,234 @@ | ||
| """ | ||
| Simplified Audio Manager - Minimal variables, seamless playback, immediate stop capability | ||
| """ | ||
|
|
||
| import asyncio | ||
| from typing import Optional | ||
| from pydub import AudioSegment | ||
| from pipecat.frames.frames import OutputAudioRawFrame | ||
| from app.core.logger import logger | ||
|
|
||
| # Configurable audio length constant | ||
| AUDIO_LENGTH_SECONDS = 6 # Default duration in seconds | ||
|
||
|
|
||
|
|
||
| class AudioManager: | ||
| """Simplified audio manager with minimal state and seamless chunked playback.""" | ||
|
|
||
| def __init__(self, tts_service, transport=None): | ||
| self.tts_service = tts_service | ||
| self.transport = transport # Keep for compatibility | ||
|
|
||
| # MINIMAL STATE - Only 4 variables needed | ||
| self.is_playing = False | ||
| self.loop_count = 0 # Track completed loops (max 3 = 18 seconds total) | ||
| self.audio_chunks = [] # 1-second audio chunks for seamless playback | ||
| self.user_has_input = False # Ensure audio only starts after user input | ||
|
|
||
| # Internal task management | ||
| self.loop_task: Optional[asyncio.Task] = None | ||
|
|
||
| self._load_waiting_audio() | ||
|
|
||
| def _load_waiting_audio(self): | ||
| """Load waiting audio and split into 1-second chunks for seamless playback.""" | ||
| try: | ||
| wav_file_path = f"app/agents/voice/automatic/audio/waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav" | ||
|
|
||
| audio = AudioSegment.from_wav(wav_file_path) | ||
| # Convert to pipeline format | ||
| audio = audio.set_frame_rate(16000).set_channels(1).set_sample_width(2) | ||
|
|
||
| # Split into clean 1-second chunks | ||
| self.audio_chunks = [] | ||
| chunk_duration_ms = 1000 # 1 second in milliseconds | ||
|
|
||
| for i in range(0, len(audio), chunk_duration_ms): | ||
| chunk = audio[i:i + chunk_duration_ms] | ||
|
|
||
| # Ensure chunk is exactly 1 second (pad if necessary) | ||
| if len(chunk) < chunk_duration_ms: | ||
| silence_needed = chunk_duration_ms - len(chunk) | ||
| silence = AudioSegment.silent(duration=silence_needed, frame_rate=16000) | ||
| chunk = chunk + silence | ||
|
|
||
| self.audio_chunks.append(chunk.raw_data) | ||
|
|
||
| logger.info(f"Loaded {AUDIO_LENGTH_SECONDS}-second waiting audio: {len(self.audio_chunks)} chunks") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to load waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav: {e}") | ||
| self.audio_chunks = [] | ||
|
|
||
| def set_user_input(self): | ||
| """Mark that user has provided input - required for audio to start.""" | ||
| self.user_has_input = True | ||
| logger.info("User input detected - audio enabled") | ||
|
|
||
| async def start_audio(self): | ||
| """Start seamless chunked audio playback (only if user has provided input).""" | ||
|
|
||
| # Check prerequisites | ||
| if not self.user_has_input: | ||
| return | ||
|
|
||
| if not self.audio_chunks: | ||
| return | ||
|
|
||
| if self.is_playing: | ||
| return | ||
|
|
||
| # Start audio | ||
| self.is_playing = True | ||
| self.loop_count = 0 | ||
|
|
||
| # Cancel any existing task | ||
| if self.loop_task and not self.loop_task.done(): | ||
| self.loop_task.cancel() | ||
|
|
||
| # Start seamless audio streaming | ||
| self.loop_task = asyncio.create_task(self._stream_seamless_audio()) | ||
| # logger.info("Started audio playback") | ||
|
|
||
| async def stop_and_disable_audio(self): | ||
| """Stop audio immediately and disable until next user input.""" | ||
| # Set stop flags | ||
| self.is_playing = False | ||
| self.user_has_input = False # Require new user input for next audio | ||
| self.loop_count = 0 | ||
|
|
||
| # Cancel audio task | ||
| if self.loop_task and not self.loop_task.done(): | ||
| self.loop_task.cancel() | ||
| try: | ||
| await self.loop_task | ||
| except asyncio.CancelledError: | ||
| pass | ||
|
|
||
| # Clear audio queue aggressively | ||
| await self._clear_audio_queue() | ||
|
|
||
| def reset(self): | ||
| """Reset for new conversation cycle.""" | ||
| self.is_playing = False | ||
| self.user_has_input = False | ||
| self.loop_count = 0 | ||
|
|
||
| if self.loop_task and not self.loop_task.done(): | ||
| self.loop_task.cancel() | ||
|
|
||
| def set_bot_speaking(self, speaking: bool): | ||
| """Track when bot starts/stops speaking to prevent audio during speech.""" | ||
| if speaking and self.is_playing: | ||
| logger.info("Bot started speaking - stopping audio") | ||
| asyncio.create_task(self.stop_and_disable_audio()) | ||
|
|
||
| async def _stream_seamless_audio(self): | ||
| """Stream audio chunks seamlessly with minimal buffer for immediate stopping.""" | ||
| try: | ||
| if not self.audio_chunks: | ||
| return | ||
|
|
||
| total_chunks = len(self.audio_chunks) # 6 chunks = 6 seconds | ||
| max_loops = 3 # Maximum 3 loops = 18 seconds total | ||
| chunk_index = 0 | ||
|
|
||
| # Pre-queue 1 chunk for seamless start (minimal buffer) | ||
| if self.is_playing: | ||
| audio_frame = OutputAudioRawFrame( | ||
| audio=self.audio_chunks[chunk_index], | ||
| sample_rate=16000, | ||
| num_channels=1 | ||
| ) | ||
| await self.tts_service.queue_frame(audio_frame) | ||
| chunk_index = 1 | ||
|
|
||
| # Continue streaming with optimal timing for seamless playback | ||
| while self.is_playing and self.loop_count < max_loops: | ||
|
|
||
| # Wait 0.8 seconds before queuing next chunk (seamless timing) | ||
| await asyncio.sleep(0.8) | ||
|
|
||
| # Check if we should stop | ||
| if not self.is_playing: | ||
| break | ||
|
|
||
| # Queue next chunk | ||
| current_chunk = self.audio_chunks[chunk_index % total_chunks] | ||
| audio_frame = OutputAudioRawFrame( | ||
| audio=current_chunk, | ||
| sample_rate=16000, | ||
| num_channels=1 | ||
| ) | ||
| await self.tts_service.queue_frame(audio_frame) | ||
|
|
||
| chunk_index += 1 | ||
|
|
||
| # Update loop count when we complete a full cycle | ||
| if chunk_index % total_chunks == 0: | ||
| self.loop_count += 1 | ||
|
|
||
| except asyncio.CancelledError: | ||
| pass | ||
| except Exception as e: | ||
| logger.error(f"Error in audio streaming: {e}") | ||
| finally: | ||
| self.is_playing = False | ||
|
|
||
| async def _clear_audio_queue(self): | ||
| """Clear audio queue to prevent delayed audio after stop.""" | ||
| try: | ||
| # Try to interrupt TTS service | ||
| if hasattr(self.tts_service, 'interrupt'): | ||
| if asyncio.iscoroutinefunction(self.tts_service.interrupt): | ||
| await self.tts_service.interrupt() | ||
| else: | ||
| self.tts_service.interrupt() | ||
|
|
||
| # Send brief silence to flush pipeline | ||
| silence_duration = 0.01 # 10ms silence | ||
| sample_rate = 16000 | ||
| silence_samples = int(silence_duration * sample_rate) | ||
| silence_data = b'\x00' * (silence_samples * 2) # 16-bit silence | ||
|
|
||
| silence_frame = OutputAudioRawFrame( | ||
| audio=silence_data, | ||
| sample_rate=16000, | ||
| num_channels=1 | ||
| ) | ||
| await self.tts_service.queue_frame(silence_frame) | ||
|
|
||
| except Exception as e: | ||
| logger.debug(f"Audio queue clearing failed: {e}") | ||
|
|
||
|
|
||
| # Global audio manager instance | ||
| _audio_manager: Optional[AudioManager] = None | ||
|
|
||
|
|
||
| def get_audio_manager() -> Optional[AudioManager]: | ||
| """Get the global audio manager instance.""" | ||
| return _audio_manager | ||
|
|
||
|
|
||
| def set_audio_manager(audio_manager: AudioManager): | ||
| """Set the global audio manager instance.""" | ||
| global _audio_manager | ||
| _audio_manager = audio_manager | ||
|
|
||
|
|
||
| def initialize_audio_manager(tts_service, transport=None) -> AudioManager: | ||
| """Initialize and return the audio manager.""" | ||
| audio_manager = AudioManager(tts_service, transport) | ||
| set_audio_manager(audio_manager) | ||
| return audio_manager | ||
|
|
||
|
|
||
| # Simple helper functions for external use | ||
|
|
||
|
|
||
| def reset_for_new_input(): | ||
| """Legacy method - reset for new input.""" | ||
| audio_manager = get_audio_manager() | ||
| if audio_manager: | ||
| audio_manager.reset() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate import of
load_dotenv- it's imported again on line 13 and was already imported earlier. Remove the duplicate import.