@@ -2,6 +2,8 @@ use std::sync::Arc;
2
2
use std:: time:: {
3
3
Duration ,
4
4
Instant ,
5
+ SystemTime ,
6
+ UNIX_EPOCH ,
5
7
} ;
6
8
7
9
use eyre:: Result ;
@@ -37,8 +39,42 @@ use crate::api_client::{
37
39
ApiClientError ,
38
40
} ;
39
41
use crate :: telemetry:: ReasonCode ;
40
- use crate :: telemetry:: core:: ChatConversationType ;
42
+ use crate :: telemetry:: core:: {
43
+ ChatConversationType ,
44
+ MessageMetaTag ,
45
+ } ;
46
+
47
+ #[ derive( Debug , Error ) ]
48
+ pub struct SendMessageError {
49
+ #[ source]
50
+ pub source : ApiClientError ,
51
+ pub request_metadata : RequestMetadata ,
52
+ }
53
+
54
+ impl SendMessageError {
55
+ pub fn status_code ( & self ) -> Option < u16 > {
56
+ self . source . status_code ( )
57
+ }
58
+ }
59
+
60
+ impl ReasonCode for SendMessageError {
61
+ fn reason_code ( & self ) -> String {
62
+ self . source . reason_code ( )
63
+ }
64
+ }
65
+
66
+ impl std:: fmt:: Display for SendMessageError {
67
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
68
+ write ! ( f, "Failed to send the request: " ) ?;
69
+ if let Some ( request_id) = self . request_metadata . request_id . as_ref ( ) {
70
+ write ! ( f, "request_id: {}, error: " , request_id) ?;
71
+ }
72
+ write ! ( f, "{}" , self . source) ?;
73
+ Ok ( ( ) )
74
+ }
75
+ }
41
76
77
+ /// Errors associated with consuming the response stream.
42
78
#[ derive( Debug , Error ) ]
43
79
pub struct RecvError {
44
80
#[ source]
@@ -163,16 +199,36 @@ impl SendMessageStream {
163
199
client : & ApiClient ,
164
200
conversation_state : ConversationState ,
165
201
request_metadata_lock : Arc < Mutex < Option < RequestMetadata > > > ,
166
- ) -> Result < Self , ApiClientError > {
202
+ message_meta_tags : Option < Vec < MessageMetaTag > > ,
203
+ ) -> Result < Self , SendMessageError > {
167
204
let message_id = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
168
205
info ! ( ?message_id, "Generated new message id" ) ;
206
+ let user_prompt_length = conversation_state. user_input_message . content . len ( ) ;
207
+ let model_id = conversation_state. user_input_message . model_id . clone ( ) ;
208
+ let message_meta_tags = message_meta_tags. unwrap_or_default ( ) ;
169
209
170
210
let cancel_token = CancellationToken :: new ( ) ;
171
211
let cancel_token_clone = cancel_token. clone ( ) ;
172
212
173
213
let start_time = Instant :: now ( ) ;
214
+ let start_time_sys = SystemTime :: now ( ) ;
174
215
debug ! ( ?start_time, "sending send_message request" ) ;
175
- let response = client. send_message ( conversation_state) . await ?;
216
+ let response = client
217
+ . send_message ( conversation_state)
218
+ . await
219
+ . map_err ( |err| SendMessageError {
220
+ source : err,
221
+ request_metadata : RequestMetadata {
222
+ message_id : message_id. clone ( ) ,
223
+ request_start_timestamp_ms : system_time_to_unix_ms ( start_time_sys) ,
224
+ stream_end_timestamp_ms : system_time_to_unix_ms ( SystemTime :: now ( ) ) ,
225
+ model_id : model_id. clone ( ) ,
226
+ user_prompt_length,
227
+ message_meta_tags : message_meta_tags. clone ( ) ,
228
+ // Other fields are irrelevant if we can't get a successful response
229
+ ..Default :: default ( )
230
+ } ,
231
+ } ) ?;
176
232
let elapsed = start_time. elapsed ( ) ;
177
233
debug ! ( ?elapsed, "send_message succeeded" ) ;
178
234
@@ -182,8 +238,12 @@ impl SendMessageStream {
182
238
ResponseParser :: new (
183
239
response,
184
240
message_id,
241
+ model_id,
242
+ user_prompt_length,
243
+ message_meta_tags,
185
244
ev_tx,
186
245
start_time,
246
+ start_time_sys,
187
247
cancel_token_clone,
188
248
request_metadata_lock,
189
249
)
@@ -221,9 +281,8 @@ struct ResponseParser {
221
281
222
282
/// Message identifier for the assistant's response. Randomly generated on creation.
223
283
message_id : String ,
224
-
284
+ /// Whether or not the stream has completed.
225
285
ended : bool ,
226
-
227
286
/// Buffer to hold the next event in [SendMessageOutput].
228
287
peek : Option < ChatResponseStream > ,
229
288
/// Buffer for holding the accumulated assistant response.
@@ -238,33 +297,50 @@ struct ResponseParser {
238
297
cancel_token : CancellationToken ,
239
298
240
299
// metadata fields
241
- /// Time immediately after sending the request.
242
- start_time : Instant ,
300
+ /// Id of the model used with this request.
301
+ model_id : Option < String > ,
302
+ /// Length of the user prompt for the initial request.
303
+ user_prompt_length : usize ,
304
+ /// Meta tags for the initial request.
305
+ message_meta_tags : Vec < MessageMetaTag > ,
306
+ /// Time immediately before sending the request.
307
+ request_start_time : Instant ,
308
+ /// Time immediately before sending the request, as a [SystemTime].
309
+ request_start_time_sys : SystemTime ,
243
310
/// Total size (in bytes) of the response received so far.
244
311
received_response_size : usize ,
245
312
time_to_first_chunk : Option < Duration > ,
246
313
time_between_chunks : Vec < Duration > ,
247
314
}
248
315
249
316
impl ResponseParser {
317
+ #[ allow( clippy:: too_many_arguments) ]
250
318
fn new (
251
319
response : SendMessageOutput ,
252
320
message_id : String ,
321
+ model_id : Option < String > ,
322
+ user_prompt_length : usize ,
323
+ message_meta_tags : Vec < MessageMetaTag > ,
253
324
event_tx : mpsc:: Sender < Result < ResponseEvent , RecvError > > ,
254
- start_time : Instant ,
325
+ request_start_time : Instant ,
326
+ request_start_time_sys : SystemTime ,
255
327
cancel_token : CancellationToken ,
256
328
request_metadata : Arc < Mutex < Option < RequestMetadata > > > ,
257
329
) -> Self {
258
330
Self {
259
331
response,
260
332
message_id,
333
+ model_id,
334
+ user_prompt_length,
335
+ message_meta_tags,
261
336
ended : false ,
262
337
event_tx,
263
338
peek : None ,
264
339
assistant_text : String :: new ( ) ,
265
340
tool_uses : Vec :: new ( ) ,
266
341
parsing_tool_use : None ,
267
- start_time,
342
+ request_start_time,
343
+ request_start_time_sys,
268
344
received_response_size : 0 ,
269
345
time_to_first_chunk : None ,
270
346
time_between_chunks : Vec :: new ( ) ,
@@ -481,7 +557,7 @@ impl ResponseParser {
481
557
482
558
// Track metadata about the chunk.
483
559
self . time_to_first_chunk
484
- . get_or_insert_with ( || self . start_time . elapsed ( ) ) ;
560
+ . get_or_insert_with ( || self . request_start_time . elapsed ( ) ) ;
485
561
self . time_between_chunks . push ( duration) ;
486
562
if let Some ( r) = ev. as_ref ( ) {
487
563
match r {
@@ -510,10 +586,6 @@ impl ResponseParser {
510
586
}
511
587
}
512
588
513
- fn request_id ( & self ) -> Option < & str > {
514
- self . response . request_id ( )
515
- }
516
-
517
589
/// Helper to create a new [RecvError] populated with the associated request id for the stream.
518
590
fn error ( & self , source : impl Into < RecvErrorKind > ) -> RecvError {
519
591
RecvError {
@@ -524,11 +596,24 @@ impl ResponseParser {
524
596
525
597
fn make_metadata ( & self , chat_conversation_type : Option < ChatConversationType > ) -> RequestMetadata {
526
598
RequestMetadata {
527
- request_id : self . request_id ( ) . map ( String :: from) ,
599
+ request_id : self . response . request_id ( ) . map ( String :: from) ,
600
+ message_id : self . message_id . clone ( ) ,
528
601
time_to_first_chunk : self . time_to_first_chunk ,
529
602
time_between_chunks : self . time_between_chunks . clone ( ) ,
530
603
response_size : self . received_response_size ,
531
604
chat_conversation_type,
605
+ request_start_timestamp_ms : system_time_to_unix_ms ( self . request_start_time_sys ) ,
606
+ // We always end the stream when this method is called, so just set the end timestamp
607
+ // here.
608
+ stream_end_timestamp_ms : system_time_to_unix_ms ( SystemTime :: now ( ) ) ,
609
+ user_prompt_length : self . user_prompt_length ,
610
+ message_meta_tags : self . message_meta_tags . clone ( ) ,
611
+ tool_use_ids_and_names : self
612
+ . tool_uses
613
+ . iter ( )
614
+ . map ( |t| ( t. id . clone ( ) , t. name . clone ( ) ) )
615
+ . collect :: < _ > ( ) ,
616
+ model_id : self . model_id . clone ( ) ,
532
617
}
533
618
}
534
619
}
@@ -554,18 +639,40 @@ pub enum ResponseEvent {
554
639
}
555
640
556
641
/// Metadata about the sent request and associated response stream.
557
- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
642
+ #[ derive( Debug , Clone , Serialize , Deserialize , Default ) ]
558
643
pub struct RequestMetadata {
559
644
/// The request id associated with the [SendMessageOutput] stream.
560
645
pub request_id : Option < String > ,
646
+ /// The randomly-generated id associated with the request. Equivalent to utterance id.
647
+ pub message_id : String ,
648
+ /// Unix timestamp (milliseconds) immediately before sending the request.
649
+ pub request_start_timestamp_ms : u64 ,
650
+ /// Unix timestamp (milliseconds) once the stream has either completed or ended in an error.
651
+ pub stream_end_timestamp_ms : u64 ,
561
652
/// Time until the first chunk was received.
562
653
pub time_to_first_chunk : Option < Duration > ,
563
654
/// Time between each received chunk in the stream.
564
655
pub time_between_chunks : Vec < Duration > ,
656
+ /// Total size (in bytes) of the user prompt associated with the request.
657
+ pub user_prompt_length : usize ,
565
658
/// Total size (in bytes) of the response.
566
659
pub response_size : usize ,
567
660
/// [ChatConversationType] for the returned assistant message.
568
661
pub chat_conversation_type : Option < ChatConversationType > ,
662
+ /// Tool uses returned by the assistant for this request.
663
+ pub tool_use_ids_and_names : Vec < ( String , String ) > ,
664
+ /// Model id.
665
+ pub model_id : Option < String > ,
666
+ /// Meta tags for the request.
667
+ pub message_meta_tags : Vec < MessageMetaTag > ,
668
+ }
669
+
670
+ fn system_time_to_unix_ms ( time : SystemTime ) -> u64 {
671
+ ( time
672
+ . duration_since ( UNIX_EPOCH )
673
+ . expect ( "time should never be before unix epoch" )
674
+ . as_secs_f64 ( )
675
+ * 1000.0 ) as u64
569
676
}
570
677
571
678
#[ cfg( test) ]
@@ -623,8 +730,12 @@ mod tests {
623
730
let mut parser = ResponseParser :: new (
624
731
mock,
625
732
"" . to_string ( ) ,
733
+ None ,
734
+ 1 ,
735
+ vec ! [ ] ,
626
736
mpsc:: channel ( 32 ) . 0 ,
627
737
Instant :: now ( ) ,
738
+ SystemTime :: now ( ) ,
628
739
CancellationToken :: new ( ) ,
629
740
Arc :: new ( Mutex :: new ( None ) ) ,
630
741
) ;
0 commit comments