File tree Expand file tree Collapse file tree 2 files changed +53
-2
lines changed
runtime/swimos_runtime/src/downlink Expand file tree Collapse file tree 2 files changed +53
-2
lines changed Original file line number Diff line number Diff line change @@ -653,7 +653,13 @@ where
653
653
trace ! ( "Entering Synced state." ) ;
654
654
dl_state = ReadTaskDlState :: Synced ;
655
655
if is_active {
656
- if I :: SINGLE_FRAME_STATE {
656
+ // Current will be empty if we're communicating with a Supply Lane as no
657
+ // event envelope will have been sent. Therefore, we can't use
658
+ // `sync_current` as it will send an event notification with an empty body
659
+ // and cause the downlink's runtime task envelope decoder will potentially
660
+ // fail due to reading an extant. Therefore, delegate the operation to
661
+ // `sync_only` which will not send an event notification.
662
+ if I :: SINGLE_FRAME_STATE && !current. is_empty ( ) {
657
663
sync_current ( & mut awaiting_synced, & mut registered, & current) . await ;
658
664
} else {
659
665
sync_only ( & mut awaiting_synced, & mut registered) . await ;
Original file line number Diff line number Diff line change @@ -731,7 +731,7 @@ async fn exhaust_output_buffer() {
731
731
}
732
732
733
733
#[ tokio:: test]
734
- async fn shutdowm_after_timeout_with_no_subscribers ( ) {
734
+ async fn shutdown_after_timeout_with_no_subscribers ( ) {
735
735
let ( ( _stop, events) , result) = run_test_with_config (
736
736
DownlinkOptions :: empty ( ) ,
737
737
DownlinkRuntimeConfig {
@@ -1079,3 +1079,48 @@ async fn receive_from_two_consumers() {
1079
1079
assert_eq ! ( first_events, vec![ DownlinkNotification :: Unlinked ] ) ;
1080
1080
assert_eq ! ( second_events, vec![ DownlinkNotification :: Unlinked ] ) ;
1081
1081
}
1082
+
1083
+ #[ tokio:: test]
1084
+ async fn sync_no_event ( ) {
1085
+ let ( events, result) = run_test (
1086
+ DownlinkOptions :: SYNC ,
1087
+ |TestContext {
1088
+ mut tx,
1089
+ mut rx,
1090
+ start_client,
1091
+ stop,
1092
+ mut events,
1093
+ ..
1094
+ } | async move {
1095
+ expect_message ( rx. recv ( ) . await , Operation :: Link ) ;
1096
+
1097
+ tx. link ( ) . await ;
1098
+
1099
+ start_client. trigger ( ) ;
1100
+
1101
+ expect_event (
1102
+ events. next ( ) . await ,
1103
+ State :: Unlinked ,
1104
+ DownlinkNotification :: Linked ,
1105
+ ) ;
1106
+ expect_message ( rx. recv ( ) . await , Operation :: Sync ) ;
1107
+
1108
+ tx. sync ( ) . await ;
1109
+
1110
+ expect_event (
1111
+ events. next ( ) . await ,
1112
+ State :: Linked ,
1113
+ DownlinkNotification :: Synced ,
1114
+ ) ;
1115
+
1116
+ stop. trigger ( ) ;
1117
+ events. collect :: < Vec < _ > > ( ) . await
1118
+ } ,
1119
+ )
1120
+ . await ;
1121
+ assert ! ( result. is_ok( ) ) ;
1122
+ assert_eq ! (
1123
+ events,
1124
+ vec![ ( State :: Synced , DownlinkNotification :: Unlinked ) ]
1125
+ ) ;
1126
+ }
You can’t perform that action at this time.
0 commit comments