Skip to content

Commit 1dfeee1

Browse files
badri-singhalswaroopvarma1
authored andcommitted
Update Call status during pipeline task timeout
1 parent d9c794c commit 1dfeee1

File tree

1 file changed

+64
-157
lines changed

1 file changed

+64
-157
lines changed

app/agents/voice/breeze_buddy/workflows/order_confirmation/websocket_bot.py

Lines changed: 64 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -310,98 +310,12 @@ async def on_client_connected(transport, client):
310310
@self.transport.event_handler("on_client_disconnected")
311311
async def on_client_disconnected(transport, client):
312312
logger.info(f"Client disconnected: {client}")
313-
if not self.conversation_ended:
314-
self.conversation_ended = True
315-
logger.info(
316-
"Client disconnected unexpectedly. Updating call status directly."
317-
)
318-
try:
319-
if self.call_sid:
320-
transcription = []
321-
if self.context:
322-
history = self.context.messages
323-
for msg in history:
324-
if (
325-
isinstance(msg, dict)
326-
and "role" in msg
327-
and "content" in msg
328-
and isinstance(msg["content"], str)
329-
):
330-
transcription.append(
331-
{"role": msg["role"], "content": msg["content"]}
332-
)
333-
334-
await self.completion_function(
335-
call_id=self.call_sid,
336-
outcome=(
337-
LeadCallOutcome.BUSY
338-
if self.outcome == "unknown"
339-
else OUTCOME_TO_ENUM.get(self.outcome)
340-
),
341-
transcription={
342-
"messages": transcription,
343-
"call_sid": self.call_sid,
344-
},
345-
call_end_time=datetime.now(),
346-
)
347-
logger.info(
348-
f"Updated database for call_id: {self.call_sid} with outcome: INTERRUPTED"
349-
)
350-
summary_data = {
351-
"callSid": self.call_sid,
352-
"outcome": (
353-
LeadCallOutcome.BUSY
354-
if self.outcome == "unknown"
355-
else OUTCOME_TO_ENUM.get(self.outcome)
356-
),
357-
"orderId": self.order_id,
358-
}
359-
if (
360-
self.reporting_webhook_url
361-
and summary_data["outcome"] != LeadCallOutcome.BUSY
362-
):
363-
try:
364-
payload = json.dumps(
365-
summary_data, separators=(",", ":")
366-
)
367-
signature = calculate_hmac_sha256(
368-
payload, ORDER_CONFIRMATION_WEBHOOK_SECRET_KEY
369-
)
370-
headers = {
371-
"Content-Type": "application/json",
372-
}
373-
374-
if signature:
375-
headers["checksum"] = signature
376-
377-
async with self.aiohttp_session.post(
378-
self.reporting_webhook_url,
379-
json=summary_data,
380-
headers=headers,
381-
) as response:
382-
if response.status == 200:
383-
logger.info(
384-
"Successfully sent call summary webhook on disconnect."
385-
)
386-
else:
387-
response_text = await response.text()
388-
logger.error(
389-
f"Failed to send call summary webhook on disconnect. Status: {response.status}, Body: {response_text}"
390-
)
391-
except Exception as e:
392-
logger.error(
393-
f"Error sending webhook on disconnect: {e}"
394-
)
395-
else:
396-
logger.warning(
397-
"No call_id found, skipping database update on disconnect."
398-
)
399-
except Exception as e:
400-
logger.error(
401-
f"Error during direct DB update on disconnect for call_id {self.call_sid}: {e}"
402-
)
313+
await self._handle_unexpected_disconnect("Client disconnected unexpectedly")
403314

404-
await self.task.cancel()
315+
@self.task.event_handler("on_idle_timeout")
316+
async def on_idle_timeout(task):
317+
logger.info("Idle timeout detected.")
318+
await self._handle_unexpected_disconnect("Idle timeout")
405319

406320
runner = PipelineRunner(handle_sigint=False, force_gc=True)
407321

@@ -468,10 +382,20 @@ def _get_system_prompt(
468382
"""
469383

470384
async def _end_conversation_handler(self, flow_manager, args):
471-
self.conversation_ended = True
472-
logger.info(f"Ending conversation with outcome: {self.outcome}")
385+
if not self.conversation_ended:
386+
self.conversation_ended = True
387+
logger.info(f"Ending conversation with outcome: {self.outcome}")
388+
await self._finalize_call()
389+
390+
async def _handle_unexpected_disconnect(self, reason: str):
391+
if not self.conversation_ended:
392+
self.conversation_ended = True
393+
logger.info(f"{reason}. Updating call status directly.")
394+
self.outcome = "busy"
395+
await self._finalize_call()
396+
397+
async def _finalize_call(self):
473398
try:
474-
# Prepare transcription and outcome data
475399
transcription = []
476400
if self.context:
477401
history = self.context.messages
@@ -485,78 +409,61 @@ async def _end_conversation_handler(self, flow_manager, args):
485409
transcription.append(
486410
{"role": msg["role"], "content": msg["content"]}
487411
)
488-
summary_data = {
489-
"callSid": self.call_sid,
490-
"outcome": OUTCOME_TO_ENUM.get(self.outcome),
491-
"updatedAddress": self.updated_address,
492-
"orderId": self.order_id,
493-
}
494-
logger.info(f"Call summary data: {summary_data}")
495-
if (
496-
self.reporting_webhook_url
497-
and summary_data["outcome"] != LeadCallOutcome.BUSY
498-
):
499-
try:
500-
payload = json.dumps(summary_data, separators=(",", ":"))
501-
signature = calculate_hmac_sha256(
502-
payload, ORDER_CONFIRMATION_WEBHOOK_SECRET_KEY
503-
)
504-
headers = {
505-
"Content-Type": "application/json",
506-
}
507412

508-
if signature:
509-
headers["checksum"] = signature
510-
511-
async with self.aiohttp_session.post(
512-
self.reporting_webhook_url,
513-
json=summary_data,
514-
headers=headers,
515-
) as response:
516-
if response.status == 200:
517-
logger.info("Successfully sent call summary webhook.")
518-
else:
519-
response_text = await response.text()
520-
logger.error(
521-
f"Failed to send call summary webhook. Status: {response.status}, Body: {response_text}"
522-
)
523-
except Exception as e:
524-
logger.error(f"Error sending webhook: {e}")
413+
call_outcome = OUTCOME_TO_ENUM.get(self.outcome, LeadCallOutcome.BUSY)
414+
415+
summary_data = {
416+
"callSid": self.call_sid,
417+
"outcome": call_outcome,
418+
"updatedAddress": self.updated_address,
419+
"orderId": self.order_id,
420+
}
421+
logger.info(f"Call summary data: {summary_data}")
422+
423+
if self.reporting_webhook_url and call_outcome != LeadCallOutcome.BUSY:
424+
try:
425+
payload = json.dumps(summary_data, separators=(",", ":"))
426+
signature = calculate_hmac_sha256(
427+
payload, ORDER_CONFIRMATION_WEBHOOK_SECRET_KEY
428+
)
429+
headers = {"Content-Type": "application/json"}
430+
if signature:
431+
headers["checksum"] = signature
432+
433+
async with self.aiohttp_session.post(
434+
self.reporting_webhook_url, json=summary_data, headers=headers
435+
) as response:
436+
if response.status == 200:
437+
logger.info("Successfully sent call summary webhook.")
438+
else:
439+
response_text = await response.text()
440+
logger.error(
441+
f"Failed to send call summary webhook. Status: {response.status}, Body: {response_text}"
442+
)
443+
except Exception as e:
444+
logger.error(f"Error sending webhook: {e}")
525445

526446
if self.hangup_function:
527447
self.hangup_function(self.call_sid)
528448
logger.info(f"Call {self.call_sid} hung up successfully.")
529449

530-
# Update database with call completion
531450
if self.call_sid:
532-
try:
533-
call_outcome = OUTCOME_TO_ENUM.get(self.outcome)
534-
535-
if call_outcome:
536-
await self.completion_function(
537-
call_id=self.call_sid,
538-
outcome=call_outcome,
539-
transcription={
540-
"messages": transcription,
541-
"call_sid": self.call_sid,
542-
},
543-
call_end_time=datetime.now(),
544-
updated_address=self.updated_address,
545-
)
546-
logger.info(
547-
f"Updated database for call_id: {self.call_sid} with outcome: {call_outcome}"
548-
)
549-
else:
550-
logger.warning(
551-
f"Unknown outcome '{self.outcome}' for call_id: {self.call_sid}"
552-
)
553-
554-
except Exception as e:
555-
logger.error(
556-
f"Error updating database for call_id {self.call_sid}: {e}"
557-
)
451+
await self.completion_function(
452+
call_id=self.call_sid,
453+
outcome=call_outcome,
454+
transcription={
455+
"messages": transcription,
456+
"call_sid": self.call_sid,
457+
},
458+
call_end_time=datetime.now(),
459+
updated_address=self.updated_address,
460+
)
461+
logger.info(
462+
f"Updated database for call_id: {self.call_sid} with outcome: {call_outcome}"
463+
)
558464
else:
559465
logger.warning("No call_id found, skipping database update")
466+
560467
except Exception as e:
561468
logger.error(f"Failed to hang up call {self.call_sid}: {str(e)}")
562469
finally:

0 commit comments

Comments
 (0)