@@ -182,6 +182,7 @@ where
182
182
}
183
183
184
184
const LIVELINESS_TIMEOUT : Duration = Duration :: from_secs ( 60 ) ;
185
+ const SEND_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
185
186
186
187
async fn ws_client_actor ( client : ClientConnection , ws : WebSocketStream , sendrx : MeteredReceiver < SerializableMessage > ) {
187
188
// ensure that even if this task gets cancelled, we always cleanup the connection
@@ -266,12 +267,12 @@ async fn ws_client_actor_inner(
266
267
Some ( Ok ( m) ) => Item :: Message ( ClientMessage :: from_message( m) ) ,
267
268
Some ( Err ( error) ) => {
268
269
log:: warn!( "Websocket receive error: {}" , error) ;
269
- continue ;
270
+ break ;
270
271
}
271
272
// the client sent us a close frame
272
273
None => {
273
- break
274
- } ,
274
+ break ;
275
+ }
275
276
} ,
276
277
277
278
// If we have an outgoing message to send, send it off.
@@ -311,11 +312,29 @@ async fn ws_client_actor_inner(
311
312
// now we flush all the messages to the socket
312
313
( ws. flush( ) . await , msg_buffer)
313
314
} ;
315
+ // Build a future that both times out and drives the send.
316
+ //
317
+ // Note that if flushing cannot immediately complete for whatever reason,
318
+ // it will wait without polling the other futures in the `select!` arms.
319
+ // Among other things, this means our liveness tick will not be polled.
320
+ //
321
+ // To avoid waiting indefinitely, we wrap the send in a timeout.
322
+ // A timeout is treated as an unresponsive client and we drop the connection.
323
+ let send_all = tokio:: time:: timeout( SEND_TIMEOUT , send_all) ;
314
324
// Flush the websocket while continuing to poll the `handle_queue`,
315
325
// to avoid deadlocks or delays due to enqueued futures holding resources.
316
326
let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
317
327
let t1 = Instant :: now( ) ;
318
- let ( send_all_result, buf) = send_all. await ;
328
+ let ( send_all_result, buf) = match send_all. await {
329
+ Ok ( ( send_all_result, buf) ) => {
330
+ ( send_all_result, buf)
331
+ }
332
+ Err ( e) => {
333
+ // Our send timed out; drop client without trying to send them a Close
334
+ log:: warn!( "send_all timed out: {e}" ) ;
335
+ break ;
336
+ }
337
+ } ;
319
338
msg_buffer = buf;
320
339
if let Err ( error) = send_all_result {
321
340
log:: warn!( "Websocket send error: {error}" )
@@ -335,13 +354,33 @@ async fn ws_client_actor_inner(
335
354
Err ( NoSuchModule ) => {
336
355
// Send a close frame while continuing to poll the `handle_queue`,
337
356
// to avoid deadlocks or delays due to enqueued futures holding resources.
338
- let close = also_poll(
339
- ws. close( Some ( CloseFrame { code: CloseCode :: Away , reason: "module exited" . into( ) } ) ) ,
340
- make_progress( & mut current_message) ,
341
- ) ;
342
- if let Err ( e) = close. await {
343
- log:: warn!( "error closing: {e:#}" )
344
- }
357
+ let close = ws. close( Some ( CloseFrame { code: CloseCode :: Away , reason: "module exited" . into( ) } ) ) ;
358
+ // Wrap the close in a timeout
359
+ let close = tokio:: time:: timeout( SEND_TIMEOUT , close) ;
360
+ match also_poll( close, make_progress( & mut current_message) ) . await {
361
+ Ok ( Err ( e) ) => {
362
+ log:: warn!( "error closing websocket: {e:#}" )
363
+ }
364
+ Err ( e) => {
365
+ // Our send timed out; drop client without trying to send them a Close.
366
+ //
367
+ // Is it correct to break if a reducer is still in progress?
368
+ // Answer: Yes it is.
369
+ //
370
+ // If a reducer is currently being executed,
371
+ // we are waiting for the `current_message` future to complete.
372
+ // When we break, the task completes and this future is dropped.
373
+ //
374
+ // Notably though the reducer itself will run to completion,
375
+ // however when it tries to notify this task that it is done,
376
+ // it will encounter a closed sender in `JobThread::run`,
377
+ // dropping the value that it's trying to send.
378
+ // In particular it will not throw an error or panic.
379
+ log:: warn!( "websocket close timed out: {e}" ) ;
380
+ break ;
381
+ }
382
+ _ => { }
383
+ } ;
345
384
closed = true ;
346
385
}
347
386
}
@@ -352,10 +391,29 @@ async fn ws_client_actor_inner(
352
391
_ = liveness_check_interval. tick( ) => {
353
392
// If we received a pong at some point, send a fresh ping.
354
393
if mem:: take( & mut got_pong) {
394
+ // Build a future that both times out and drives the send.
395
+ //
396
+ // Note that if the send cannot immediately complete for whatever reason,
397
+ // it will wait without polling the other futures in the `select!` arms.
398
+ // Among other things, this means we won't poll the websocket for a Close frame.
399
+ //
400
+ // To avoid waiting indefinitely, we wrap the ping in a timeout.
401
+ // A timeout is treated as an unresponsive client and we drop the connection.
402
+ let ping = ws. send( WsMessage :: Ping ( Bytes :: new( ) ) ) ;
403
+ let ping_with_timeout = tokio:: time:: timeout( SEND_TIMEOUT , ping) ;
404
+
355
405
// Send a ping message while continuing to poll the `handle_queue`,
356
406
// to avoid deadlocks or delays due to enqueued futures holding resources.
357
- if let Err ( e) = also_poll( ws. send( WsMessage :: Ping ( Bytes :: new( ) ) ) , make_progress( & mut current_message) ) . await {
358
- log:: warn!( "error sending ping: {e:#}" ) ;
407
+ match also_poll( ping_with_timeout, make_progress( & mut current_message) ) . await {
408
+ Ok ( Err ( e) ) => {
409
+ log:: warn!( "error sending ping: {e:#}" ) ;
410
+ }
411
+ Err ( e) => {
412
+ // Our ping timed out; drop them without trying to send them a Close
413
+ log:: warn!( "ping timed out after: {e}" ) ;
414
+ break ;
415
+ }
416
+ _ => { }
359
417
}
360
418
continue ;
361
419
} else {
@@ -380,13 +438,22 @@ async fn ws_client_actor_inner(
380
438
Item :: HandleResult ( res) => {
381
439
if let Err ( e) = res {
382
440
if let MessageHandleError :: Execution ( err) = e {
383
- log:: error!( "{err:#}" ) ;
441
+ log:: error!( "reducer execution error: {err:#}" ) ;
384
442
// Serialize the message and keep a handle to the buffer.
385
443
let ( msg_alloc, msg_data) = serialize ( msg_buffer, err, client. config ) ;
386
444
387
- // Buffer the message without necessarily sending it.
388
- if let Err ( error) = ws. send ( datamsg_to_wsmsg ( msg_data) ) . await {
389
- log:: warn!( "Websocket send error: {error}" )
445
+ let send = async { ws. send ( datamsg_to_wsmsg ( msg_data) ) . await } ;
446
+ let send = tokio:: time:: timeout ( SEND_TIMEOUT , send) ;
447
+
448
+ match send. await {
449
+ Ok ( Err ( error) ) => {
450
+ log:: warn!( "Websocket send error: {error}" )
451
+ }
452
+ Err ( error) => {
453
+ log:: warn!( "send timed out after: {error}" ) ;
454
+ break ;
455
+ }
456
+ _ => { }
390
457
}
391
458
392
459
// At this point,
@@ -399,16 +466,23 @@ async fn ws_client_actor_inner(
399
466
400
467
continue ;
401
468
}
402
- log:: debug!( "Client caused error on text message: {}" , e) ;
403
- if let Err ( e) = ws
404
- . close ( Some ( CloseFrame {
405
- code : CloseCode :: Error ,
406
- reason : format ! ( "{e:#}" ) . into ( ) ,
407
- } ) )
408
- . await
409
- {
410
- log:: warn!( "error closing websocket: {e:#}" )
411
- } ;
469
+ log:: warn!( "Client caused error on text message: {}" , e) ;
470
+ let close = ws. close ( Some ( CloseFrame {
471
+ code : CloseCode :: Error ,
472
+ reason : format ! ( "{e:#}" ) . into ( ) ,
473
+ } ) ) ;
474
+
475
+ // Wrap the close in a timeout
476
+ match tokio:: time:: timeout ( SEND_TIMEOUT , close) . await {
477
+ Ok ( Err ( e) ) => {
478
+ log:: warn!( "error closing websocket: {e:#}" )
479
+ }
480
+ Err ( e) => {
481
+ log:: warn!( "send timed out after: {e}" ) ;
482
+ break ;
483
+ }
484
+ _ => { }
485
+ }
412
486
}
413
487
}
414
488
Item :: Message ( ClientMessage :: Ping ( _message) ) => {
@@ -439,6 +513,10 @@ async fn ws_client_actor_inner(
439
513
. with_label_values ( & addr)
440
514
. inc ( ) ;
441
515
}
516
+
517
+ // Can't we just break out of the loop here?
518
+ // Not, if we want tungstenite to send a close frame back to the client.
519
+ // That will only happen once `ws.next()` returns `None`.
442
520
closed = true ;
443
521
}
444
522
}
0 commit comments