Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 29 additions & 33 deletions app/agents/voice/automatic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
OutputAudioRawFrame,
TTSSpeakFrame,
)

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -54,10 +55,13 @@

from .processors import LLMSpyProcessor
from .processors.ptt_vad_filter import PTTVADFilter
from .processors.user_speaking_audio import UserSpeakingAudioProcessor
from .prompts import get_system_prompt
from .stt import get_stt_service
from .tools import initialize_tools
from .tts import get_tts_service
from .audio.audio_manager import initialize_audio_manager

from .types import (
Mode,
TTSProvider,
Expand Down Expand Up @@ -212,6 +216,9 @@ async def main():
enable_chart_text_filter=config.ENABLE_CHARTS,
)

# Initialize audio manager for looping waiting audio
audio_manager = initialize_audio_manager(tts)

llm = LLMServiceWrapper(
AzureLLMService(
api_key=config.AZURE_OPENAI_API_KEY,
Expand Down Expand Up @@ -273,39 +280,24 @@ async def main():

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

# Simplified event handler for TTS feedback
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
# Only play the "checking" message if using Google TTS
if tts_provider == TTSProvider.GOOGLE:
for function_call in function_calls:
# Skip "checking" message for instant functions and chart tools
instant_functions = [
"get_current_time",
"utility__getCurrentTime", # NeuroLink equivalent
"utility__generateTimestamp", # NeuroLink timestamp tool
"generate_bar_chart",
"generate_line_chart",
"generate_donut_chart",
"generate_single_stat_card",
]
if function_call.function_name not in instant_functions:
# Play tool call sound if enabled, otherwise use phrases
if tool_call_sound:
await transport.send_audio(tool_call_sound)
else:
phrases = [
"Let me check on that.",
"Give me a moment to do that.",
"I'll get right on that.",
"Working on that for you.",
"One moment — I'm on it",
"One second, boss.",
"On it, boss!",
"Just a second, captain.",
]
await tts.queue_frame(TTSSpeakFrame(random.choice(phrases)))
break
# LLM response started handler - DO NOT start audio here
# Audio should only start when user stops speaking, not when LLM processes
@llm.event_handler("on_llm_response_started")
async def on_llm_response_started(service, function_calls):
logger.info(f"Function calls started event triggered with {len(function_calls)} calls")

# Only stop audio when function calls complete if audio is actually playing
@llm.event_handler("on_function_calls_finished")
async def on_function_calls_finished(service):
# Stop audio when function calls finish but keep it enabled for potential resumption
if audio_manager.is_playing:
# Stop current audio but don't disable - allow resumption if no LLM text follows
audio_manager.is_playing = False
if audio_manager.loop_task and not audio_manager.loop_task.done():
audio_manager.loop_task.cancel()
logger.info("Function calls finished - stopped audio but kept enabled for resumption")
else:
logger.debug("Function calls finished - audio not playing, no action needed")

messages = [
{"role": "system", "content": system_prompt},
Expand Down Expand Up @@ -334,6 +326,10 @@ async def on_function_calls_started(service, function_calls):
ptt_vad_filter = PTTVADFilter("PTTVADFilter")
pipeline_components.append(ptt_vad_filter) # Filter VAD frames after STT

# Add user speaking audio processor - manages audio based on user speech
user_speaking_processor = UserSpeakingAudioProcessor("UserSpeakingAudioProcessor")
pipeline_components.append(user_speaking_processor) # Add after STT/PTT, before RTVI

pipeline_components.extend([rtvi, context_aggregator.user()])
if (
config.MEM0_ENABLED
Expand Down
232 changes: 232 additions & 0 deletions app/agents/voice/automatic/audio/audio_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
"""
Simplified Audio Manager - Minimal variables, seamless playback, immediate stop capability
"""

import asyncio
from typing import Optional
from pydub import AudioSegment
from pipecat.frames.frames import OutputAudioRawFrame
from app.core.logger import logger

# Length in seconds of the waiting clip. AudioManager._load_waiting_audio uses
# this value to pick the file name "waiting_<N>sec.wav", so changing it requires
# a matching WAV file to exist.
AUDIO_LENGTH_SECONDS = 6  # Default duration in seconds
Copy link

Copilot AI Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This global constant should be configurable through the configuration system rather than hardcoded. Consider moving it to the config module or making it a parameter.

Copilot uses AI. Check for mistakes.


class AudioManager:
    """Loop a pre-recorded "waiting" clip through the TTS service.

    The clip is loaded once at construction, converted to 16 kHz mono 16-bit
    PCM and split into fixed-size chunks.  ``start_audio`` streams those
    chunks as ``OutputAudioRawFrame`` objects via ``tts_service.queue_frame``
    from a background asyncio task; playback stops on its own after
    ``MAX_LOOPS`` passes over the clip, or earlier through
    ``stop_and_disable_audio`` / ``reset``.

    Public state read or written by callers:
        is_playing: True while a streaming task is (or should be) running.
        loop_count: number of completed passes over the clip.
        audio_chunks: raw PCM bytes, one entry per chunk.
        user_has_input: gate; playback only starts after ``set_user_input``.
        loop_task: the asyncio task currently streaming, if any.
    """

    # Audio format used for every frame queued by this class.
    SAMPLE_RATE = 16000
    NUM_CHANNELS = 1
    SAMPLE_WIDTH_BYTES = 2  # 16-bit samples

    # Length of each queued chunk, in milliseconds.
    CHUNK_DURATION_MS = 1000
    # Delay between queueing consecutive chunks.
    # NOTE(review): chunks hold 1000 ms of audio but are queued every 0.8 s, so
    # queued-but-unplayed audio presumably accumulates over a long run —
    # confirm this backlog is acceptable for the "immediate stop" goal.
    QUEUE_INTERVAL_SECONDS = 0.8
    # Maximum passes over the clip (18 seconds with the default 6-second clip).
    MAX_LOOPS = 3

    def __init__(self, tts_service, transport=None):
        """Store collaborators, initialise state and load the waiting clip.

        Args:
            tts_service: object exposing an awaitable ``queue_frame(frame)``;
                an optional ``interrupt`` attribute is used when stopping.
            transport: kept for compatibility; not used by this class.
        """
        self.tts_service = tts_service
        self.transport = transport  # Keep for compatibility

        self.is_playing = False
        self.loop_count = 0
        self.audio_chunks = []
        self.user_has_input = False

        # Task running _stream_seamless_audio, if any.
        self.loop_task: Optional[asyncio.Task] = None
        # Strong references to fire-and-forget tasks so they are not
        # garbage-collected before they finish.
        self._background_tasks = set()

        self._load_waiting_audio()

    def _load_waiting_audio(self):
        """Load the waiting clip and split it into equally sized PCM chunks.

        On any failure the error is logged and ``audio_chunks`` is left
        empty, which makes ``start_audio`` a no-op.
        """
        try:
            # NOTE(review): this path is relative to the process working
            # directory, not to this module.
            wav_file_path = f"app/agents/voice/automatic/audio/waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav"

            audio = AudioSegment.from_wav(wav_file_path)
            # Convert to the format used for every queued frame.
            audio = (
                audio.set_frame_rate(self.SAMPLE_RATE)
                .set_channels(self.NUM_CHANNELS)
                .set_sample_width(self.SAMPLE_WIDTH_BYTES)
            )

            chunks = []
            for start_ms in range(0, len(audio), self.CHUNK_DURATION_MS):
                chunk = audio[start_ms:start_ms + self.CHUNK_DURATION_MS]

                # Pad a short trailing chunk with silence so all chunks have
                # the same duration.
                missing_ms = self.CHUNK_DURATION_MS - len(chunk)
                if missing_ms > 0:
                    chunk = chunk + AudioSegment.silent(
                        duration=missing_ms, frame_rate=self.SAMPLE_RATE
                    )

                chunks.append(chunk.raw_data)

            self.audio_chunks = chunks
            logger.info(f"Loaded {AUDIO_LENGTH_SECONDS}-second waiting audio: {len(self.audio_chunks)} chunks")

        except Exception as e:
            # Best effort: a missing/unreadable clip disables waiting audio
            # instead of failing construction.
            logger.error(f"Failed to load waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav: {e}")
            self.audio_chunks = []

    def set_user_input(self):
        """Mark that user has provided input - required for audio to start."""
        self.user_has_input = True
        logger.info("User input detected - audio enabled")

    async def start_audio(self):
        """Start chunked playback in a background task.

        Does nothing unless user input has been registered, chunks were
        loaded, and playback is not already running.
        """
        if not self.user_has_input or not self.audio_chunks or self.is_playing:
            return

        self.is_playing = True
        self.loop_count = 0

        # A previous task may still be unwinding after an external cancel;
        # make sure it is cancelled before it is replaced.
        if self.loop_task and not self.loop_task.done():
            self.loop_task.cancel()

        self.loop_task = asyncio.create_task(self._stream_seamless_audio())

    async def stop_and_disable_audio(self):
        """Stop audio immediately and disable until next user input."""
        self.is_playing = False
        self.user_has_input = False  # Require new user input for next audio
        self.loop_count = 0

        if self.loop_task and not self.loop_task.done():
            self.loop_task.cancel()
            try:
                # Wait for the streaming task to finish unwinding.
                await self.loop_task
            except asyncio.CancelledError:
                pass

        await self._clear_audio_queue()

    def reset(self):
        """Reset for new conversation cycle.

        Synchronous variant of stopping: clears the flags and requests
        cancellation of the streaming task without waiting for it.
        """
        self.is_playing = False
        self.user_has_input = False
        self.loop_count = 0

        if self.loop_task and not self.loop_task.done():
            self.loop_task.cancel()

    def set_bot_speaking(self, speaking: bool):
        """Stop waiting audio when the bot starts speaking.

        Args:
            speaking: True when the bot starts speaking.  Only the
                ``True`` transition while audio is playing has an effect.

        Must be called with a running event loop, because it schedules
        ``stop_and_disable_audio`` as a task.
        """
        if speaking and self.is_playing:
            logger.info("Bot started speaking - stopping audio")
            stop_task = asyncio.create_task(self.stop_and_disable_audio())
            # The event loop only keeps weak references to tasks; hold a
            # strong one until the task completes.
            self._background_tasks.add(stop_task)
            stop_task.add_done_callback(self._background_tasks.discard)

    def _is_current_loop_task(self):
        """Return True if the running task is the one that owns playback."""
        try:
            current = asyncio.current_task()
        except RuntimeError:
            # No running loop (coroutine closed during teardown): nothing
            # newer can own playback, so behave like the owner.
            return True
        return self.loop_task is None or self.loop_task is current

    async def _stream_seamless_audio(self):
        """Queue chunks one at a time until stopped or MAX_LOOPS is reached.

        One chunk is queued immediately, then one more after every
        ``QUEUE_INTERVAL_SECONDS``.  ``is_playing`` is re-checked after each
        sleep so a stop request takes effect before the next chunk.
        """
        try:
            if not self.audio_chunks:
                return

            total_chunks = len(self.audio_chunks)
            chunk_index = 0

            while self.is_playing and self.loop_count < self.MAX_LOOPS:
                audio_frame = OutputAudioRawFrame(
                    audio=self.audio_chunks[chunk_index % total_chunks],
                    sample_rate=self.SAMPLE_RATE,
                    num_channels=self.NUM_CHANNELS,
                )
                await self.tts_service.queue_frame(audio_frame)
                chunk_index += 1

                # Every queued chunk is counted (including the first), so a
                # full pass is detected for any clip length.
                if chunk_index % total_chunks == 0:
                    self.loop_count += 1
                    if self.loop_count >= self.MAX_LOOPS:
                        break

                await asyncio.sleep(self.QUEUE_INTERVAL_SECONDS)

        except asyncio.CancelledError:
            # Cancellation is the normal way to stop; finish quietly.
            pass
        except Exception as e:
            logger.error(f"Error in audio streaming: {e}")
        finally:
            # Only the task that still owns playback may clear the flag.  A
            # cancelled, superseded task finishes one loop iteration late and
            # must not switch off a playback that start_audio began since.
            if self._is_current_loop_task():
                self.is_playing = False

    async def _clear_audio_queue(self):
        """Best-effort flush so no waiting audio is heard after a stop.

        Calls ``tts_service.interrupt`` when the service has one (awaiting it
        if it is a coroutine function), then queues 10 ms of silence.
        Failures are logged at debug level and otherwise ignored.
        """
        try:
            if hasattr(self.tts_service, 'interrupt'):
                if asyncio.iscoroutinefunction(self.tts_service.interrupt):
                    await self.tts_service.interrupt()
                else:
                    self.tts_service.interrupt()

            # Send brief silence to flush pipeline
            silence_duration = 0.01  # 10ms silence
            silence_samples = int(silence_duration * self.SAMPLE_RATE)
            silence_data = b'\x00' * (silence_samples * self.SAMPLE_WIDTH_BYTES)

            silence_frame = OutputAudioRawFrame(
                audio=silence_data,
                sample_rate=self.SAMPLE_RATE,
                num_channels=self.NUM_CHANNELS,
            )
            await self.tts_service.queue_frame(silence_frame)

        except Exception as e:
            logger.debug(f"Audio queue clearing failed: {e}")


# Global audio manager instance.
# Starts as None; replaced by set_audio_manager() and read by get_audio_manager().
_audio_manager: Optional[AudioManager] = None


def get_audio_manager() -> Optional[AudioManager]:
    """Get the global audio manager instance.

    Returns:
        The module-level AudioManager, or None if none has been set yet.
    """
    return _audio_manager


def set_audio_manager(audio_manager: AudioManager):
    """Set the global audio manager instance.

    Args:
        audio_manager: instance to store in the module-level slot; it
            replaces whatever was stored before.
    """
    global _audio_manager
    _audio_manager = audio_manager


def initialize_audio_manager(tts_service, transport=None) -> AudioManager:
    """Create an AudioManager, register it as the global instance, return it.

    Args:
        tts_service: passed through to the AudioManager constructor.
        transport: passed through to the AudioManager constructor.

    Returns:
        The newly created manager, which is also stored via set_audio_manager.
    """
    manager = AudioManager(tts_service, transport)
    set_audio_manager(manager)
    return manager

# Simple helper functions for external use

def reset_for_new_input():
    """Legacy helper: reset the global audio manager, if one is registered."""
    manager = get_audio_manager()
    if not manager:
        # Nothing registered yet; nothing to reset.
        return
    manager.reset()
Binary file not shown.
Loading