Skip to content

Commit 8593a37

Browse files
authored
Merge branch 'main' into client_event_downlinks
2 parents bf6db76 + 643d55c commit 8593a37

File tree

2 files changed

+117
-3
lines changed

2 files changed

+117
-3
lines changed

runtime/swimos_runtime/src/downlink/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,10 @@ where
555555
let mut awaiting_synced: Vec<DownlinkSender> = vec![];
556556
let mut awaiting_linked: Vec<DownlinkSender> = vec![];
557557
let mut registered: Vec<DownlinkSender> = vec![];
558+
// Track whether any events have been received while syncing the downlink. While this isn't the
559+
// nicest thing to have, it's required to distinguish between communicating with a stateless
560+
// lane and a downlink syncing with a lane that has a type which may be optional.
561+
let mut sync_event = false;
558562

559563
let result: Result<(), H::Report> = loop {
560564
let (event, is_active) = match task_state.as_mut().as_pin_mut() {
@@ -653,7 +657,17 @@ where
653657
trace!("Entering Synced state.");
654658
dl_state = ReadTaskDlState::Synced;
655659
if is_active {
656-
if I::SINGLE_FRAME_STATE {
660+
// `sync_event` will be false if we're communicating with a stateless lane
661+
// as no event envelope will have been sent. However, it's valid Recon for
662+
// an empty event envelope to be sent (consider Option::None) and this must
663+
// still be sent to the downlink task.
664+
//
665+
// If we're linked to a stateless lane, then `sync_current` cannot be used
666+
// as we will not have received an event envelope as it will dispatch one
667+
// with the empty buffer and this may cause the downlink task's decoder to
668+
// fail due to reading an extant read event. Therefore, delegate the operation to
669+
// `sync_only` which will not send an event notification.
670+
if I::SINGLE_FRAME_STATE && sync_event {
657671
sync_current(&mut awaiting_synced, &mut registered, &current).await;
658672
} else {
659673
sync_only(&mut awaiting_synced, &mut registered).await;
@@ -669,8 +683,11 @@ where
669683
break Ok(());
670684
}
671685
Notification::Event(bytes) => {
686+
sync_event = true;
687+
672688
trace!("Updating the current value.");
673689
current.clear();
690+
674691
if let Err(e) = interpretation.interpret_frame_data(bytes, &mut current) {
675692
if let BadFrameResponse::Abort(report) = failure_handler.failed_with(e) {
676693
break Err(report);

runtime/swimos_runtime/src/downlink/tests/value.rs

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ async fn shutdowm_after_attached() {
384384
}
385385

386386
#[tokio::test]
387-
async fn shutdowm_after_corrupted_frame() {
387+
async fn shutdown_after_corrupted_frame() {
388388
let (events, result) = run_test(DownlinkOptions::empty(), |context| async move {
389389
let TestContext {
390390
mut tx,
@@ -731,7 +731,7 @@ async fn exhaust_output_buffer() {
731731
}
732732

733733
#[tokio::test]
734-
async fn shutdowm_after_timeout_with_no_subscribers() {
734+
async fn shutdown_after_timeout_with_no_subscribers() {
735735
let ((_stop, events), result) = run_test_with_config(
736736
DownlinkOptions::empty(),
737737
DownlinkRuntimeConfig {
@@ -1079,3 +1079,100 @@ async fn receive_from_two_consumers() {
10791079
assert_eq!(first_events, vec![DownlinkNotification::Unlinked]);
10801080
assert_eq!(second_events, vec![DownlinkNotification::Unlinked]);
10811081
}
1082+
1083+
#[tokio::test]
1084+
async fn sync_no_event() {
1085+
let (events, result) = run_test(
1086+
DownlinkOptions::SYNC,
1087+
|TestContext {
1088+
mut tx,
1089+
mut rx,
1090+
start_client,
1091+
stop,
1092+
mut events,
1093+
..
1094+
}| async move {
1095+
expect_message(rx.recv().await, Operation::Link);
1096+
1097+
tx.link().await;
1098+
1099+
start_client.trigger();
1100+
1101+
expect_event(
1102+
events.next().await,
1103+
State::Unlinked,
1104+
DownlinkNotification::Linked,
1105+
);
1106+
expect_message(rx.recv().await, Operation::Sync);
1107+
1108+
tx.sync().await;
1109+
1110+
expect_event(
1111+
events.next().await,
1112+
State::Linked,
1113+
DownlinkNotification::Synced,
1114+
);
1115+
1116+
stop.trigger();
1117+
events.collect::<Vec<_>>().await
1118+
},
1119+
)
1120+
.await;
1121+
assert!(result.is_ok());
1122+
assert_eq!(
1123+
events,
1124+
vec![(State::Synced, DownlinkNotification::Unlinked)]
1125+
);
1126+
}
1127+
1128+
#[tokio::test]
1129+
async fn sync_empty_event() {
1130+
let (events, result) = run_test(
1131+
DownlinkOptions::SYNC,
1132+
|TestContext {
1133+
mut tx,
1134+
mut rx,
1135+
start_client,
1136+
stop,
1137+
mut events,
1138+
..
1139+
}| async move {
1140+
expect_message(rx.recv().await, Operation::Link);
1141+
1142+
tx.link().await;
1143+
1144+
start_client.trigger();
1145+
1146+
expect_event(
1147+
events.next().await,
1148+
State::Unlinked,
1149+
DownlinkNotification::Linked,
1150+
);
1151+
expect_message(rx.recv().await, Operation::Sync);
1152+
1153+
let content = Message::CurrentValue(Text::new(""));
1154+
tx.update(content.clone()).await;
1155+
tx.sync().await;
1156+
1157+
expect_event(
1158+
events.next().await,
1159+
State::Linked,
1160+
DownlinkNotification::Event { body: content },
1161+
);
1162+
expect_event(
1163+
events.next().await,
1164+
State::Linked,
1165+
DownlinkNotification::Synced,
1166+
);
1167+
1168+
stop.trigger();
1169+
events.collect::<Vec<_>>().await
1170+
},
1171+
)
1172+
.await;
1173+
assert!(result.is_ok());
1174+
assert_eq!(
1175+
events,
1176+
vec![(State::Synced, DownlinkNotification::Unlinked)]
1177+
);
1178+
}

0 commit comments

Comments
 (0)