Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
481 changes: 369 additions & 112 deletions app/agents/voice/automatic/__init__.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Pipeline restart manager with generic error detection for STT fallback scenarios.
"""

from pipecat.frames.frames import ErrorFrame

from app.core.logger import logger


class PipelineRestartManager:
"""Manages pipeline restart decisions based on STT provider errors."""

def __init__(self):
pass

def is_soniox_error(self, error_frame: ErrorFrame) -> bool:
"""
Check if the error is from Soniox STT service.
Args:
error_frame: The ErrorFrame object from the pipeline
Returns:
True if the error is from Soniox, False otherwise
"""
if not isinstance(error_frame, ErrorFrame):
return False

# Check error message for Soniox-specific patterns
error_message = str(error_frame.error).lower()

# Look for Soniox-specific error patterns
soniox_patterns = [
"soniox",
"timed out during handshake",
"websocket connection failed",
"connection timeout",
"handshake timeout",
]

for pattern in soniox_patterns:
if pattern in error_message:
logger.debug(
f"Detected Soniox error pattern: {pattern} in {error_message}"
)
return True

return False

def should_enable_fallback(
self, error_frame: ErrorFrame, current_stt_provider: str, fallback_enabled: bool
) -> bool:
"""
Determine if fallback should be enabled based on the error and current configuration.
Args:
error_frame: The ErrorFrame object from the pipeline
current_stt_provider: The currently active STT provider name
fallback_enabled: Whether fallback is enabled in configuration
Returns:
True if fallback should be triggered, False otherwise
"""
if not fallback_enabled:
logger.debug("Fallback disabled in configuration")
return False

if not isinstance(error_frame, ErrorFrame):
logger.debug("Not an ErrorFrame, skipping fallback")
return False

# For now, only handle Soniox errors, but this can be extended
if current_stt_provider.lower() == "soniox":
if self.is_soniox_error(error_frame):
logger.info(
f"Soniox error detected, enabling fallback: {error_frame.error}"
)
return True

logger.debug(
f"No fallback needed for {current_stt_provider} error: {error_frame.error}"
)
return False
168 changes: 168 additions & 0 deletions app/agents/voice/automatic/services/fallback/session_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""
Fallback session manager for handling automatic session restart during STT fallback scenarios.
"""

import asyncio
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

from app.core.logger import logger


@dataclass
class FallbackSessionContext:
"""Context information for fallback sessions."""

original_session_id: str
room_url: str
token: str
bot_name: str
original_stt_provider: str
fallback_stt_provider: str
error_reason: str
session_args: Dict[str, Any]
should_auto_restart: bool = True


class FallbackSessionManager:
"""Manages fallback session state and auto-restart logic."""

def __init__(self):
# Store fallback contexts for sessions that need auto-restart
self._fallback_contexts: Dict[str, FallbackSessionContext] = {}
self._restart_delay = 1.0 # seconds to wait before restarting

def register_fallback_session(self, context: FallbackSessionContext):
"""Register a session for fallback auto-restart.
Args:
context: Fallback session context with restart parameters
"""
self._fallback_contexts[context.original_session_id] = context
logger.info(
f"Registered fallback session {context.original_session_id} for auto-restart"
)
logger.debug(
f"Fallback contexts now contains: {list(self._fallback_contexts.keys())}"
)

def get_fallback_context(self, session_id: str) -> Optional[FallbackSessionContext]:
"""Get fallback context for a session.
Args:
session_id: The session ID to get context for
Returns:
FallbackSessionContext if session is registered for fallback, None otherwise
"""
logger.debug(f"Looking up fallback context for session {session_id}")
logger.debug(
f"Available fallback contexts: {list(self._fallback_contexts.keys())}"
)
context = self._fallback_contexts.get(session_id)
logger.debug(f"Found context: {context is not None}")
return context

def remove_fallback_session(self, session_id: str):
"""Remove a session from fallback tracking.
Args:
session_id: The session ID to remove
"""
if session_id in self._fallback_contexts:
del self._fallback_contexts[session_id]
logger.debug(f"Removed fallback session {session_id} from tracking")

def is_fallback_session(self, session_id: str) -> bool:
"""Check if a session is registered for fallback auto-restart.
Args:
session_id: The session ID to check
Returns:
True if session should auto-restart, False otherwise
"""
return session_id in self._fallback_contexts

async def schedule_auto_restart(
self, context: FallbackSessionContext, restart_callback
):
"""Schedule automatic restart of a fallback session.
Args:
context: Fallback session context
restart_callback: Async function to call for restarting the session
"""
logger.info(
f"Scheduling auto-restart for session {context.original_session_id} in {self._restart_delay}s"
)

# Wait before restarting to allow cleanup
await asyncio.sleep(self._restart_delay)

try:
# Call the restart callback with updated session parameters
await restart_callback(context)
logger.info(
f"Auto-restart completed for session {context.original_session_id}"
)
except Exception as e:
logger.error(
f"Failed to auto-restart session {context.original_session_id}: {e}"
)
finally:
# Clean up the fallback context
self.remove_fallback_session(context.original_session_id)

def create_restart_args(self, context: FallbackSessionContext) -> Dict[str, Any]:
"""Create new session arguments for restart with fallback STT provider.
Args:
context: Fallback session context
Returns:
Dictionary of session arguments for the restarted session
"""
# Copy original session args and update STT provider context
restart_args = context.session_args.copy()

# Mark this as a fallback session restart
restart_args["is_fallback_restart"] = True
restart_args["original_stt_provider"] = context.original_stt_provider
restart_args["fallback_stt_provider"] = context.fallback_stt_provider
restart_args["fallback_reason"] = context.error_reason

return restart_args

def get_session_stats(self) -> Dict[str, Any]:
"""Get statistics about fallback sessions.
Returns:
Dictionary with fallback session statistics
"""
return {
"active_fallback_sessions": len(self._fallback_contexts),
"session_ids": list(self._fallback_contexts.keys()),
"restart_delay": self._restart_delay,
}


# Global fallback session manager instance
_fallback_manager: Optional[FallbackSessionManager] = None


def get_fallback_session_manager() -> FallbackSessionManager:
"""Get the global fallback session manager instance.
Returns:
FallbackSessionManager singleton instance
"""
global _fallback_manager
if _fallback_manager is None:
logger.debug("Creating new FallbackSessionManager instance")
_fallback_manager = FallbackSessionManager()
else:
logger.debug(
f"Returning existing FallbackSessionManager instance with {len(_fallback_manager._fallback_contexts)} contexts"
)
return _fallback_manager
20 changes: 14 additions & 6 deletions app/agents/voice/automatic/stt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@ def parse_soniox_context() -> Optional[SonioxContextObject]:
return None


def get_stt_service(voice_name: Optional[str] = None):
def get_stt_service(
voice_name: Optional[str] = None, fallback_stt_provider: Optional[str] = None
):
"""
Returns an STT service instance based on the environment configuration.
Args:
voice_name: Voice name to determine STT provider override for specific voices
fallback_stt_provider: STT provider to use when in fallback mode (overrides config)
"""
# Check for MIA voice with OpenAI override
if voice_name == VoiceName.MIA.value and config.ENABLE_OPENAI_FOR_MIA:
Expand All @@ -116,8 +119,13 @@ def get_stt_service(voice_name: Optional[str] = None):
temperature=0.0, # Deterministic output for consistency
)

# Default behavior - use configured STT provider
if config.STT_PROVIDER == "assemblyai":
# Determine which STT provider to use (fallback override or config)
effective_stt_provider = (
fallback_stt_provider if fallback_stt_provider else config.STT_PROVIDER
)

# Default behavior - use configured or fallback STT provider
if effective_stt_provider == "assemblyai":
if not config.ASSEMBLYAI_API_KEY:
raise ValueError(
"ASSEMBLYAI_API_KEY is required when STT_PROVIDER=assemblyai"
Expand All @@ -130,7 +138,7 @@ def get_stt_service(voice_name: Optional[str] = None):
vad_force_turn_endpoint=True,
# No connection_params needed since we're using VAD for turn detection
)
elif config.STT_PROVIDER == "openai":
elif effective_stt_provider == "openai":
if not config.OPENAI_STT_API_KEY:
raise ValueError(
"OPENAI_STT_API_KEY or OPENAI_API_KEY is required when STT_PROVIDER=openai"
Expand All @@ -147,7 +155,7 @@ def get_stt_service(voice_name: Optional[str] = None):
prompt=config.AUTOMATIC_OPENAI_STT_PROMPT,
temperature=0.0, # Deterministic output for consistency
)
elif config.STT_PROVIDER == "deepgram":
elif effective_stt_provider == "deepgram":
if not config.DEEPGRAM_API_KEY:
raise ValueError("DEEPGRAM_API_KEY is required when STT_PROVIDER=deepgram")

Expand Down Expand Up @@ -184,7 +192,7 @@ def get_stt_service(voice_name: Optional[str] = None):
return DeepgramSTTService(
api_key=config.DEEPGRAM_API_KEY, live_options=live_options
)
elif config.STT_PROVIDER == "soniox":
elif effective_stt_provider == "soniox":
if not config.SONIOX_API_KEY:
raise ValueError("SONIOX_API_KEY is required when STT_PROVIDER=soniox")

Expand Down
27 changes: 20 additions & 7 deletions app/agents/voice/automatic/tools/juspay/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,9 @@ async def create_euler_offer(params: FunctionCallParams):
}

# Convert IST dates to ISO format for API payload
logger.info(f"[DEBUG] Starting date conversion for start_date: {start_date}, end_date: {end_date}")
logger.info(
f"[DEBUG] Starting date conversion for start_date: {start_date}, end_date: {end_date}"
)
try:
ist = pytz.timezone("Asia/Kolkata")

Expand Down Expand Up @@ -625,7 +627,9 @@ async def create_euler_offer(params: FunctionCallParams):
return

# Build payment instruments payload
logger.info(f"[DEBUG] Building payment instruments payload for: {payment_instruments}")
logger.info(
f"[DEBUG] Building payment instruments payload for: {payment_instruments}"
)
if payment_instruments:
payment_instruments_payload = [
instrument_map[instrument]
Expand Down Expand Up @@ -713,11 +717,16 @@ async def create_euler_offer(params: FunctionCallParams):

# Make API request
endpoint = f"{EULER_DASHBOARD_API_URL}/api/offers/dashboard/create?merchant_id={merchant_id}"
headers = {"Content-Type": "application/json", "x-web-logintoken": euler_token}
headers = {
"Content-Type": "application/json",
"x-web-logintoken": euler_token,
}
logger.info(f"[DEBUG] Endpoint and headers prepared: {endpoint}")

except Exception as e:
logger.error(f"[DEBUG] Error during API payload construction: {e}", exc_info=True)
logger.error(
f"[DEBUG] Error during API payload construction: {e}", exc_info=True
)
await params.result_callback(
{
"error": f"Tool Error: [create_euler_offer] Error constructing API payload: {str(e)}"
Expand All @@ -728,20 +737,24 @@ async def create_euler_offer(params: FunctionCallParams):
logger.info(f"Making offer creation request to: {endpoint}")
logger.info(f"Payload size: {len(str(api_payload))} characters")
try:
logger.info(f"Payload preview: {json.dumps(api_payload, indent=2)[:500]}...")
logger.info(
f"Payload preview: {json.dumps(api_payload, indent=2)[:500]}..."
)
except Exception as e:
logger.error(f"Failed to serialize payload for logging: {e}")

async with create_http_client(timeout=10.0) as client:
response = await client.post(endpoint, json=api_payload, headers=headers)
logger.info(f"Received response from Euler API: Status {response.status_code}")
logger.info(
f"Received response from Euler API: Status {response.status_code}"
)
logger.info(f"Response headers: {dict(response.headers)}")
try:
response_text = response.text
logger.info(f"Response body: {response_text}")
except Exception as e:
logger.error(f"Failed to read response body: {e}")

if response.status_code == 200:
response_data = response.json()
offer_id = response_data.get("offer_id")
Expand Down
Loading