@@ -513,45 +513,61 @@ async def _call_llm_async(
     # Calls the LLM.
     llm = self.__get_llm(invocation_context)
     with tracer.start_as_current_span('call_llm'):
-      if invocation_context.run_config.support_cfc:
-        invocation_context.live_request_queue = LiveRequestQueue()
-        async for llm_response in self.run_live(invocation_context):
-          # Runs after_model_callback if it exists.
-          if altered_llm_response := await self._handle_after_model_callback(
-              invocation_context, llm_response, model_response_event
-          ):
-            llm_response = altered_llm_response
-          # only yield partial response in SSE streaming mode
-          if (
-              invocation_context.run_config.streaming_mode == StreamingMode.SSE
-              or not llm_response.partial
-          ):
-            yield llm_response
-          if llm_response.turn_complete:
-            invocation_context.live_request_queue.close()
-      else:
-        # Check if we can make this llm call or not. If the current call pushes
-        # the counter beyond the max set value, then the execution is stopped
-        # right here, and exception is thrown.
-        invocation_context.increment_llm_call_count()
-        async for llm_response in llm.generate_content_async(
-            llm_request,
-            stream=invocation_context.run_config.streaming_mode
-            == StreamingMode.SSE,
+      try:
+        if (
+            invocation_context.run_config
+            and invocation_context.run_config.support_cfc
         ):
-          trace_call_llm(
-              invocation_context,
-              model_response_event.id,
+          invocation_context.live_request_queue = LiveRequestQueue()
+          async for llm_response in self.run_live(invocation_context):
+            # Runs after_model_callback if it exists.
+            if altered_llm_response := await self._handle_after_model_callback(
+                invocation_context, llm_response, model_response_event
+            ):
+              llm_response = altered_llm_response
+            # only yield partial response in SSE streaming mode
+            if (
+                invocation_context.run_config.streaming_mode
+                == StreamingMode.SSE
+                or not llm_response.partial
+            ):
+              yield llm_response
+            if llm_response.turn_complete:
+              invocation_context.live_request_queue.close()
+        else:
+          # Check if we can make this llm call or not. If the current call pushes
+          # the counter beyond the max set value, then the execution is stopped
+          # right here, and exception is thrown.
+          invocation_context.increment_llm_call_count()
+          async for llm_response in llm.generate_content_async(
               llm_request,
-              llm_response,
-          )
-          # Runs after_model_callback if it exists.
-          if altered_llm_response := await self._handle_after_model_callback(
-              invocation_context, llm_response, model_response_event
+              stream=invocation_context.run_config.streaming_mode
+              == StreamingMode.SSE,
           ):
-            llm_response = altered_llm_response
+            trace_call_llm(
+                invocation_context,
+                model_response_event.id,
+                llm_request,
+                llm_response,
+            )
+            # Runs after_model_callback if it exists.
+            if altered_llm_response := await self._handle_after_model_callback(
+                invocation_context, llm_response, model_response_event
+            ):
+              llm_response = altered_llm_response

-          yield llm_response
+            yield llm_response
+      except Exception as model_error:
+        if (
+            invocation_context.run_config
+            and invocation_context.run_config.support_cfc
+            and invocation_context.live_request_queue
+        ):
+          invocation_context.live_request_queue.close()
+        error_response = await self._handle_model_error(
+            invocation_context, llm_request, model_response_event, model_error
+        )
+        yield error_response

   async def _handle_before_model_callback(
       self,
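
The hunk above wraps both code paths (the CFC/live branch and the plain `generate_content_async` branch) in a single `try`/`except`, so a failure anywhere in the stream is funneled through `_handle_model_error` and, when a plugin recovers it, surfaced as one final yielded response instead of an unhandled exception. A minimal, self-contained sketch of that pattern, using stand-in types instead of the real ADK classes (the None-check that `_handle_model_error` performs is consolidated into the caller here for brevity):

```python
import asyncio
from typing import AsyncGenerator, Optional


async def model_stream() -> AsyncGenerator[str, None]:
  # Stand-in for llm.generate_content_async(...): fails mid-stream.
  yield 'partial response'
  raise RuntimeError('model backend unavailable')


async def handle_model_error(error: Exception) -> Optional[str]:
  # Stand-in for the plugin dispatch: return a recovery response,
  # or None to mean "no plugin handled it, re-raise".
  return f'fallback response after: {error}'


async def call_llm() -> AsyncGenerator[str, None]:
  # Mirrors the diff: the whole streaming loop sits inside try/except,
  # and a recovered error is yielded as the last response.
  try:
    async for response in model_stream():
      yield response
  except Exception as model_error:
    recovered = await handle_model_error(model_error)
    if recovered is None:
      raise
    yield recovered


async def main() -> None:
  async for response in call_llm():
    print(response)  # prints 'partial response', then the fallback


asyncio.run(main())
```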
@@ -592,6 +608,29 @@ async def _handle_before_model_callback(
     if callback_response:
       return callback_response

+  async def _handle_model_error(
+      self,
+      invocation_context: InvocationContext,
+      llm_request: LlmRequest,
+      model_response_event: Event,
+      model_error: Exception,
+  ) -> LlmResponse:
+    """Handle model errors through plugin system."""
+    callback_context = CallbackContext(
+        invocation_context, event_actions=model_response_event.actions
+    )
+    error_response = (
+        await invocation_context.plugin_manager.run_on_model_error_callback(
+            callback_context=callback_context,
+            llm_request=llm_request,
+            error=model_error,
+        )
+    )
+    if error_response is not None:
+      return error_response
+    else:
+      raise model_error
+
   async def _handle_after_model_callback(
       self,
       invocation_context: InvocationContext,
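
For context on the new `_handle_model_error`: it delegates to `plugin_manager.run_on_model_error_callback` and re-raises only when every plugin returns `None`. Below is a hedged sketch of a plugin that opts in to this recovery path. The `BasePlugin` import path and the `on_model_error_callback` hook name/signature are inferred from the dispatch call in this diff, not confirmed against the plugin base class, so treat them as assumptions:

```python
from typing import Optional

from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.plugins.base_plugin import BasePlugin
from google.genai import types


class ModelErrorFallbackPlugin(BasePlugin):
  """Illustrative plugin: turns a model error into a canned model reply."""

  async def on_model_error_callback(
      self,
      *,
      callback_context: CallbackContext,
      llm_request: LlmRequest,
      error: Exception,
  ) -> Optional[LlmResponse]:
    # Returning an LlmResponse short-circuits the `raise model_error`
    # branch in _handle_model_error; returning None lets it propagate.
    return LlmResponse(
        content=types.Content(
            role='model',
            parts=[types.Part(text=f'Model call failed: {error}')],
        )
    )
```

Assuming the standard registration flow, such a plugin would be passed to the runner via its `plugins=[...]` argument; any `LlmResponse` it returns replaces the failed model call's output and is yielded by `_call_llm_async` as `error_response`.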