@@ -507,7 +507,12 @@ async def append_message(self, message: Any) -> None:
507
507
await self ._append_message (message )
508
508
509
509
async def _append_message (
510
- self , message : Any , * , chunk : ChunkOption = False , stream_id : str | None = None
510
+ self ,
511
+ message : Any ,
512
+ * ,
513
+ chunk : ChunkOption = False ,
514
+ stream_id : str | None = None ,
515
+ normalizer : Callable [[object ], str ] | None = None ,
511
516
) -> None :
512
517
# If currently we're in a stream, handle other messages (outside the stream) later
513
518
if not self ._can_append_message (stream_id ):
@@ -519,6 +524,15 @@ async def _append_message(
519
524
if chunk == "end" :
520
525
self ._current_stream_id = None
521
526
527
+ # Apply the user provided normalizer, if any
528
+ if normalizer is not None :
529
+ res = normalizer (message )
530
+ if not isinstance (res , str ):
531
+ raise ValueError (
532
+ f"Normalizer function must return a string, got { type (res )} "
533
+ )
534
+ message = {"content" : res , "role" : "assistant" }
535
+
522
536
if chunk is False :
523
537
msg = normalize_message (message )
524
538
chunk_content = None
@@ -539,7 +553,11 @@ async def _append_message(
539
553
msg = self ._store_message (msg , chunk = chunk )
540
554
await self ._send_append_message (msg , chunk = chunk )
541
555
542
- async def append_message_stream (self , message : Iterable [Any ] | AsyncIterable [Any ]):
556
+ async def append_message_stream (
557
+ self ,
558
+ message : Iterable [Any ] | AsyncIterable [Any ],
559
+ normalizer : Callable [[object ], str ] | None = None ,
560
+ ) -> None :
543
561
"""
544
562
Append a message as a stream of message chunks.
545
563
@@ -550,6 +568,11 @@ async def append_message_stream(self, message: Iterable[Any] | AsyncIterable[Any
550
568
message chunk formats are supported, including a string, a dictionary with
551
569
`content` and `role` keys, or a relevant chat completion object from
552
570
platforms like OpenAI, Anthropic, Ollama, and others.
571
+ normalizer
572
+ A function to apply to each message chunk (i.e., each item of the `message`
573
+ iterator) before appending it to the chat. This is useful for handling
574
+ response formats that `Chat` may not already natively support. The function
575
+ should take a message chunk and return a string.
553
576
554
577
Note
555
578
----
@@ -562,7 +585,7 @@ async def append_message_stream(self, message: Iterable[Any] | AsyncIterable[Any
562
585
# Run the stream in the background to get non-blocking behavior
563
586
@reactive .extended_task
564
587
async def _stream_task ():
565
- await self ._append_message_stream (message )
588
+ await self ._append_message_stream (message , normalizer )
566
589
567
590
_stream_task ()
568
591
@@ -582,15 +605,21 @@ async def _handle_error():
582
605
ctx .on_invalidate (_handle_error .destroy )
583
606
self ._effects .append (_handle_error )
584
607
585
- async def _append_message_stream (self , message : AsyncIterable [Any ]):
608
+ async def _append_message_stream (
609
+ self ,
610
+ message : AsyncIterable [Any ],
611
+ normalizer : Callable [[object ], str ] | None = None ,
612
+ ) -> None :
586
613
id = _utils .private_random_id ()
587
614
588
615
empty = ChatMessage (content = "" , role = "assistant" )
589
616
await self ._append_message (empty , chunk = "start" , stream_id = id )
590
617
591
618
try :
592
619
async for msg in message :
593
- await self ._append_message (msg , chunk = True , stream_id = id )
620
+ await self ._append_message (
621
+ msg , chunk = True , stream_id = id , normalizer = normalizer
622
+ )
594
623
finally :
595
624
await self ._append_message (empty , chunk = "end" , stream_id = id )
596
625
await self ._flush_pending_messages ()
0 commit comments