1
- use std:: time:: Instant ;
1
+ use std:: time:: {
2
+ Duration ,
3
+ Instant ,
4
+ } ;
2
5
3
6
use eyre:: Result ;
4
7
use fig_api_client:: clients:: SendMessageOutput ;
@@ -66,6 +69,7 @@ pub enum RecvErrorKind {
66
69
tool_use_id : String ,
67
70
name : String ,
68
71
message : Box < AssistantMessage > ,
72
+ time_elapsed : Duration ,
69
73
} ,
70
74
}
71
75
@@ -180,29 +184,29 @@ impl ResponseParser {
180
184
/// The arguments are the fields from the first [ChatResponseStream::ToolUseEvent] consumed.
181
185
async fn parse_tool_use ( & mut self , id : String , name : String ) -> Result < AssistantToolUse , RecvError > {
182
186
let mut tool_string = String :: new ( ) ;
183
- let mut stop_seen = false ;
184
187
let start = Instant :: now ( ) ;
185
188
while let Some ( ChatResponseStream :: ToolUseEvent { .. } ) = self . peek ( ) . await ? {
186
189
if let Some ( ChatResponseStream :: ToolUseEvent { input, stop, .. } ) = self . next ( ) . await ? {
187
190
if let Some ( i) = input {
188
191
tool_string. push_str ( & i) ;
189
192
}
190
193
if let Some ( true ) = stop {
191
- stop_seen = true ;
192
194
break ;
193
195
}
194
196
}
195
197
}
196
198
let args = match serde_json:: from_str ( & tool_string) {
197
199
Ok ( args) => args,
198
200
Err ( err) => {
199
- // If the stream ended before we saw the final tool use event (and thus failed
200
- // deserializing the tool use), this is most likely due to the backend dropping the
201
- // connection. The tool was too large!
202
- if self . peek ( ) . await ?. is_none ( ) && !stop_seen {
201
+ // If we failed deserializing after waiting for a long time, then this is most
202
+ // likely bedrock responding with a stop event for some reason without actually
203
+ // including the tool contents. Essentially, the tool was too large.
204
+ // Timeouts have been seen as short as ~1 minute, so setting the time to 30.
205
+ let time_elapsed = start. elapsed ( ) ;
206
+ if self . peek ( ) . await ?. is_none ( ) && time_elapsed > Duration :: from_secs ( 30 ) {
203
207
error ! (
204
208
"Received an unexpected end of stream after spending ~{}s receiving tool events" ,
205
- Instant :: now ( ) . duration_since ( start ) . as_secs_f64( )
209
+ time_elapsed . as_secs_f64( )
206
210
) ;
207
211
self . tool_uses . push ( AssistantToolUse {
208
212
id : id. clone ( ) ,
@@ -211,18 +215,14 @@ impl ResponseParser {
211
215
[ (
212
216
"key" . to_string ( ) ,
213
217
serde_json:: Value :: String (
214
- "fake tool use args - actual tool use was too large to include" . to_string ( ) ,
218
+ "WARNING: the actual tool use arguments were too complicated to be generated"
219
+ . to_string ( ) ,
215
220
) ,
216
221
) ]
217
222
. into_iter ( )
218
223
. collect ( ) ,
219
224
) ,
220
225
} ) ;
221
- // let message = Box::new(AssistantMessage {
222
- // message_id: Some(self.message_id.clone()),
223
- // content: std::mem::take(&mut self.assistant_text),
224
- // tool_uses: Some(self.tool_uses.clone().into_iter().map(Into::into).collect()),
225
- // });
226
226
let message = Box :: new ( AssistantMessage :: new_tool_use (
227
227
Some ( self . message_id . clone ( ) ) ,
228
228
std:: mem:: take ( & mut self . assistant_text ) ,
@@ -232,6 +232,7 @@ impl ResponseParser {
232
232
tool_use_id : id,
233
233
name,
234
234
message,
235
+ time_elapsed,
235
236
} ) ) ;
236
237
} else {
237
238
return Err ( self . error ( err) ) ;
0 commit comments