@@ -20,7 +20,7 @@ use crate::downlink::failure::BadFrameResponse;
20
20
use crate :: timeout_coord:: { VoteResult , Voter } ;
21
21
use backpressure:: DownlinkBackpressure ;
22
22
use bitflags:: bitflags;
23
- use bytes:: BytesMut ;
23
+ use bytes:: { Bytes , BytesMut } ;
24
24
use futures:: future:: { join, select, Either } ;
25
25
use futures:: stream:: SelectAll ;
26
26
use futures:: { Future , FutureExt , Sink , SinkExt , Stream , StreamExt } ;
@@ -32,12 +32,12 @@ use swim_messages::protocol::{
32
32
RawResponseMessageDecoder , ResponseMessage ,
33
33
} ;
34
34
use swim_model:: address:: RelativeAddress ;
35
- use swim_model:: Text ;
35
+ use swim_model:: { BytesStr , Text } ;
36
36
use swim_utilities:: future:: { immediate_or_join, immediate_or_start, SecondaryResult } ;
37
37
use swim_utilities:: io:: byte_channel:: { ByteReader , ByteWriter } ;
38
38
use swim_utilities:: trigger;
39
39
use tokio:: sync:: mpsc;
40
- use tokio:: time:: { timeout, timeout_at , Instant } ;
40
+ use tokio:: time:: { timeout, Instant } ;
41
41
use tokio_stream:: wrappers:: ReceiverStream ;
42
42
use tokio_util:: codec:: { Decoder , Encoder , FramedRead , FramedWrite } ;
43
43
use tracing:: { error, info, info_span, trace, warn, Instrument } ;
@@ -421,7 +421,7 @@ async fn attach_task<F>(
421
421
}
422
422
423
423
#[ derive( Debug , PartialEq , Eq ) ]
424
- enum ReadTaskState {
424
+ enum ReadTaskDlState {
425
425
Init ,
426
426
Linked ,
427
427
Synced ,
@@ -519,6 +519,15 @@ impl<D: Decoder> Stream for DownlinkReceiver<D> {
519
519
}
520
520
}
521
521
522
+ enum ReadTaskEvent {
523
+ Message ( ResponseMessage < BytesStr , Bytes , Bytes > ) ,
524
+ ReadFailed ( std:: io:: Error ) ,
525
+ MessagesStopped ,
526
+ NewConsumer ( ByteWriter , DownlinkOptions ) ,
527
+ ConsumerChannelStopped ,
528
+ ConsumersTimedOut ,
529
+ }
530
+
522
531
/// Consumes incoming messages from the remote lane and passes them to the consumers.
523
532
async fn read_task < I , H > (
524
533
input : ByteReader ,
@@ -539,135 +548,190 @@ where
539
548
540
549
let mut consumer_stream = ReceiverStream :: new ( consumers) ;
541
550
542
- let mut state = ReadTaskState :: Init ;
551
+ let make_timeout = || tokio:: time:: sleep_until ( Instant :: now ( ) + config. empty_timeout ) ;
552
+ let mut task_state = pin ! ( Some ( make_timeout( ) ) ) ;
553
+ let mut dl_state = ReadTaskDlState :: Init ;
543
554
let mut current = BytesMut :: new ( ) ;
544
555
let mut awaiting_synced: Vec < DownlinkSender > = vec ! [ ] ;
545
556
let mut awaiting_linked: Vec < DownlinkSender > = vec ! [ ] ;
546
557
let mut registered: Vec < DownlinkSender > = vec ! [ ] ;
547
558
548
- let mut empty_timestamp = Some ( Instant :: now ( ) + config. empty_timeout ) ;
549
- let result = loop {
550
- let next_item = if let Some ( timestamp) = empty_timestamp {
551
- if voted {
552
- consumer_stream. next ( ) . await . map ( Either :: Right )
553
- } else if let Ok ( result) = timeout_at ( timestamp, consumer_stream. next ( ) ) . await {
554
- result. map ( Either :: Right )
555
- } else {
559
+ let result: Result < ( ) , H :: Report > = loop {
560
+ let ( event, is_active) = match task_state. as_mut ( ) . as_pin_mut ( ) {
561
+ Some ( sleep) if !voted => (
562
+ tokio:: select! {
563
+ biased;
564
+ _ = sleep => {
565
+ task_state. set( None ) ;
566
+ ReadTaskEvent :: ConsumersTimedOut
567
+ } ,
568
+ maybe_consumer = consumer_stream. next( ) => {
569
+ if let Some ( ( consumer, options) ) = maybe_consumer {
570
+ ReadTaskEvent :: NewConsumer ( consumer, options)
571
+ } else {
572
+ ReadTaskEvent :: ConsumerChannelStopped
573
+ }
574
+ } ,
575
+ maybe_message = messages. next( ) => {
576
+ match maybe_message {
577
+ Some ( Ok ( msg) ) => ReadTaskEvent :: Message ( msg) ,
578
+ Some ( Err ( err) ) => ReadTaskEvent :: ReadFailed ( err) ,
579
+ _ => ReadTaskEvent :: MessagesStopped ,
580
+ }
581
+ }
582
+ } ,
583
+ false ,
584
+ ) ,
585
+ _ => {
586
+ let get_next = async {
587
+ tokio:: select! {
588
+ maybe_consumer = consumer_stream. next( ) => {
589
+ if let Some ( ( consumer, options) ) = maybe_consumer {
590
+ ReadTaskEvent :: NewConsumer ( consumer, options)
591
+ } else {
592
+ ReadTaskEvent :: ConsumerChannelStopped
593
+ }
594
+ } ,
595
+ maybe_message = messages. next( ) => {
596
+ match maybe_message {
597
+ Some ( Ok ( msg) ) => ReadTaskEvent :: Message ( msg) ,
598
+ Some ( Err ( err) ) => ReadTaskEvent :: ReadFailed ( err) ,
599
+ _ => ReadTaskEvent :: MessagesStopped ,
600
+ }
601
+ }
602
+ }
603
+ } ;
604
+ if flushed {
605
+ trace ! ( "Waiting without flush." ) ;
606
+ ( get_next. await , true )
607
+ } else {
608
+ trace ! ( "Waiting with flush." ) ;
609
+ let flush = join ( flush_all ( & mut awaiting_synced) , flush_all ( & mut registered) ) ;
610
+ let next_with_flush = immediate_or_join ( get_next, flush) ;
611
+ let ( next, flush_result) = next_with_flush. await ;
612
+ let is_active = if flush_result. is_some ( ) {
613
+ trace ! ( "Flush completed." ) ;
614
+ flushed = true ;
615
+ if registered. is_empty ( ) && awaiting_synced. is_empty ( ) {
616
+ trace ! ( "Number of subscribers dropped to 0." ) ;
617
+ task_state. set ( Some ( make_timeout ( ) ) ) ;
618
+ false
619
+ } else {
620
+ true
621
+ }
622
+ } else {
623
+ true
624
+ } ;
625
+ ( next, is_active)
626
+ }
627
+ }
628
+ } ;
629
+
630
+ match event {
631
+ ReadTaskEvent :: ConsumersTimedOut => {
556
632
info ! ( "No consumers connected within the timeout period. Voting to stop." ) ;
557
633
if stop_voter. vote ( ) == VoteResult :: Unanimous {
558
634
// No consumers registered within the timeout and the write task has voted to stop.
559
635
break Ok ( ( ) ) ;
560
636
} else {
561
637
voted = true ;
562
- consumer_stream. next ( ) . await . map ( Either :: Right )
563
- }
564
- }
565
- } else if flushed {
566
- trace ! ( "Waiting without flush." ) ;
567
- match select ( consumer_stream. next ( ) , messages. next ( ) ) . await {
568
- Either :: Left ( ( consumer, _) ) => consumer. map ( Either :: Right ) ,
569
- Either :: Right ( ( msg, _) ) => msg. map ( Either :: Left ) ,
570
- }
571
- } else {
572
- trace ! ( "Waiting with flush." ) ;
573
- let get_next = select ( consumer_stream. next ( ) , messages. next ( ) ) ;
574
- let flush = join ( flush_all ( & mut awaiting_synced) , flush_all ( & mut registered) ) ;
575
- let next_with_flush = immediate_or_join ( get_next, flush) ;
576
- let ( next, flush_result) = next_with_flush. await ;
577
- if flush_result. is_some ( ) {
578
- trace ! ( "Flush completed." ) ;
579
- if registered. is_empty ( ) && awaiting_synced. is_empty ( ) {
580
- trace ! ( "Number of subscribers dropped to 0." ) ;
581
- empty_timestamp = Some ( Instant :: now ( ) + config. empty_timeout ) ;
582
638
}
583
- flushed = true ;
584
- }
585
- match next {
586
- Either :: Left ( ( consumer, _) ) => consumer. map ( Either :: Right ) ,
587
- Either :: Right ( ( msg, _) ) => msg. map ( Either :: Left ) ,
588
639
}
589
- } ;
590
-
591
- match next_item {
592
- Some ( Either :: Left ( Ok ( ResponseMessage { envelope, .. } ) ) ) => match envelope {
640
+ ReadTaskEvent :: Message ( ResponseMessage { envelope, .. } ) => match envelope {
593
641
Notification :: Linked => {
594
642
trace ! ( "Entering Linked state." ) ;
595
- state = ReadTaskState :: Linked ;
596
- link ( & mut awaiting_linked, & mut awaiting_synced, & mut registered) . await ;
597
- if awaiting_synced. is_empty ( ) && registered. is_empty ( ) {
598
- empty_timestamp = Some ( Instant :: now ( ) + config. empty_timeout ) ;
643
+ dl_state = ReadTaskDlState :: Linked ;
644
+ if is_active {
645
+ link ( & mut awaiting_linked, & mut awaiting_synced, & mut registered) . await ;
646
+ if awaiting_synced. is_empty ( ) && registered. is_empty ( ) {
647
+ trace ! ( "Number of subscribers dropped to 0." ) ;
648
+ task_state. set ( Some ( make_timeout ( ) ) ) ;
649
+ }
599
650
}
600
651
}
601
652
Notification :: Synced => {
602
653
trace ! ( "Entering Synced state." ) ;
603
- state = ReadTaskState :: Synced ;
604
- if I :: SINGLE_FRAME_STATE {
605
- sync_current ( & mut awaiting_synced, & mut registered, & current) . await ;
606
- } else {
607
- sync_only ( & mut awaiting_synced, & mut registered) . await ;
608
- }
609
- if registered. is_empty ( ) {
610
- empty_timestamp = Some ( Instant :: now ( ) + config. empty_timeout ) ;
654
+ dl_state = ReadTaskDlState :: Synced ;
655
+ if is_active {
656
+ if I :: SINGLE_FRAME_STATE {
657
+ sync_current ( & mut awaiting_synced, & mut registered, & current) . await ;
658
+ } else {
659
+ sync_only ( & mut awaiting_synced, & mut registered) . await ;
660
+ }
661
+ if registered. is_empty ( ) {
662
+ trace ! ( "Number of subscribers dropped to 0." ) ;
663
+ task_state. set ( Some ( make_timeout ( ) ) ) ;
664
+ }
611
665
}
612
666
}
667
+ Notification :: Unlinked ( message) => {
668
+ trace ! ( "Stopping after unlinked: {msg:?}" , msg = message) ;
669
+ break Ok ( ( ) ) ;
670
+ }
613
671
Notification :: Event ( bytes) => {
614
- trace ! ( "Dispatching an event ." ) ;
672
+ trace ! ( "Updating the current value ." ) ;
615
673
current. clear ( ) ;
616
674
if let Err ( e) = interpretation. interpret_frame_data ( bytes, & mut current) {
617
675
if let BadFrameResponse :: Abort ( report) = failure_handler. failed_with ( e) {
618
676
break Err ( report) ;
619
677
}
620
678
}
621
- send_current ( & mut registered, & current) . await ;
622
- if !I :: SINGLE_FRAME_STATE {
623
- send_current ( & mut awaiting_synced, & current) . await ;
624
- }
625
- if registered. is_empty ( ) && awaiting_synced. is_empty ( ) {
626
- trace ! ( "Number of subscribers dropped to 0." ) ;
627
- empty_timestamp = Some ( Instant :: now ( ) + config. empty_timeout ) ;
628
- flushed = true ;
629
- } else {
630
- flushed = false ;
679
+ if is_active {
680
+ send_current ( & mut registered, & current) . await ;
681
+ if !I :: SINGLE_FRAME_STATE {
682
+ send_current ( & mut awaiting_synced, & current) . await ;
683
+ }
684
+ if registered. is_empty ( ) && awaiting_synced. is_empty ( ) {
685
+ trace ! ( "Number of subscribers dropped to 0." ) ;
686
+ task_state. set ( Some ( make_timeout ( ) ) ) ;
687
+ flushed = true ;
688
+ } else {
689
+ flushed = false ;
690
+ }
631
691
}
632
692
}
633
- Notification :: Unlinked ( message) => {
634
- trace ! ( "Stopping after unlinked: {msg:?}" , msg = message) ;
635
- break Ok ( ( ) ) ;
636
- }
637
693
} ,
638
- Some ( Either :: Right ( ( writer, options) ) ) => {
639
- if voted {
640
- if stop_voter. rescind ( ) == VoteResult :: Unanimous {
641
- info ! ( "Attempted to rescind stop vote but shutdown had already started." ) ;
642
- break Ok ( ( ) ) ;
643
- } else {
644
- voted = false ;
645
- }
646
- }
694
+ ReadTaskEvent :: ReadFailed ( err) => {
695
+ error ! (
696
+ "Failed to read a frame from the input: {error}" ,
697
+ error = err
698
+ ) ;
699
+ break Ok ( ( ) ) ;
700
+ }
701
+ ReadTaskEvent :: NewConsumer ( writer, options) => {
647
702
let mut dl_writer = DownlinkSender :: new ( writer, options) ;
648
- if matches ! ( state , ReadTaskState :: Init ) {
703
+ let added = if matches ! ( dl_state , ReadTaskDlState :: Init ) {
649
704
trace ! ( "Attaching a new subscriber to be linked." ) ;
650
- empty_timestamp = None ;
651
705
awaiting_linked. push ( dl_writer) ;
706
+ true
652
707
} else if dl_writer. send ( DownlinkNotification :: Linked ) . await . is_ok ( ) {
653
708
trace ! ( "Attaching a new subscriber to be synced." ) ;
654
- empty_timestamp = None ;
655
709
awaiting_synced. push ( dl_writer) ;
710
+ true
711
+ } else {
712
+ false
713
+ } ;
714
+ if added {
715
+ task_state. set ( None ) ;
716
+ if voted {
717
+ if stop_voter. rescind ( ) == VoteResult :: Unanimous {
718
+ info ! (
719
+ "Attempted to rescind stop vote but shutdown had already started."
720
+ ) ;
721
+ break Ok ( ( ) ) ;
722
+ } else {
723
+ voted = false ;
724
+ }
725
+ }
656
726
}
657
727
}
658
- Some ( Either :: Left ( Err ( err) ) ) => {
659
- error ! (
660
- "Failed to read a frame from the input: {error}" ,
661
- error = err
662
- ) ;
663
- break Ok ( ( ) ) ;
664
- }
665
728
_ => {
666
729
trace ! ( "Instructed to stop." ) ;
667
730
break Ok ( ( ) ) ;
668
731
}
669
732
}
670
733
} ;
734
+
671
735
trace ! ( "Read task stopping and unlinked all subscribers" ) ;
672
736
unlink ( awaiting_linked) . await ;
673
737
unlink ( awaiting_synced) . await ;
0 commit comments