@@ -2,6 +2,8 @@ use std::sync::Arc;
2
2
use std:: time:: {
3
3
Duration ,
4
4
Instant ,
5
+ SystemTime ,
6
+ UNIX_EPOCH ,
5
7
} ;
6
8
7
9
use eyre:: Result ;
@@ -37,8 +39,43 @@ use crate::api_client::{
37
39
ApiClientError ,
38
40
} ;
39
41
use crate :: telemetry:: ReasonCode ;
40
- use crate :: telemetry:: core:: ChatConversationType ;
42
+ use crate :: telemetry:: core:: {
43
+ ChatConversationType ,
44
+ MessageMetaTag ,
45
+ } ;
46
+
47
+ /// Error from sending a SendMessage request.
48
+ #[ derive( Debug , Error ) ]
49
+ pub struct SendMessageError {
50
+ #[ source]
51
+ pub source : ApiClientError ,
52
+ pub request_metadata : RequestMetadata ,
53
+ }
54
+
55
+ impl SendMessageError {
56
+ pub fn status_code ( & self ) -> Option < u16 > {
57
+ self . source . status_code ( )
58
+ }
59
+ }
60
+
61
+ impl ReasonCode for SendMessageError {
62
+ fn reason_code ( & self ) -> String {
63
+ self . source . reason_code ( )
64
+ }
65
+ }
66
+
67
+ impl std:: fmt:: Display for SendMessageError {
68
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
69
+ write ! ( f, "Failed to send the request: " ) ?;
70
+ if let Some ( request_id) = self . request_metadata . request_id . as_ref ( ) {
71
+ write ! ( f, "request_id: {}, error: " , request_id) ?;
72
+ }
73
+ write ! ( f, "{}" , self . source) ?;
74
+ Ok ( ( ) )
75
+ }
76
+ }
41
77
78
+ /// Errors associated with consuming the response stream.
42
79
#[ derive( Debug , Error ) ]
43
80
pub struct RecvError {
44
81
#[ source]
@@ -163,16 +200,36 @@ impl SendMessageStream {
163
200
client : & ApiClient ,
164
201
conversation_state : ConversationState ,
165
202
request_metadata_lock : Arc < Mutex < Option < RequestMetadata > > > ,
166
- ) -> Result < Self , ApiClientError > {
203
+ message_meta_tags : Option < Vec < MessageMetaTag > > ,
204
+ ) -> Result < Self , SendMessageError > {
167
205
let message_id = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
168
206
info ! ( ?message_id, "Generated new message id" ) ;
207
+ let user_prompt_length = conversation_state. user_input_message . content . len ( ) ;
208
+ let model_id = conversation_state. user_input_message . model_id . clone ( ) ;
209
+ let message_meta_tags = message_meta_tags. unwrap_or_default ( ) ;
169
210
170
211
let cancel_token = CancellationToken :: new ( ) ;
171
212
let cancel_token_clone = cancel_token. clone ( ) ;
172
213
173
214
let start_time = Instant :: now ( ) ;
215
+ let start_time_sys = SystemTime :: now ( ) ;
174
216
debug ! ( ?start_time, "sending send_message request" ) ;
175
- let response = client. send_message ( conversation_state) . await ?;
217
+ let response = client
218
+ . send_message ( conversation_state)
219
+ . await
220
+ . map_err ( |err| SendMessageError {
221
+ source : err,
222
+ request_metadata : RequestMetadata {
223
+ message_id : message_id. clone ( ) ,
224
+ request_start_timestamp_ms : system_time_to_unix_ms ( start_time_sys) ,
225
+ stream_end_timestamp_ms : system_time_to_unix_ms ( SystemTime :: now ( ) ) ,
226
+ model_id : model_id. clone ( ) ,
227
+ user_prompt_length,
228
+ message_meta_tags : message_meta_tags. clone ( ) ,
229
+ // Other fields are irrelevant if we can't get a successful response
230
+ ..Default :: default ( )
231
+ } ,
232
+ } ) ?;
176
233
let elapsed = start_time. elapsed ( ) ;
177
234
debug ! ( ?elapsed, "send_message succeeded" ) ;
178
235
@@ -182,8 +239,12 @@ impl SendMessageStream {
182
239
ResponseParser :: new (
183
240
response,
184
241
message_id,
242
+ model_id,
243
+ user_prompt_length,
244
+ message_meta_tags,
185
245
ev_tx,
186
246
start_time,
247
+ start_time_sys,
187
248
cancel_token_clone,
188
249
request_metadata_lock,
189
250
)
@@ -221,9 +282,8 @@ struct ResponseParser {
221
282
222
283
/// Message identifier for the assistant's response. Randomly generated on creation.
223
284
message_id : String ,
224
-
285
+ /// Whether or not the stream has completed.
225
286
ended : bool ,
226
-
227
287
/// Buffer to hold the next event in [SendMessageOutput].
228
288
peek : Option < ChatResponseStream > ,
229
289
/// Buffer for holding the accumulated assistant response.
@@ -238,33 +298,50 @@ struct ResponseParser {
238
298
cancel_token : CancellationToken ,
239
299
240
300
// metadata fields
241
- /// Time immediately after sending the request.
242
- start_time : Instant ,
301
+ /// Id of the model used with this request.
302
+ model_id : Option < String > ,
303
+ /// Length of the user prompt for the initial request.
304
+ user_prompt_length : usize ,
305
+ /// Meta tags for the initial request.
306
+ message_meta_tags : Vec < MessageMetaTag > ,
307
+ /// Time immediately before sending the request.
308
+ request_start_time : Instant ,
309
+ /// Time immediately before sending the request, as a [SystemTime].
310
+ request_start_time_sys : SystemTime ,
243
311
/// Total size (in bytes) of the response received so far.
244
312
received_response_size : usize ,
245
313
time_to_first_chunk : Option < Duration > ,
246
314
time_between_chunks : Vec < Duration > ,
247
315
}
248
316
249
317
impl ResponseParser {
318
+ #[ allow( clippy:: too_many_arguments) ]
250
319
fn new (
251
320
response : SendMessageOutput ,
252
321
message_id : String ,
322
+ model_id : Option < String > ,
323
+ user_prompt_length : usize ,
324
+ message_meta_tags : Vec < MessageMetaTag > ,
253
325
event_tx : mpsc:: Sender < Result < ResponseEvent , RecvError > > ,
254
- start_time : Instant ,
326
+ request_start_time : Instant ,
327
+ request_start_time_sys : SystemTime ,
255
328
cancel_token : CancellationToken ,
256
329
request_metadata : Arc < Mutex < Option < RequestMetadata > > > ,
257
330
) -> Self {
258
331
Self {
259
332
response,
260
333
message_id,
334
+ model_id,
335
+ user_prompt_length,
336
+ message_meta_tags,
261
337
ended : false ,
262
338
event_tx,
263
339
peek : None ,
264
340
assistant_text : String :: new ( ) ,
265
341
tool_uses : Vec :: new ( ) ,
266
342
parsing_tool_use : None ,
267
- start_time,
343
+ request_start_time,
344
+ request_start_time_sys,
268
345
received_response_size : 0 ,
269
346
time_to_first_chunk : None ,
270
347
time_between_chunks : Vec :: new ( ) ,
@@ -481,7 +558,7 @@ impl ResponseParser {
481
558
482
559
// Track metadata about the chunk.
483
560
self . time_to_first_chunk
484
- . get_or_insert_with ( || self . start_time . elapsed ( ) ) ;
561
+ . get_or_insert_with ( || self . request_start_time . elapsed ( ) ) ;
485
562
self . time_between_chunks . push ( duration) ;
486
563
if let Some ( r) = ev. as_ref ( ) {
487
564
match r {
@@ -510,10 +587,6 @@ impl ResponseParser {
510
587
}
511
588
}
512
589
513
- fn request_id ( & self ) -> Option < & str > {
514
- self . response . request_id ( )
515
- }
516
-
517
590
/// Helper to create a new [RecvError] populated with the associated request id for the stream.
518
591
fn error ( & self , source : impl Into < RecvErrorKind > ) -> RecvError {
519
592
RecvError {
@@ -524,11 +597,24 @@ impl ResponseParser {
524
597
525
598
fn make_metadata ( & self , chat_conversation_type : Option < ChatConversationType > ) -> RequestMetadata {
526
599
RequestMetadata {
527
- request_id : self . request_id ( ) . map ( String :: from) ,
600
+ request_id : self . response . request_id ( ) . map ( String :: from) ,
601
+ message_id : self . message_id . clone ( ) ,
528
602
time_to_first_chunk : self . time_to_first_chunk ,
529
603
time_between_chunks : self . time_between_chunks . clone ( ) ,
530
604
response_size : self . received_response_size ,
531
605
chat_conversation_type,
606
+ request_start_timestamp_ms : system_time_to_unix_ms ( self . request_start_time_sys ) ,
607
+ // We always end the stream when this method is called, so just set the end timestamp
608
+ // here.
609
+ stream_end_timestamp_ms : system_time_to_unix_ms ( SystemTime :: now ( ) ) ,
610
+ user_prompt_length : self . user_prompt_length ,
611
+ message_meta_tags : self . message_meta_tags . clone ( ) ,
612
+ tool_use_ids_and_names : self
613
+ . tool_uses
614
+ . iter ( )
615
+ . map ( |t| ( t. id . clone ( ) , t. name . clone ( ) ) )
616
+ . collect :: < _ > ( ) ,
617
+ model_id : self . model_id . clone ( ) ,
532
618
}
533
619
}
534
620
}
@@ -554,18 +640,40 @@ pub enum ResponseEvent {
554
640
}
555
641
556
642
/// Metadata about the sent request and associated response stream.
557
- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
643
+ #[ derive( Debug , Clone , Serialize , Deserialize , Default ) ]
558
644
pub struct RequestMetadata {
559
645
/// The request id associated with the [SendMessageOutput] stream.
560
646
pub request_id : Option < String > ,
647
+ /// The randomly-generated id associated with the request. Equivalent to utterance id.
648
+ pub message_id : String ,
649
+ /// Unix timestamp (milliseconds) immediately before sending the request.
650
+ pub request_start_timestamp_ms : u64 ,
651
+ /// Unix timestamp (milliseconds) once the stream has either completed or ended in an error.
652
+ pub stream_end_timestamp_ms : u64 ,
561
653
/// Time until the first chunk was received.
562
654
pub time_to_first_chunk : Option < Duration > ,
563
655
/// Time between each received chunk in the stream.
564
656
pub time_between_chunks : Vec < Duration > ,
657
+ /// Total size (in bytes) of the user prompt associated with the request.
658
+ pub user_prompt_length : usize ,
565
659
/// Total size (in bytes) of the response.
566
660
pub response_size : usize ,
567
661
/// [ChatConversationType] for the returned assistant message.
568
662
pub chat_conversation_type : Option < ChatConversationType > ,
663
+ /// Tool uses returned by the assistant for this request.
664
+ pub tool_use_ids_and_names : Vec < ( String , String ) > ,
665
+ /// Model id.
666
+ pub model_id : Option < String > ,
667
+ /// Meta tags for the request.
668
+ pub message_meta_tags : Vec < MessageMetaTag > ,
669
+ }
670
+
671
/// Converts a [`SystemTime`] into whole milliseconds since the Unix epoch.
///
/// The value is computed with integer arithmetic, so the result is exact
/// (sub-millisecond precision is truncated) instead of going through a
/// floating-point intermediate.
///
/// This helper never panics:
/// - a `time` earlier than the Unix epoch (for example, a badly misconfigured
///   system clock) yields `0`;
/// - a value too large for `u64` saturates at `u64::MAX`.
fn system_time_to_unix_ms(time: SystemTime) -> u64 {
    time.duration_since(UNIX_EPOCH).map_or(0, |since_epoch| {
        since_epoch
            .as_secs()
            .saturating_mul(1_000)
            .saturating_add(u64::from(since_epoch.subsec_millis()))
    })
}
570
678
571
679
#[ cfg( test) ]
@@ -623,8 +731,12 @@ mod tests {
623
731
let mut parser = ResponseParser :: new (
624
732
mock,
625
733
"" . to_string ( ) ,
734
+ None ,
735
+ 1 ,
736
+ vec ! [ ] ,
626
737
mpsc:: channel ( 32 ) . 0 ,
627
738
Instant :: now ( ) ,
739
+ SystemTime :: now ( ) ,
628
740
CancellationToken :: new ( ) ,
629
741
Arc :: new ( Mutex :: new ( None ) ) ,
630
742
) ;
0 commit comments