Skip to content

Commit cffc38f

Browse files
committed
Reworks fix to add support for lanes that have an optional type
1 parent d44bd98 commit cffc38f

File tree

2 files changed

+70
-7
lines changed

2 files changed

+70
-7
lines changed

runtime/swimos_runtime/src/downlink/mod.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,10 @@ where
555555
let mut awaiting_synced: Vec<DownlinkSender> = vec![];
556556
let mut awaiting_linked: Vec<DownlinkSender> = vec![];
557557
let mut registered: Vec<DownlinkSender> = vec![];
558+
// Track whether any events have been received while syncing the downlink. While this isn't the
559+
// nicest thing to have, it's required to distinguish between communicating with a stateless
560+
// lane and a downlink syncing with a lane that has a type which may be optional.
561+
let mut sync_event = false;
558562

559563
let result: Result<(), H::Report> = loop {
560564
let (event, is_active) = match task_state.as_mut().as_pin_mut() {
@@ -653,13 +657,17 @@ where
653657
trace!("Entering Synced state.");
654658
dl_state = ReadTaskDlState::Synced;
655659
if is_active {
656-
// `current` will be empty if we're communicating with a stateless lane as
657-
// no event envelope will have been sent. Therefore, we can't use
658-
// `sync_current` as it will send an event notification with an empty body
659-
// and cause the downlink's runtime task envelope decoder will potentially
660-
// fail due to reading an extant. Therefore, delegate the operation to
660+
// `sync_event` will be false if we're communicating with a stateless lane
661+
// as no event envelope will have been sent. However, it's valid Recon for
662+
// an empty event envelope to be sent (consider Option::None) and this must
663+
// still be sent to the downlink task.
664+
//
665+
// If we're linked to a stateless lane, then `sync_current` cannot be used
666+
// as we will not have received an event envelope as it will dispatch one
667+
// with the empty buffer and this may cause the downlink task's decoder to
668+
// fail due to reading an extant read event. Therefore, delegate the operation to
661669
// `sync_only` which will not send an event notification.
662-
if I::SINGLE_FRAME_STATE && !current.is_empty() {
670+
if I::SINGLE_FRAME_STATE && sync_event {
663671
sync_current(&mut awaiting_synced, &mut registered, &current).await;
664672
} else {
665673
sync_only(&mut awaiting_synced, &mut registered).await;
@@ -675,8 +683,11 @@ where
675683
break Ok(());
676684
}
677685
Notification::Event(bytes) => {
686+
sync_event = true;
687+
678688
trace!("Updating the current value.");
679689
current.clear();
690+
680691
if let Err(e) = interpretation.interpret_frame_data(bytes, &mut current) {
681692
if let BadFrameResponse::Abort(report) = failure_handler.failed_with(e) {
682693
break Err(report);

runtime/swimos_runtime/src/downlink/tests/value.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ async fn shutdowm_after_attached() {
384384
}
385385

386386
#[tokio::test]
387-
async fn shutdowm_after_corrupted_frame() {
387+
async fn shutdown_after_corrupted_frame() {
388388
let (events, result) = run_test(DownlinkOptions::empty(), |context| async move {
389389
let TestContext {
390390
mut tx,
@@ -1124,3 +1124,55 @@ async fn sync_no_event() {
11241124
vec![(State::Synced, DownlinkNotification::Unlinked)]
11251125
);
11261126
}
1127+
1128+
#[tokio::test]
1129+
async fn sync_empty_event() {
1130+
let (events, result) = run_test(
1131+
DownlinkOptions::SYNC,
1132+
|TestContext {
1133+
mut tx,
1134+
mut rx,
1135+
start_client,
1136+
stop,
1137+
mut events,
1138+
..
1139+
}| async move {
1140+
expect_message(rx.recv().await, Operation::Link);
1141+
1142+
tx.link().await;
1143+
1144+
start_client.trigger();
1145+
1146+
expect_event(
1147+
events.next().await,
1148+
State::Unlinked,
1149+
DownlinkNotification::Linked,
1150+
);
1151+
expect_message(rx.recv().await, Operation::Sync);
1152+
1153+
let content = Message::CurrentValue(Text::new(""));
1154+
tx.update(content.clone()).await;
1155+
tx.sync().await;
1156+
1157+
expect_event(
1158+
events.next().await,
1159+
State::Linked,
1160+
DownlinkNotification::Event { body: content },
1161+
);
1162+
expect_event(
1163+
events.next().await,
1164+
State::Linked,
1165+
DownlinkNotification::Synced,
1166+
);
1167+
1168+
stop.trigger();
1169+
events.collect::<Vec<_>>().await
1170+
},
1171+
)
1172+
.await;
1173+
assert!(result.is_ok());
1174+
assert_eq!(
1175+
events,
1176+
vec![(State::Synced, DownlinkNotification::Unlinked)]
1177+
);
1178+
}

0 commit comments

Comments
 (0)