Skip to content

Commit 6febc76

Browse files
committed
Adds client event downlink support
1 parent 72bec4b commit 6febc76

File tree

1 file changed

+114
-4
lines changed
  • client/swimos_client/src

1 file changed

+114
-4
lines changed

client/swimos_client/src/lib.rs

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#[cfg(not(feature = "deflate"))]
1616
use ratchet::NoExtProvider;
1717
use ratchet::WebSocketStream;
18+
use std::marker::PhantomData;
1819
use std::num::NonZeroUsize;
1920
use swimos_remote::ws::RatchetClient;
2021

@@ -28,12 +29,12 @@ pub use runtime::{CommandError, Commander, RemotePath};
2829
use std::sync::Arc;
2930
pub use swimos_api::downlink::DownlinkConfig;
3031
pub use swimos_downlink::lifecycle::{
31-
BasicMapDownlinkLifecycle, BasicValueDownlinkLifecycle, MapDownlinkLifecycle,
32-
ValueDownlinkLifecycle,
32+
BasicEventDownlinkLifecycle, BasicMapDownlinkLifecycle, BasicValueDownlinkLifecycle,
33+
EventDownlinkLifecycle, MapDownlinkLifecycle, ValueDownlinkLifecycle,
3334
};
3435
use swimos_downlink::{
35-
ChannelError, DownlinkTask, MapDownlinkHandle, MapDownlinkModel, MapKey, MapValue,
36-
NotYetSyncedError, ValueDownlinkModel, ValueDownlinkSet,
36+
ChannelError, DownlinkTask, EventDownlinkModel, MapDownlinkHandle, MapDownlinkModel, MapKey,
37+
MapValue, NotYetSyncedError, ValueDownlinkModel, ValueDownlinkSet,
3738
};
3839
use swimos_form::Form;
3940
use swimos_remote::net::dns::Resolver;
@@ -235,6 +236,24 @@ impl ClientHandle {
235236
}
236237
}
237238

239+
/// Returns an event downlink builder initialised with the default options.
240+
///
241+
/// # Arguments
242+
/// * `path` - The path of the downlink top open.
243+
pub fn event_downlink<T>(
244+
&self,
245+
path: RemotePath,
246+
) -> EventDownlinkBuilder<'_, BasicEventDownlinkLifecycle<T>> {
247+
EventDownlinkBuilder {
248+
handle: self,
249+
lifecycle: BasicEventDownlinkLifecycle::default(),
250+
path,
251+
options: DownlinkOptions::SYNC,
252+
runtime_config: Default::default(),
253+
downlink_config: Default::default(),
254+
}
255+
}
256+
238257
/// Returns a map downlink builder initialised with the default options.
239258
///
240259
/// # Arguments
@@ -497,3 +516,94 @@ impl<K, V> MapDownlinkView<K, V> {
497516
self.stop_rx.clone()
498517
}
499518
}
519+
520+
/// A builder for value downlinks.
521+
pub struct EventDownlinkBuilder<'h, L> {
522+
handle: &'h ClientHandle,
523+
lifecycle: L,
524+
path: RemotePath,
525+
options: DownlinkOptions,
526+
runtime_config: DownlinkRuntimeConfig,
527+
downlink_config: DownlinkConfig,
528+
}
529+
530+
impl<'h, L> EventDownlinkBuilder<'h, L> {
531+
/// Sets a new lifecycle that to be used.
532+
pub fn lifecycle<NL>(self, lifecycle: NL) -> EventDownlinkBuilder<'h, NL> {
533+
let EventDownlinkBuilder {
534+
handle,
535+
path,
536+
options,
537+
runtime_config,
538+
downlink_config,
539+
..
540+
} = self;
541+
EventDownlinkBuilder {
542+
handle,
543+
lifecycle,
544+
path,
545+
options,
546+
runtime_config,
547+
downlink_config,
548+
}
549+
}
550+
551+
/// Sets link options for the downlink.
552+
pub fn options(mut self, options: DownlinkOptions) -> Self {
553+
self.options = options;
554+
self
555+
}
556+
557+
/// Sets a new downlink runtime configuration.
558+
pub fn runtime_config(mut self, config: DownlinkRuntimeConfig) -> Self {
559+
self.runtime_config = config;
560+
self
561+
}
562+
563+
/// Sets a new downlink configuration.
564+
pub fn downlink_config(mut self, config: DownlinkConfig) -> Self {
565+
self.downlink_config = config;
566+
self
567+
}
568+
569+
/// Attempts to open the downlink.
570+
pub async fn open<T>(self) -> Result<EventDownlinkView<T>, Arc<DownlinkRuntimeError>>
571+
where
572+
L: EventDownlinkLifecycle<T> + Sync + 'static,
573+
T: Send + Sync + Form + Clone + 'static,
574+
T::Rec: Send,
575+
{
576+
let EventDownlinkBuilder {
577+
handle,
578+
lifecycle,
579+
path,
580+
options,
581+
runtime_config,
582+
downlink_config,
583+
} = self;
584+
let task = DownlinkTask::new(EventDownlinkModel::new(lifecycle));
585+
let stop_rx = handle
586+
.inner
587+
.run_downlink(path, runtime_config, downlink_config, options, task)
588+
.await?;
589+
590+
Ok(EventDownlinkView {
591+
_type: Default::default(),
592+
stop_rx,
593+
})
594+
}
595+
}
596+
597+
/// An event downlink handle which provides the functionality to await the downlink terminating.
598+
#[derive(Debug, Clone)]
599+
pub struct EventDownlinkView<T> {
600+
_type: PhantomData<T>,
601+
stop_rx: promise::Receiver<Result<(), DownlinkRuntimeError>>,
602+
}
603+
604+
impl<T> EventDownlinkView<T> {
605+
/// Returns a receiver that completes with the result of downlink's internal task.
606+
pub fn stop_notification(&self) -> promise::Receiver<Result<(), DownlinkRuntimeError>> {
607+
self.stop_rx.clone()
608+
}
609+
}

0 commit comments

Comments
 (0)