Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 46 additions & 33 deletions app/agents/voice/automatic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
import asyncio
import os
import random
import argparse
from dotenv import load_dotenv
import wave
from datetime import datetime
from zoneinfo import ZoneInfo
from pydub import AudioSegment
import audioop

from dotenv import load_dotenv
Copy link

Copilot AI Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate import of load_dotenv - it's imported again on line 13 and was already imported earlier. Remove the duplicate import.

Suggested change
from dotenv import load_dotenv

Copilot uses AI. Check for mistakes.
from langfuse import get_client
Expand All @@ -20,7 +24,12 @@
LLMFullResponseEndFrame,
OutputAudioRawFrame,
TTSSpeakFrame,
OutputAudioRawFrame,
LLMFullResponseStartFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame
)

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -54,10 +63,23 @@

from .processors import LLMSpyProcessor
from .processors.ptt_vad_filter import PTTVADFilter
from .processors.user_speaking_audio import UserSpeakingAudioProcessor
from .prompts import get_system_prompt
from .stt import get_stt_service
from .tools import initialize_tools
from .tts import get_tts_service
from .stt import get_stt_service
from .audio.audio_manager import initialize_audio_manager, get_audio_manager
from app.agents.voice.automatic.processors.llm_spy import handle_confirmation_response
from app.agents.voice.automatic.types import (
TTSProvider,
Mode,
decode_tts_provider,
decode_voice_name,
decode_mode,
)
from opentelemetry import trace
from langfuse import get_client
from .types import (
Mode,
TTSProvider,
Expand Down Expand Up @@ -212,6 +234,9 @@ async def main():
enable_chart_text_filter=config.ENABLE_CHARTS,
)

# Initialize audio manager for looping waiting audio
audio_manager = initialize_audio_manager(tts)

llm = LLMServiceWrapper(
AzureLLMService(
api_key=config.AZURE_OPENAI_API_KEY,
Expand Down Expand Up @@ -273,39 +298,23 @@ async def main():

rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

# Simplified event handler for TTS feedback
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
# Only play the "checking" message if using Google TTS
if tts_provider == TTSProvider.GOOGLE:
for function_call in function_calls:
# Skip "checking" message for instant functions and chart tools
instant_functions = [
"get_current_time",
"utility__getCurrentTime", # NeuroLink equivalent
"utility__generateTimestamp", # NeuroLink timestamp tool
"generate_bar_chart",
"generate_line_chart",
"generate_donut_chart",
"generate_single_stat_card",
]
if function_call.function_name not in instant_functions:
# Play tool call sound if enabled, otherwise use phrases
if tool_call_sound:
await transport.send_audio(tool_call_sound)
else:
phrases = [
"Let me check on that.",
"Give me a moment to do that.",
"I'll get right on that.",
"Working on that for you.",
"One moment — I'm on it",
"One second, boss.",
"On it, boss!",
"Just a second, captain.",
]
await tts.queue_frame(TTSSpeakFrame(random.choice(phrases)))
break
# LLM response started handler - DO NOT start audio here
# Audio should only start when user stops speaking, not when LLM processes
@llm.event_handler("on_llm_response_started")
async def on_llm_response_started(service,function_calls):
Copy link

Copilot AI Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space after comma in function parameter list. Should be service, function_calls.

Suggested change
async def on_llm_response_started(service,function_calls):
async def on_llm_response_started(service, function_calls):

Copilot uses AI. Check for mistakes.
logger.info(f"Function calls started event triggered with {len(function_calls)} calls")

# Only stop audio when function calls complete if audio is actually playing
@llm.event_handler("on_function_calls_finished")
async def on_function_calls_finished(service):
# IMMEDIATELY stop audio when function calls finish (response coming)
if audio_manager.user_has_input or audio_manager.is_playing:

# Use simplified stop method
await audio_manager.stop_and_disable_audio()

else:
logger.info("Function calls finished - audio not playing, no action needed")

messages = [
{"role": "system", "content": system_prompt},
Expand Down Expand Up @@ -334,6 +343,10 @@ async def on_function_calls_started(service, function_calls):
ptt_vad_filter = PTTVADFilter("PTTVADFilter")
pipeline_components.append(ptt_vad_filter) # Filter VAD frames after STT

# Add user speaking audio processor - manages audio based on user speech
user_speaking_processor = UserSpeakingAudioProcessor("UserSpeakingAudioProcessor")
pipeline_components.append(user_speaking_processor) # Add after STT/PTT, before RTVI

pipeline_components.extend([rtvi, context_aggregator.user()])
if (
config.MEM0_ENABLED
Expand Down
234 changes: 234 additions & 0 deletions app/agents/voice/automatic/audio/audio_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
"""
Simplified Audio Manager - Minimal variables, seamless playback, immediate stop capability
"""

import asyncio
from typing import Optional
from pydub import AudioSegment
from pipecat.frames.frames import OutputAudioRawFrame
from app.core.logger import logger

# Configurable audio length constant
AUDIO_LENGTH_SECONDS = 6 # Default duration in seconds
Copy link

Copilot AI Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This global constant should be configurable through the configuration system rather than hardcoded. Consider moving it to the config module or making it a parameter.

Copilot uses AI. Check for mistakes.


class AudioManager:
"""Simplified audio manager with minimal state and seamless chunked playback."""

def __init__(self, tts_service, transport=None):
self.tts_service = tts_service
self.transport = transport # Keep for compatibility

# MINIMAL STATE - Only 4 variables needed
self.is_playing = False
self.loop_count = 0 # Track completed loops (max 3 = 18 seconds total)
self.audio_chunks = [] # 1-second audio chunks for seamless playback
self.user_has_input = False # Ensure audio only starts after user input

# Internal task management
self.loop_task: Optional[asyncio.Task] = None

self._load_waiting_audio()

def _load_waiting_audio(self):
"""Load waiting audio and split into 1-second chunks for seamless playback."""
try:
wav_file_path = f"app/agents/voice/automatic/audio/waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav"

audio = AudioSegment.from_wav(wav_file_path)
# Convert to pipeline format
audio = audio.set_frame_rate(16000).set_channels(1).set_sample_width(2)

# Split into clean 1-second chunks
self.audio_chunks = []
chunk_duration_ms = 1000 # 1 second in milliseconds

for i in range(0, len(audio), chunk_duration_ms):
chunk = audio[i:i + chunk_duration_ms]

# Ensure chunk is exactly 1 second (pad if necessary)
if len(chunk) < chunk_duration_ms:
silence_needed = chunk_duration_ms - len(chunk)
silence = AudioSegment.silent(duration=silence_needed, frame_rate=16000)
chunk = chunk + silence

self.audio_chunks.append(chunk.raw_data)

logger.info(f"Loaded {AUDIO_LENGTH_SECONDS}-second waiting audio: {len(self.audio_chunks)} chunks")

except Exception as e:
logger.error(f"Failed to load waiting_{int(AUDIO_LENGTH_SECONDS)}sec.wav: {e}")
self.audio_chunks = []

def set_user_input(self):
"""Mark that user has provided input - required for audio to start."""
self.user_has_input = True
logger.info("User input detected - audio enabled")

async def start_audio(self):
"""Start seamless chunked audio playback (only if user has provided input)."""

# Check prerequisites
if not self.user_has_input:
return

if not self.audio_chunks:
return

if self.is_playing:
return

# Start audio
self.is_playing = True
self.loop_count = 0

# Cancel any existing task
if self.loop_task and not self.loop_task.done():
self.loop_task.cancel()

# Start seamless audio streaming
self.loop_task = asyncio.create_task(self._stream_seamless_audio())
# logger.info("Started audio playback")

async def stop_and_disable_audio(self):
"""Stop audio immediately and disable until next user input."""
# Set stop flags
self.is_playing = False
self.user_has_input = False # Require new user input for next audio
self.loop_count = 0

# Cancel audio task
if self.loop_task and not self.loop_task.done():
self.loop_task.cancel()
try:
await self.loop_task
except asyncio.CancelledError:
pass

# Clear audio queue aggressively
await self._clear_audio_queue()

def reset(self):
"""Reset for new conversation cycle."""
self.is_playing = False
self.user_has_input = False
self.loop_count = 0

if self.loop_task and not self.loop_task.done():
self.loop_task.cancel()

def set_bot_speaking(self, speaking: bool):
"""Track when bot starts/stops speaking to prevent audio during speech."""
if speaking and self.is_playing:
logger.info("Bot started speaking - stopping audio")
asyncio.create_task(self.stop_and_disable_audio())

async def _stream_seamless_audio(self):
"""Stream audio chunks seamlessly with minimal buffer for immediate stopping."""
try:
if not self.audio_chunks:
return

total_chunks = len(self.audio_chunks) # 6 chunks = 6 seconds
max_loops = 3 # Maximum 3 loops = 18 seconds total
chunk_index = 0

# Pre-queue 1 chunk for seamless start (minimal buffer)
if self.is_playing:
audio_frame = OutputAudioRawFrame(
audio=self.audio_chunks[chunk_index],
sample_rate=16000,
num_channels=1
)
await self.tts_service.queue_frame(audio_frame)
chunk_index = 1

# Continue streaming with optimal timing for seamless playback
while self.is_playing and self.loop_count < max_loops:

# Wait 0.8 seconds before queuing next chunk (seamless timing)
await asyncio.sleep(0.8)

# Check if we should stop
if not self.is_playing:
break

# Queue next chunk
current_chunk = self.audio_chunks[chunk_index % total_chunks]
audio_frame = OutputAudioRawFrame(
audio=current_chunk,
sample_rate=16000,
num_channels=1
)
await self.tts_service.queue_frame(audio_frame)

chunk_index += 1

# Update loop count when we complete a full cycle
if chunk_index % total_chunks == 0:
self.loop_count += 1

except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error in audio streaming: {e}")
finally:
self.is_playing = False

async def _clear_audio_queue(self):
"""Clear audio queue to prevent delayed audio after stop."""
try:
# Try to interrupt TTS service
if hasattr(self.tts_service, 'interrupt'):
if asyncio.iscoroutinefunction(self.tts_service.interrupt):
await self.tts_service.interrupt()
else:
self.tts_service.interrupt()

# Send brief silence to flush pipeline
silence_duration = 0.01 # 10ms silence
sample_rate = 16000
silence_samples = int(silence_duration * sample_rate)
silence_data = b'\x00' * (silence_samples * 2) # 16-bit silence

silence_frame = OutputAudioRawFrame(
audio=silence_data,
sample_rate=16000,
num_channels=1
)
await self.tts_service.queue_frame(silence_frame)

except Exception as e:
logger.debug(f"Audio queue clearing failed: {e}")


# Global audio manager instance
_audio_manager: Optional[AudioManager] = None


def get_audio_manager() -> Optional[AudioManager]:
"""Get the global audio manager instance."""
return _audio_manager


def set_audio_manager(audio_manager: AudioManager):
"""Set the global audio manager instance."""
global _audio_manager
_audio_manager = audio_manager


def initialize_audio_manager(tts_service, transport=None) -> AudioManager:
"""Initialize and return the audio manager."""
audio_manager = AudioManager(tts_service, transport)
set_audio_manager(audio_manager)
return audio_manager


# Simple helper functions for external use


def reset_for_new_input():
"""Legacy method - reset for new input."""
audio_manager = get_audio_manager()
if audio_manager:
audio_manager.reset()
Binary file not shown.
Loading