28
28
import java .util .concurrent .ConcurrentHashMap ;
29
29
import java .util .function .Consumer ;
30
30
31
- import reactor .core .publisher .Flux ;
32
-
33
- import org .springframework .ai .model .Media ;
31
+ import org .springframework .ai .chat .client .observation .ChatClientObservationContext ;
32
+ import org .springframework .ai .chat .client .observation .ChatClientObservationConvention ;
33
+ import org .springframework .ai .chat .client .observation .ChatClientObservationDocumentation ;
34
+ import org .springframework .ai .chat .client .observation .DefaultChatClientObservationConvention ;
34
35
import org .springframework .ai .chat .messages .Message ;
35
36
import org .springframework .ai .chat .messages .SystemMessage ;
36
37
import org .springframework .ai .chat .messages .UserMessage ;
42
43
import org .springframework .ai .chat .prompt .PromptTemplate ;
43
44
import org .springframework .ai .converter .BeanOutputConverter ;
44
45
import org .springframework .ai .converter .StructuredOutputConverter ;
46
+ import org .springframework .ai .model .Media ;
45
47
import org .springframework .ai .model .function .FunctionCallback ;
46
48
import org .springframework .ai .model .function .FunctionCallbackWrapper ;
47
49
import org .springframework .ai .model .function .FunctionCallingOptions ;
52
54
import org .springframework .util .MimeType ;
53
55
import org .springframework .util .StringUtils ;
54
56
57
+ import io .micrometer .observation .Observation ;
58
+ import io .micrometer .observation .ObservationRegistry ;
59
+ import io .micrometer .observation .contextpropagation .ObservationThreadLocalAccessor ;
60
+ import reactor .core .publisher .Flux ;
61
+
55
62
/**
56
63
* The default implementation of {@link ChatClient} as created by the
57
64
* {@link Builder#build()} } method.
65
72
*/
66
73
public class DefaultChatClient implements ChatClient {
67
74
75
+ private static final ChatClientObservationConvention DEFAULT_CHAT_CLIENT_OBSERVATION_CONVENTION = new DefaultChatClientObservationConvention ();
76
+
68
77
private final ChatModel chatModel ;
69
78
70
79
private final DefaultChatClientRequestSpec defaultChatClientRequest ;
@@ -281,7 +290,7 @@ public <T> ResponseEntity<ChatResponse, T> responseEntity(
281
290
}
282
291
283
292
protected <T > ResponseEntity <ChatResponse , T > doResponseEntity (StructuredOutputConverter <T > boc ) {
284
- var chatResponse = doGetChatResponse (this .request , boc .getFormat ());
293
+ var chatResponse = doGetObservableChatResponse (this .request , boc .getFormat ());
285
294
var responseContent = chatResponse .getResult ().getOutput ().getContent ();
286
295
T entity = boc .convert (responseContent );
287
296
@@ -297,7 +306,7 @@ public <T> T entity(StructuredOutputConverter<T> structuredOutputConverter) {
297
306
}
298
307
299
308
private <T > T doSingleWithBeanOutputConverter (StructuredOutputConverter <T > boc ) {
300
- var chatResponse = doGetChatResponse (this .request , boc .getFormat ());
309
+ var chatResponse = doGetObservableChatResponse (this .request , boc .getFormat ());
301
310
var stringResponse = chatResponse .getResult ().getOutput ().getContent ();
302
311
return boc .convert (stringResponse );
303
312
}
@@ -309,7 +318,23 @@ public <T> T entity(Class<T> type) {
309
318
}
310
319
311
320
private ChatResponse doGetChatResponse () {
312
- return this .doGetChatResponse (this .request , "" );
321
+ return this .doGetObservableChatResponse (this .request , "" );
322
+ }
323
+
324
+ private ChatResponse doGetObservableChatResponse (DefaultChatClientRequestSpec inputRequest ,
325
+ String formatParam ) {
326
+
327
+ ChatClientObservationContext observationContext = new ChatClientObservationContext (inputRequest ,
328
+ formatParam , false );
329
+
330
+ return ChatClientObservationDocumentation .AI_CHAT_CLIENT
331
+ .observation (inputRequest .customObservationConvention , DEFAULT_CHAT_CLIENT_OBSERVATION_CONVENTION ,
332
+ () -> observationContext , inputRequest .observationRegistry )
333
+ .observe (() -> {
334
+ ChatResponse chatResponse = doGetChatResponse (inputRequest , formatParam );
335
+ return chatResponse ;
336
+ });
337
+
313
338
}
314
339
315
340
private ChatResponse doGetChatResponse (DefaultChatClientRequestSpec inputRequest , String formatParam ) {
@@ -395,6 +420,29 @@ public DefaultStreamResponseSpec(ChatModel chatModel, DefaultChatClientRequestSp
395
420
}
396
421
397
422
private Flux <ChatResponse > doGetFluxChatResponse (DefaultChatClientRequestSpec inputRequest ) {
423
+ return Flux .deferContextual (contextView -> {
424
+ ChatClientObservationContext observationContext = new ChatClientObservationContext (inputRequest , "" ,
425
+ true );
426
+
427
+ Observation observation = ChatClientObservationDocumentation .AI_CHAT_CLIENT .observation (
428
+ inputRequest .customObservationConvention , DEFAULT_CHAT_CLIENT_OBSERVATION_CONVENTION ,
429
+ () -> observationContext , inputRequest .observationRegistry );
430
+
431
+ observation .parentObservation (contextView .getOrDefault (ObservationThreadLocalAccessor .KEY , null ))
432
+ .start ();
433
+
434
+ // @formatter:off
435
+ return doGetFluxChatResponse2 (inputRequest )
436
+ .doOnError (observation ::error )
437
+ .doFinally (s -> {
438
+ observation .stop ();
439
+ })
440
+ .contextWrite (ctx -> ctx .put (ObservationThreadLocalAccessor .KEY , observation ));
441
+ // @formatter:on
442
+ });
443
+ }
444
+
445
+ private Flux <ChatResponse > doGetFluxChatResponse2 (DefaultChatClientRequestSpec inputRequest ) {
398
446
399
447
Map <String , Object > context = new ConcurrentHashMap <>();
400
448
context .putAll (inputRequest .getAdvisorParams ());
@@ -426,9 +474,7 @@ private Flux<ChatResponse> doGetFluxChatResponse(DefaultChatClientRequestSpec in
426
474
messages .add (userMessage );
427
475
}
428
476
429
- if (advisedRequest .getChatOptions () instanceof
430
-
431
- FunctionCallingOptions functionCallingOptions ) {
477
+ if (advisedRequest .getChatOptions () instanceof FunctionCallingOptions functionCallingOptions ) {
432
478
if (!advisedRequest .getFunctionNames ().isEmpty ()) {
433
479
functionCallingOptions .setFunctions (new HashSet <>(advisedRequest .getFunctionNames ()));
434
480
}
@@ -470,6 +516,10 @@ public Flux<String> content() {
470
516
471
517
public static class DefaultChatClientRequestSpec implements ChatClientRequestSpec {
472
518
519
+ private final ObservationRegistry observationRegistry ;
520
+
521
+ private final ChatClientObservationConvention customObservationConvention ;
522
+
473
523
private final ChatModel chatModel ;
474
524
475
525
private String userText = "" ;
@@ -494,6 +544,14 @@ public static class DefaultChatClientRequestSpec implements ChatClientRequestSpe
494
544
495
545
private final Map <String , Object > advisorParams = new HashMap <>();
496
546
547
+ private ObservationRegistry getObservationRegistry () {
548
+ return observationRegistry ;
549
+ }
550
+
551
+ private ChatClientObservationConvention getCustomObservationConvention () {
552
+ return customObservationConvention ;
553
+ }
554
+
497
555
public String getUserText () {
498
556
return userText ;
499
557
}
@@ -541,13 +599,15 @@ public List<FunctionCallback> getFunctionCallbacks() {
541
599
/* copy constructor */
542
600
DefaultChatClientRequestSpec (DefaultChatClientRequestSpec ccr ) {
543
601
this (ccr .chatModel , ccr .userText , ccr .userParams , ccr .systemText , ccr .systemParams , ccr .functionCallbacks ,
544
- ccr .messages , ccr .functionNames , ccr .media , ccr .chatOptions , ccr .advisors , ccr .advisorParams );
602
+ ccr .messages , ccr .functionNames , ccr .media , ccr .chatOptions , ccr .advisors , ccr .advisorParams ,
603
+ ccr .observationRegistry , ccr .customObservationConvention );
545
604
}
546
605
547
606
public DefaultChatClientRequestSpec (ChatModel chatModel , String userText , Map <String , Object > userParams ,
548
607
String systemText , Map <String , Object > systemParams , List <FunctionCallback > functionCallbacks ,
549
608
List <Message > messages , List <String > functionNames , List <Media > media , ChatOptions chatOptions ,
550
- List <RequestResponseAdvisor > advisors , Map <String , Object > advisorParams ) {
609
+ List <RequestResponseAdvisor > advisors , Map <String , Object > advisorParams ,
610
+ ObservationRegistry observationRegistry , ChatClientObservationConvention customObservationConvention ) {
551
611
552
612
this .chatModel = chatModel ;
553
613
this .chatOptions = chatOptions != null ? chatOptions .copy ()
@@ -564,14 +624,17 @@ public DefaultChatClientRequestSpec(ChatModel chatModel, String userText, Map<St
564
624
this .media .addAll (media );
565
625
this .advisors .addAll (advisors );
566
626
this .advisorParams .putAll (advisorParams );
627
+ this .observationRegistry = observationRegistry ;
628
+ this .customObservationConvention = customObservationConvention ;
567
629
}
568
630
569
631
/**
570
632
* Return a {@code ChatClient2Builder} to create a new {@code ChatClient2} whose
571
633
* settings are replicated from this {@code ChatClientRequest}.
572
634
*/
573
635
public Builder mutate () {
574
- DefaultChatClientBuilder builder = (DefaultChatClientBuilder ) ChatClient .builder (chatModel )
636
+ DefaultChatClientBuilder builder = (DefaultChatClientBuilder ) ChatClient
637
+ .builder (chatModel , this .observationRegistry , this .customObservationConvention )
575
638
.defaultSystem (s -> s .text (this .systemText ).params (this .systemParams ))
576
639
.defaultUser (u -> u .text (this .userText )
577
640
.params (this .userParams )
@@ -756,7 +819,8 @@ public static DefaultChatClientRequestSpec adviseOnRequest(DefaultChatClientRequ
756
819
adviseRequest .userParams (), adviseRequest .systemText (), adviseRequest .systemParams (),
757
820
adviseRequest .functionCallbacks (), adviseRequest .messages (), adviseRequest .functionNames (),
758
821
adviseRequest .media (), adviseRequest .chatOptions (), adviseRequest .advisors (),
759
- adviseRequest .advisorParams ());
822
+ adviseRequest .advisorParams (), inputRequest .getObservationRegistry (),
823
+ inputRequest .getCustomObservationConvention ());
760
824
}
761
825
762
826
return advisedRequest ;
0 commit comments