Skip to content

Commit 4ce33c7

Browse files
committed
soniox fallback with deepgram
1 parent 168139d commit 4ce33c7

File tree

10 files changed

+1660
-128
lines changed

10 files changed

+1660
-128
lines changed

app/agents/voice/automatic/__init__.py

Lines changed: 369 additions & 112 deletions
Large diffs are not rendered by default.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
"""
2+
Pipeline restart manager with generic error detection for STT fallback scenarios.
3+
"""
4+
5+
from pipecat.frames.frames import ErrorFrame
6+
7+
from app.core.logger import logger
8+
9+
10+
class PipelineRestartManager:
11+
"""Manages pipeline restart decisions based on STT provider errors."""
12+
13+
def __init__(self):
14+
pass
15+
16+
def is_soniox_error(self, error_frame: ErrorFrame) -> bool:
17+
"""
18+
Check if the error is from Soniox STT service.
19+
20+
Args:
21+
error_frame: The ErrorFrame object from the pipeline
22+
23+
Returns:
24+
True if the error is from Soniox, False otherwise
25+
"""
26+
if not isinstance(error_frame, ErrorFrame):
27+
return False
28+
29+
# Check error message for Soniox-specific patterns
30+
error_message = str(error_frame.error).lower()
31+
32+
# Look for Soniox-specific error patterns
33+
soniox_patterns = [
34+
"soniox",
35+
"timed out during handshake",
36+
"websocket connection failed",
37+
"connection timeout",
38+
"handshake timeout",
39+
]
40+
41+
for pattern in soniox_patterns:
42+
if pattern in error_message:
43+
logger.debug(
44+
f"Detected Soniox error pattern: {pattern} in {error_message}"
45+
)
46+
return True
47+
48+
return False
49+
50+
def should_enable_fallback(
51+
self, error_frame: ErrorFrame, current_stt_provider: str, fallback_enabled: bool
52+
) -> bool:
53+
"""
54+
Determine if fallback should be enabled based on the error and current configuration.
55+
56+
Args:
57+
error_frame: The ErrorFrame object from the pipeline
58+
current_stt_provider: The currently active STT provider name
59+
fallback_enabled: Whether fallback is enabled in configuration
60+
61+
Returns:
62+
True if fallback should be triggered, False otherwise
63+
"""
64+
if not fallback_enabled:
65+
logger.debug("Fallback disabled in configuration")
66+
return False
67+
68+
if not isinstance(error_frame, ErrorFrame):
69+
logger.debug("Not an ErrorFrame, skipping fallback")
70+
return False
71+
72+
# For now, only handle Soniox errors, but this can be extended
73+
if current_stt_provider.lower() == "soniox":
74+
if self.is_soniox_error(error_frame):
75+
logger.info(
76+
f"Soniox error detected, enabling fallback: {error_frame.error}"
77+
)
78+
return True
79+
80+
logger.debug(
81+
f"No fallback needed for {current_stt_provider} error: {error_frame.error}"
82+
)
83+
return False
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
"""
2+
Fallback session manager for handling automatic session restart during STT fallback scenarios.
3+
"""
4+
5+
import asyncio
6+
from dataclasses import asdict, dataclass
7+
from typing import Any, Dict, Optional
8+
9+
from app.core.logger import logger
10+
11+
12+
@dataclass
13+
class FallbackSessionContext:
14+
"""Context information for fallback sessions."""
15+
16+
original_session_id: str
17+
room_url: str
18+
token: str
19+
bot_name: str
20+
original_stt_provider: str
21+
fallback_stt_provider: str
22+
error_reason: str
23+
session_args: Dict[str, Any]
24+
should_auto_restart: bool = True
25+
26+
27+
class FallbackSessionManager:
28+
"""Manages fallback session state and auto-restart logic."""
29+
30+
def __init__(self):
31+
# Store fallback contexts for sessions that need auto-restart
32+
self._fallback_contexts: Dict[str, FallbackSessionContext] = {}
33+
self._restart_delay = 1.0 # seconds to wait before restarting
34+
35+
def register_fallback_session(self, context: FallbackSessionContext):
36+
"""Register a session for fallback auto-restart.
37+
38+
Args:
39+
context: Fallback session context with restart parameters
40+
"""
41+
self._fallback_contexts[context.original_session_id] = context
42+
logger.info(
43+
f"Registered fallback session {context.original_session_id} for auto-restart"
44+
)
45+
logger.debug(
46+
f"Fallback contexts now contains: {list(self._fallback_contexts.keys())}"
47+
)
48+
49+
def get_fallback_context(self, session_id: str) -> Optional[FallbackSessionContext]:
50+
"""Get fallback context for a session.
51+
52+
Args:
53+
session_id: The session ID to get context for
54+
55+
Returns:
56+
FallbackSessionContext if session is registered for fallback, None otherwise
57+
"""
58+
logger.debug(f"Looking up fallback context for session {session_id}")
59+
logger.debug(
60+
f"Available fallback contexts: {list(self._fallback_contexts.keys())}"
61+
)
62+
context = self._fallback_contexts.get(session_id)
63+
logger.debug(f"Found context: {context is not None}")
64+
return context
65+
66+
def remove_fallback_session(self, session_id: str):
67+
"""Remove a session from fallback tracking.
68+
69+
Args:
70+
session_id: The session ID to remove
71+
"""
72+
if session_id in self._fallback_contexts:
73+
del self._fallback_contexts[session_id]
74+
logger.debug(f"Removed fallback session {session_id} from tracking")
75+
76+
def is_fallback_session(self, session_id: str) -> bool:
77+
"""Check if a session is registered for fallback auto-restart.
78+
79+
Args:
80+
session_id: The session ID to check
81+
82+
Returns:
83+
True if session should auto-restart, False otherwise
84+
"""
85+
return session_id in self._fallback_contexts
86+
87+
async def schedule_auto_restart(
88+
self, context: FallbackSessionContext, restart_callback
89+
):
90+
"""Schedule automatic restart of a fallback session.
91+
92+
Args:
93+
context: Fallback session context
94+
restart_callback: Async function to call for restarting the session
95+
"""
96+
logger.info(
97+
f"Scheduling auto-restart for session {context.original_session_id} in {self._restart_delay}s"
98+
)
99+
100+
# Wait before restarting to allow cleanup
101+
await asyncio.sleep(self._restart_delay)
102+
103+
try:
104+
# Call the restart callback with updated session parameters
105+
await restart_callback(context)
106+
logger.info(
107+
f"Auto-restart completed for session {context.original_session_id}"
108+
)
109+
except Exception as e:
110+
logger.error(
111+
f"Failed to auto-restart session {context.original_session_id}: {e}"
112+
)
113+
finally:
114+
# Clean up the fallback context
115+
self.remove_fallback_session(context.original_session_id)
116+
117+
def create_restart_args(self, context: FallbackSessionContext) -> Dict[str, Any]:
118+
"""Create new session arguments for restart with fallback STT provider.
119+
120+
Args:
121+
context: Fallback session context
122+
123+
Returns:
124+
Dictionary of session arguments for the restarted session
125+
"""
126+
# Copy original session args and update STT provider context
127+
restart_args = context.session_args.copy()
128+
129+
# Mark this as a fallback session restart
130+
restart_args["is_fallback_restart"] = True
131+
restart_args["original_stt_provider"] = context.original_stt_provider
132+
restart_args["fallback_stt_provider"] = context.fallback_stt_provider
133+
restart_args["fallback_reason"] = context.error_reason
134+
135+
return restart_args
136+
137+
def get_session_stats(self) -> Dict[str, Any]:
138+
"""Get statistics about fallback sessions.
139+
140+
Returns:
141+
Dictionary with fallback session statistics
142+
"""
143+
return {
144+
"active_fallback_sessions": len(self._fallback_contexts),
145+
"session_ids": list(self._fallback_contexts.keys()),
146+
"restart_delay": self._restart_delay,
147+
}
148+
149+
150+
# Global fallback session manager instance
151+
_fallback_manager: Optional[FallbackSessionManager] = None
152+
153+
154+
def get_fallback_session_manager() -> FallbackSessionManager:
155+
"""Get the global fallback session manager instance.
156+
157+
Returns:
158+
FallbackSessionManager singleton instance
159+
"""
160+
global _fallback_manager
161+
if _fallback_manager is None:
162+
logger.debug("Creating new FallbackSessionManager instance")
163+
_fallback_manager = FallbackSessionManager()
164+
else:
165+
logger.debug(
166+
f"Returning existing FallbackSessionManager instance with {len(_fallback_manager._fallback_contexts)} contexts"
167+
)
168+
return _fallback_manager

app/agents/voice/automatic/stt/__init__.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,15 @@ def parse_soniox_context() -> Optional[SonioxContextObject]:
9090
return None
9191

9292

93-
def get_stt_service(voice_name: Optional[str] = None):
93+
def get_stt_service(
94+
voice_name: Optional[str] = None, fallback_stt_provider: Optional[str] = None
95+
):
9496
"""
9597
Returns an STT service instance based on the environment configuration.
9698
9799
Args:
98100
voice_name: Voice name to determine STT provider override for specific voices
101+
fallback_stt_provider: STT provider to use when in fallback mode (overrides config)
99102
"""
100103
# Check for MIA voice with OpenAI override
101104
if voice_name == VoiceName.MIA.value and config.ENABLE_OPENAI_FOR_MIA:
@@ -116,8 +119,13 @@ def get_stt_service(voice_name: Optional[str] = None):
116119
temperature=0.0, # Deterministic output for consistency
117120
)
118121

119-
# Default behavior - use configured STT provider
120-
if config.STT_PROVIDER == "assemblyai":
122+
# Determine which STT provider to use (fallback override or config)
123+
effective_stt_provider = (
124+
fallback_stt_provider if fallback_stt_provider else config.STT_PROVIDER
125+
)
126+
127+
# Default behavior - use configured or fallback STT provider
128+
if effective_stt_provider == "assemblyai":
121129
if not config.ASSEMBLYAI_API_KEY:
122130
raise ValueError(
123131
"ASSEMBLYAI_API_KEY is required when STT_PROVIDER=assemblyai"
@@ -130,7 +138,7 @@ def get_stt_service(voice_name: Optional[str] = None):
130138
vad_force_turn_endpoint=True,
131139
# No connection_params needed since we're using VAD for turn detection
132140
)
133-
elif config.STT_PROVIDER == "openai":
141+
elif effective_stt_provider == "openai":
134142
if not config.OPENAI_STT_API_KEY:
135143
raise ValueError(
136144
"OPENAI_STT_API_KEY or OPENAI_API_KEY is required when STT_PROVIDER=openai"
@@ -147,7 +155,7 @@ def get_stt_service(voice_name: Optional[str] = None):
147155
prompt=config.AUTOMATIC_OPENAI_STT_PROMPT,
148156
temperature=0.0, # Deterministic output for consistency
149157
)
150-
elif config.STT_PROVIDER == "deepgram":
158+
elif effective_stt_provider == "deepgram":
151159
if not config.DEEPGRAM_API_KEY:
152160
raise ValueError("DEEPGRAM_API_KEY is required when STT_PROVIDER=deepgram")
153161

@@ -184,7 +192,7 @@ def get_stt_service(voice_name: Optional[str] = None):
184192
return DeepgramSTTService(
185193
api_key=config.DEEPGRAM_API_KEY, live_options=live_options
186194
)
187-
elif config.STT_PROVIDER == "soniox":
195+
elif effective_stt_provider == "soniox":
188196
if not config.SONIOX_API_KEY:
189197
raise ValueError("SONIOX_API_KEY is required when STT_PROVIDER=soniox")
190198

app/agents/voice/automatic/tools/juspay/analytics.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,9 @@ async def create_euler_offer(params: FunctionCallParams):
595595
}
596596

597597
# Convert IST dates to ISO format for API payload
598-
logger.info(f"[DEBUG] Starting date conversion for start_date: {start_date}, end_date: {end_date}")
598+
logger.info(
599+
f"[DEBUG] Starting date conversion for start_date: {start_date}, end_date: {end_date}"
600+
)
599601
try:
600602
ist = pytz.timezone("Asia/Kolkata")
601603

@@ -625,7 +627,9 @@ async def create_euler_offer(params: FunctionCallParams):
625627
return
626628

627629
# Build payment instruments payload
628-
logger.info(f"[DEBUG] Building payment instruments payload for: {payment_instruments}")
630+
logger.info(
631+
f"[DEBUG] Building payment instruments payload for: {payment_instruments}"
632+
)
629633
if payment_instruments:
630634
payment_instruments_payload = [
631635
instrument_map[instrument]
@@ -713,11 +717,16 @@ async def create_euler_offer(params: FunctionCallParams):
713717

714718
# Make API request
715719
endpoint = f"{EULER_DASHBOARD_API_URL}/api/offers/dashboard/create?merchant_id={merchant_id}"
716-
headers = {"Content-Type": "application/json", "x-web-logintoken": euler_token}
720+
headers = {
721+
"Content-Type": "application/json",
722+
"x-web-logintoken": euler_token,
723+
}
717724
logger.info(f"[DEBUG] Endpoint and headers prepared: {endpoint}")
718725

719726
except Exception as e:
720-
logger.error(f"[DEBUG] Error during API payload construction: {e}", exc_info=True)
727+
logger.error(
728+
f"[DEBUG] Error during API payload construction: {e}", exc_info=True
729+
)
721730
await params.result_callback(
722731
{
723732
"error": f"Tool Error: [create_euler_offer] Error constructing API payload: {str(e)}"
@@ -728,20 +737,24 @@ async def create_euler_offer(params: FunctionCallParams):
728737
logger.info(f"Making offer creation request to: {endpoint}")
729738
logger.info(f"Payload size: {len(str(api_payload))} characters")
730739
try:
731-
logger.info(f"Payload preview: {json.dumps(api_payload, indent=2)[:500]}...")
740+
logger.info(
741+
f"Payload preview: {json.dumps(api_payload, indent=2)[:500]}..."
742+
)
732743
except Exception as e:
733744
logger.error(f"Failed to serialize payload for logging: {e}")
734745

735746
async with create_http_client(timeout=10.0) as client:
736747
response = await client.post(endpoint, json=api_payload, headers=headers)
737-
logger.info(f"Received response from Euler API: Status {response.status_code}")
748+
logger.info(
749+
f"Received response from Euler API: Status {response.status_code}"
750+
)
738751
logger.info(f"Response headers: {dict(response.headers)}")
739752
try:
740753
response_text = response.text
741754
logger.info(f"Response body: {response_text}")
742755
except Exception as e:
743756
logger.error(f"Failed to read response body: {e}")
744-
757+
745758
if response.status_code == 200:
746759
response_data = response.json()
747760
offer_id = response_data.get("offer_id")

0 commit comments

Comments
 (0)