Skip to content

Commit 643d55c

Browse files
authored
Merge pull request #624 from swimos/supply_lane_runtime
Fixes incorrect dispatching of event envelopes during downlink synchronisation
2 parents 160911b + cffc38f commit 643d55c

File tree

3 files changed

+231
-7
lines changed

3 files changed

+231
-7
lines changed

client/swimos_client/src/lib.rs

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#[cfg(not(feature = "deflate"))]
1616
use ratchet::NoExtProvider;
1717
use ratchet::WebSocketStream;
18+
use std::marker::PhantomData;
1819
use std::num::NonZeroUsize;
1920
use swimos_remote::ws::RatchetClient;
2021

@@ -28,12 +29,12 @@ pub use runtime::{CommandError, Commander, RemotePath};
2829
use std::sync::Arc;
2930
pub use swimos_api::downlink::DownlinkConfig;
3031
pub use swimos_downlink::lifecycle::{
31-
BasicMapDownlinkLifecycle, BasicValueDownlinkLifecycle, MapDownlinkLifecycle,
32-
ValueDownlinkLifecycle,
32+
BasicEventDownlinkLifecycle, BasicMapDownlinkLifecycle, BasicValueDownlinkLifecycle,
33+
EventDownlinkLifecycle, MapDownlinkLifecycle, ValueDownlinkLifecycle,
3334
};
3435
use swimos_downlink::{
35-
ChannelError, DownlinkTask, MapDownlinkHandle, MapDownlinkModel, MapKey, MapValue,
36-
NotYetSyncedError, ValueDownlinkModel, ValueDownlinkSet,
36+
ChannelError, DownlinkTask, EventDownlinkModel, MapDownlinkHandle, MapDownlinkModel, MapKey,
37+
MapValue, NotYetSyncedError, ValueDownlinkModel, ValueDownlinkSet,
3738
};
3839
use swimos_form::Form;
3940
use swimos_remote::net::dns::Resolver;
@@ -235,6 +236,24 @@ impl ClientHandle {
235236
}
236237
}
237238

239+
/// Returns an event downlink builder initialised with the default options.
240+
///
241+
/// # Arguments
242+
/// * `path` - The path of the downlink top open.
243+
pub fn event_downlink<T>(
244+
&self,
245+
path: RemotePath,
246+
) -> EventDownlinkBuilder<'_, BasicEventDownlinkLifecycle<T>> {
247+
EventDownlinkBuilder {
248+
handle: self,
249+
lifecycle: BasicEventDownlinkLifecycle::default(),
250+
path,
251+
options: DownlinkOptions::SYNC,
252+
runtime_config: Default::default(),
253+
downlink_config: Default::default(),
254+
}
255+
}
256+
238257
/// Returns a map downlink builder initialised with the default options.
239258
///
240259
/// # Arguments
@@ -497,3 +516,94 @@ impl<K, V> MapDownlinkView<K, V> {
497516
self.stop_rx.clone()
498517
}
499518
}
519+
520+
/// A builder for value downlinks.
521+
pub struct EventDownlinkBuilder<'h, L> {
522+
handle: &'h ClientHandle,
523+
lifecycle: L,
524+
path: RemotePath,
525+
options: DownlinkOptions,
526+
runtime_config: DownlinkRuntimeConfig,
527+
downlink_config: DownlinkConfig,
528+
}
529+
530+
impl<'h, L> EventDownlinkBuilder<'h, L> {
531+
/// Sets a new lifecycle that to be used.
532+
pub fn lifecycle<NL>(self, lifecycle: NL) -> EventDownlinkBuilder<'h, NL> {
533+
let EventDownlinkBuilder {
534+
handle,
535+
path,
536+
options,
537+
runtime_config,
538+
downlink_config,
539+
..
540+
} = self;
541+
EventDownlinkBuilder {
542+
handle,
543+
lifecycle,
544+
path,
545+
options,
546+
runtime_config,
547+
downlink_config,
548+
}
549+
}
550+
551+
/// Sets link options for the downlink.
552+
pub fn options(mut self, options: DownlinkOptions) -> Self {
553+
self.options = options;
554+
self
555+
}
556+
557+
/// Sets a new downlink runtime configuration.
558+
pub fn runtime_config(mut self, config: DownlinkRuntimeConfig) -> Self {
559+
self.runtime_config = config;
560+
self
561+
}
562+
563+
/// Sets a new downlink configuration.
564+
pub fn downlink_config(mut self, config: DownlinkConfig) -> Self {
565+
self.downlink_config = config;
566+
self
567+
}
568+
569+
/// Attempts to open the downlink.
570+
pub async fn open<T>(self) -> Result<EventDownlinkView<T>, Arc<DownlinkRuntimeError>>
571+
where
572+
L: EventDownlinkLifecycle<T> + Sync + 'static,
573+
T: Send + Sync + Form + Clone + 'static,
574+
T::Rec: Send,
575+
{
576+
let EventDownlinkBuilder {
577+
handle,
578+
lifecycle,
579+
path,
580+
options,
581+
runtime_config,
582+
downlink_config,
583+
} = self;
584+
let task = DownlinkTask::new(EventDownlinkModel::new(lifecycle));
585+
let stop_rx = handle
586+
.inner
587+
.run_downlink(path, runtime_config, downlink_config, options, task)
588+
.await?;
589+
590+
Ok(EventDownlinkView {
591+
_type: Default::default(),
592+
stop_rx,
593+
})
594+
}
595+
}
596+
597+
/// An event downlink handle which provides the functionality to await the downlink terminating.
598+
#[derive(Debug, Clone)]
599+
pub struct EventDownlinkView<T> {
600+
_type: PhantomData<T>,
601+
stop_rx: promise::Receiver<Result<(), DownlinkRuntimeError>>,
602+
}
603+
604+
impl<T> EventDownlinkView<T> {
605+
/// Returns a receiver that completes with the result of downlink's internal task.
606+
pub fn stop_notification(&self) -> promise::Receiver<Result<(), DownlinkRuntimeError>> {
607+
self.stop_rx.clone()
608+
}
609+
}

runtime/swimos_runtime/src/downlink/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,10 @@ where
555555
let mut awaiting_synced: Vec<DownlinkSender> = vec![];
556556
let mut awaiting_linked: Vec<DownlinkSender> = vec![];
557557
let mut registered: Vec<DownlinkSender> = vec![];
558+
// Track whether any events have been received while syncing the downlink. While this isn't the
559+
// nicest thing to have, it's required to distinguish between communicating with a stateless
560+
// lane and a downlink syncing with a lane that has a type which may be optional.
561+
let mut sync_event = false;
558562

559563
let result: Result<(), H::Report> = loop {
560564
let (event, is_active) = match task_state.as_mut().as_pin_mut() {
@@ -653,7 +657,17 @@ where
653657
trace!("Entering Synced state.");
654658
dl_state = ReadTaskDlState::Synced;
655659
if is_active {
656-
if I::SINGLE_FRAME_STATE {
660+
// `sync_event` will be false if we're communicating with a stateless lane
661+
// as no event envelope will have been sent. However, it's valid Recon for
662+
// an empty event envelope to be sent (consider Option::None) and this must
663+
// still be sent to the downlink task.
664+
//
665+
// If we're linked to a stateless lane, then `sync_current` cannot be used
666+
// as we will not have received an event envelope as it will dispatch one
667+
// with the empty buffer and this may cause the downlink task's decoder to
668+
// fail due to reading an extant read event. Therefore, delegate the operation to
669+
// `sync_only` which will not send an event notification.
670+
if I::SINGLE_FRAME_STATE && sync_event {
657671
sync_current(&mut awaiting_synced, &mut registered, &current).await;
658672
} else {
659673
sync_only(&mut awaiting_synced, &mut registered).await;
@@ -669,8 +683,11 @@ where
669683
break Ok(());
670684
}
671685
Notification::Event(bytes) => {
686+
sync_event = true;
687+
672688
trace!("Updating the current value.");
673689
current.clear();
690+
674691
if let Err(e) = interpretation.interpret_frame_data(bytes, &mut current) {
675692
if let BadFrameResponse::Abort(report) = failure_handler.failed_with(e) {
676693
break Err(report);

runtime/swimos_runtime/src/downlink/tests/value.rs

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ async fn shutdowm_after_attached() {
384384
}
385385

386386
#[tokio::test]
387-
async fn shutdowm_after_corrupted_frame() {
387+
async fn shutdown_after_corrupted_frame() {
388388
let (events, result) = run_test(DownlinkOptions::empty(), |context| async move {
389389
let TestContext {
390390
mut tx,
@@ -731,7 +731,7 @@ async fn exhaust_output_buffer() {
731731
}
732732

733733
#[tokio::test]
734-
async fn shutdowm_after_timeout_with_no_subscribers() {
734+
async fn shutdown_after_timeout_with_no_subscribers() {
735735
let ((_stop, events), result) = run_test_with_config(
736736
DownlinkOptions::empty(),
737737
DownlinkRuntimeConfig {
@@ -1079,3 +1079,100 @@ async fn receive_from_two_consumers() {
10791079
assert_eq!(first_events, vec![DownlinkNotification::Unlinked]);
10801080
assert_eq!(second_events, vec![DownlinkNotification::Unlinked]);
10811081
}
1082+
1083+
#[tokio::test]
1084+
async fn sync_no_event() {
1085+
let (events, result) = run_test(
1086+
DownlinkOptions::SYNC,
1087+
|TestContext {
1088+
mut tx,
1089+
mut rx,
1090+
start_client,
1091+
stop,
1092+
mut events,
1093+
..
1094+
}| async move {
1095+
expect_message(rx.recv().await, Operation::Link);
1096+
1097+
tx.link().await;
1098+
1099+
start_client.trigger();
1100+
1101+
expect_event(
1102+
events.next().await,
1103+
State::Unlinked,
1104+
DownlinkNotification::Linked,
1105+
);
1106+
expect_message(rx.recv().await, Operation::Sync);
1107+
1108+
tx.sync().await;
1109+
1110+
expect_event(
1111+
events.next().await,
1112+
State::Linked,
1113+
DownlinkNotification::Synced,
1114+
);
1115+
1116+
stop.trigger();
1117+
events.collect::<Vec<_>>().await
1118+
},
1119+
)
1120+
.await;
1121+
assert!(result.is_ok());
1122+
assert_eq!(
1123+
events,
1124+
vec![(State::Synced, DownlinkNotification::Unlinked)]
1125+
);
1126+
}
1127+
1128+
#[tokio::test]
1129+
async fn sync_empty_event() {
1130+
let (events, result) = run_test(
1131+
DownlinkOptions::SYNC,
1132+
|TestContext {
1133+
mut tx,
1134+
mut rx,
1135+
start_client,
1136+
stop,
1137+
mut events,
1138+
..
1139+
}| async move {
1140+
expect_message(rx.recv().await, Operation::Link);
1141+
1142+
tx.link().await;
1143+
1144+
start_client.trigger();
1145+
1146+
expect_event(
1147+
events.next().await,
1148+
State::Unlinked,
1149+
DownlinkNotification::Linked,
1150+
);
1151+
expect_message(rx.recv().await, Operation::Sync);
1152+
1153+
let content = Message::CurrentValue(Text::new(""));
1154+
tx.update(content.clone()).await;
1155+
tx.sync().await;
1156+
1157+
expect_event(
1158+
events.next().await,
1159+
State::Linked,
1160+
DownlinkNotification::Event { body: content },
1161+
);
1162+
expect_event(
1163+
events.next().await,
1164+
State::Linked,
1165+
DownlinkNotification::Synced,
1166+
);
1167+
1168+
stop.trigger();
1169+
events.collect::<Vec<_>>().await
1170+
},
1171+
)
1172+
.await;
1173+
assert!(result.is_ok());
1174+
assert_eq!(
1175+
events,
1176+
vec![(State::Synced, DownlinkNotification::Unlinked)]
1177+
);
1178+
}

0 commit comments

Comments
 (0)