Skip to content

Commit 6b8419f

Browse files
committed
Documented swimos_trigger.
Made promises more flexible.
1 parent 3e15627 commit 6b8419f

File tree

10 files changed

+39
-28
lines changed

10 files changed

+39
-28
lines changed

client/runtime/src/runtime.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use swimos_utilities::trigger::promise;
4848
pub type BoxedDownlink = Box<dyn Downlink + Send + Sync + 'static>;
4949
type ByteChannel = (ByteWriter, ByteReader);
5050
type CallbackResult =
51-
Result<promise::Receiver<Result<(), DownlinkRuntimeError>>, Arc<DownlinkRuntimeError>>;
51+
Result<promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>>, Arc<DownlinkRuntimeError>>;
5252
pub type DownlinkCallback = oneshot::Sender<CallbackResult>;
5353

5454
impl Debug for DownlinkRegistrationRequest {
@@ -172,7 +172,7 @@ enum RuntimeEvent {
172172
kind: DownlinkKind,
173173
address: RemotePath,
174174
result: Result<(), DownlinkRuntimeError>,
175-
tx: promise::Sender<Result<(), DownlinkRuntimeError>>,
175+
tx: promise::Sender<Result<(), Arc<DownlinkRuntimeError>>>,
176176
},
177177
/// A downlink runtime has completed.
178178
DownlinkRuntimeComplete {
@@ -662,7 +662,7 @@ async fn runtime_task<Net, Ws, Provider>(
662662
}
663663
}
664664

665-
let _r = tx.provide(result);
665+
let _r = tx.provide(result.map_err(Arc::new));
666666
}
667667
RuntimeEvent::Shutdown => {
668668
for peer in peers.values_mut() {

client/runtime/src/tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ struct ValueDownlinkContext {
387387
stopped: Arc<Notify>,
388388
handle_tx: mpsc::Sender<ValueDownlinkSet<i32>>,
389389
server: Server,
390-
promise: promise::Receiver<Result<(), DownlinkRuntimeError>>,
390+
promise: promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>>,
391391
stop_tx: trigger::Sender,
392392
}
393393

@@ -610,14 +610,14 @@ struct TrackingValueContext {
610610
spawned: Arc<Notify>,
611611
stopped: Arc<Notify>,
612612
handle_tx: mpsc::Sender<ValueDownlinkSet<i32>>,
613-
promise: promise::Receiver<Result<(), DownlinkRuntimeError>>,
613+
promise: promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>>,
614614
}
615615

616616
struct TrackingMapContext {
617617
spawned: Arc<Notify>,
618618
stopped: Arc<Notify>,
619619
tx: MapDownlinkHandle<i32, i32>,
620-
promise: promise::Receiver<Result<(), DownlinkRuntimeError>>,
620+
promise: promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>>,
621621
}
622622

623623
#[tokio::test]

client/swimos_client/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ impl From<NotYetSyncedError> for ValueDownlinkOperationError {
379379
#[derive(Debug, Clone)]
380380
pub struct ValueDownlinkView<T> {
381381
tx: mpsc::Sender<ValueDownlinkSet<T>>,
382-
stop_rx: promise::Receiver<Result<(), DownlinkRuntimeError>>,
382+
stop_rx: promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>>,
383383
}
384384

385385
impl<T> ValueDownlinkView<T> {
@@ -390,7 +390,7 @@ impl<T> ValueDownlinkView<T> {
390390
}
391391

392392
/// Returns a receiver that completes with the result of downlink's internal task.
393-
pub fn stop_notification(&self) -> promise::Receiver<Result<(), DownlinkRuntimeError>> {
393+
pub fn stop_notification(&self) -> promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>> {
394394
self.stop_rx.clone()
395395
}
396396
}
@@ -482,7 +482,7 @@ impl<'h, L> MapDownlinkBuilder<'h, L> {
482482
#[derive(Debug, Clone)]
483483
pub struct MapDownlinkView<K, V> {
484484
inner: MapDownlinkHandle<K, V>,
485-
stop_rx: promise::Receiver<Result<(), DownlinkRuntimeError>>,
485+
stop_rx: promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>>,
486486
}
487487

488488
impl<K, V> MapDownlinkView<K, V> {
@@ -512,7 +512,7 @@ impl<K, V> MapDownlinkView<K, V> {
512512
}
513513

514514
/// Returns a receiver that completes with the result of downlink's internal task.
515-
pub fn stop_notification(&self) -> promise::Receiver<Result<(), DownlinkRuntimeError>> {
515+
pub fn stop_notification(&self) -> promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>> {
516516
self.stop_rx.clone()
517517
}
518518
}
@@ -598,12 +598,12 @@ impl<'h, L> EventDownlinkBuilder<'h, L> {
598598
#[derive(Debug, Clone)]
599599
pub struct EventDownlinkView<T> {
600600
_type: PhantomData<T>,
601-
stop_rx: promise::Receiver<Result<(), DownlinkRuntimeError>>,
601+
stop_rx: promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>>,
602602
}
603603

604604
impl<T> EventDownlinkView<T> {
605605
/// Returns a receiver that completes with the result of downlink's internal task.
606-
pub fn stop_notification(&self) -> promise::Receiver<Result<(), DownlinkRuntimeError>> {
606+
pub fn stop_notification(&self) -> promise::Receiver<Result<(), Arc<DownlinkRuntimeError>>> {
607607
self.stop_rx.clone()
608608
}
609609
}

runtime/swimos_runtime/src/agent/task/remotes/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,5 +239,5 @@ async fn remove_remote() {
239239
.expect("Timed out.")
240240
.expect("Reason promised dropped.");
241241

242-
assert_eq!(*result, DisconnectionReason::RemoteTimedOut);
242+
assert_eq!(result, DisconnectionReason::RemoteTimedOut);
243243
}

runtime/swimos_runtime/src/agent/task/tests/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -600,10 +600,7 @@ impl RemoteReceiver {
600600
if !lanes.is_empty() {
601601
panic!("Some lanes were not unlinked: {:?}", lanes);
602602
}
603-
let reason = completion_rx
604-
.await
605-
.map(|arc| *arc)
606-
.unwrap_or(DisconnectionReason::Failed);
603+
let reason = completion_rx.await.unwrap_or(DisconnectionReason::Failed);
607604

608605
assert_eq!(
609606
reason,

server/swimos_server_app/src/server/runtime/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,7 +1037,7 @@ async fn attach_agent(
10371037
Ok(true) => {
10381038
if provider.send(Ok((in_tx, out_rx))).is_ok() {
10391039
if let Ok(reason) = disconnect_rx.await {
1040-
*reason
1040+
reason
10411041
} else {
10421042
DisconnectionReason::Failed
10431043
}
@@ -1077,7 +1077,7 @@ async fn attach_link_remote(
10771077
info!("Downlink request dropped before satisfied.");
10781078
DisconnectionReason::Failed
10791079
} else if let Ok(reason) = disconnect_rx.await {
1080-
*reason
1080+
reason
10811081
} else {
10821082
DisconnectionReason::Failed
10831083
}

swimos_utilities/swimos_trigger/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
//! # Asynchronous Trigger
16+
//!
17+
//! This crate provides a [SPMC barrier](trigger::trigger) where any number of parties can wait for
18+
//! a single other party to complete an action.
19+
//!
20+
//! Additionally, a [promise](promise::promise) implementation is provided, implemented in terms of
21+
//! the trigger, that allows any number of parties to wait for a single producer to generate a value.
22+
1523
pub mod promise;
1624
mod trigger;
1725

swimos_utilities/swimos_trigger/src/promise/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::sync::Arc;
2626
mod tests;
2727

2828
#[derive(Debug)]
29-
struct PromiseInner<T>(Arc<UnsafeCell<Option<Arc<T>>>>);
29+
struct PromiseInner<T>(Arc<UnsafeCell<Option<T>>>);
3030

3131
unsafe impl<T: Send> Send for PromiseInner<T> {}
3232
unsafe impl<T: Sync> Sync for PromiseInner<T> {}
@@ -55,7 +55,7 @@ impl<T> Clone for Receiver<T> {
5555
}
5656

5757
/// A promise allows a value to be provided, exactly once, at some time in the future.
58-
pub fn promise<T: Send + Sync>() -> (Sender<T>, Receiver<T>) {
58+
pub fn promise<T: Send + Sync + Clone>() -> (Sender<T>, Receiver<T>) {
5959
let data = Arc::new(UnsafeCell::new(None));
6060
let (tx, rx) = crate::trigger();
6161
(
@@ -79,10 +79,10 @@ impl<T: Send + Sync> Sender<T> {
7979
data: PromiseInner(data),
8080
} = self;
8181
unsafe {
82-
*data.get() = Some(Arc::new(value));
82+
*data.get() = Some(value);
8383
if trigger.trigger() {
8484
Ok(())
85-
} else if let Ok(value) = Arc::try_unwrap((*data.get()).take().unwrap()) {
85+
} else if let Some(value) = (*data.get()).take() {
8686
Err(value)
8787
} else {
8888
unreachable!()
@@ -91,6 +91,7 @@ impl<T: Send + Sync> Sender<T> {
9191
}
9292
}
9393

94+
/// An error generated if the [`Sender`] was dropped before it provided a value.
9495
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9596
pub struct PromiseError;
9697

@@ -101,15 +102,16 @@ impl Display for PromiseError {
101102
}
102103

103104
impl<T> Receiver<T> {
105+
/// Determine if two [`Receiver`]s are connected to the same [`Sender`].
104106
pub fn same_promise(left: &Self, right: &Self) -> bool {
105107
crate::Receiver::same_receiver(&left.trigger, &right.trigger)
106108
}
107109
}
108110

109111
impl Error for PromiseError {}
110112

111-
impl<T: Send + Sync> Future for Receiver<T> {
112-
type Output = Result<Arc<T>, PromiseError>;
113+
impl<T: Send + Sync + Clone> Future for Receiver<T> {
114+
type Output = Result<T, PromiseError>;
113115

114116
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115117
if ready!(self.trigger.poll_unpin(cx)).is_ok() {

swimos_utilities/swimos_trigger/src/promise/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async fn await_promise() {
3232

3333
let receive_task = async move {
3434
let result = rx.await;
35-
assert!(matches!(result, Ok(v) if *v == 4));
35+
assert!(matches!(result, Ok(v) if v == 4));
3636
};
3737

3838
join(send_task, receive_task).await;
@@ -49,7 +49,7 @@ async fn await_promise_threaded() {
4949

5050
let receive_task = async move {
5151
let result = rx.await;
52-
assert!(matches!(result, Ok(v) if *v == 4));
52+
assert!(matches!(result, Ok(v) if v == 4));
5353
};
5454

5555
let (r1, r2) = join(tokio::spawn(send_task), tokio::spawn(receive_task)).await;

swimos_utilities/swimos_trigger/src/trigger/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,23 @@ struct TriggerInner {
3434
waiters: Mutex<Slab<Waker>>,
3535
}
3636

37+
/// The sending party of a trigger.
3738
#[derive(Debug)]
3839
pub struct Sender {
3940
inner: Option<Weak<TriggerInner>>,
4041
}
4142

4243
impl Error for TriggerError {}
4344

45+
/// A receiving party of a trigger.
4446
#[derive(Clone, Debug)]
4547
pub struct Receiver {
4648
inner: Arc<TriggerInner>,
4749
slot: Option<usize>,
4850
}
4951

5052
impl Receiver {
51-
/// Determine if two triggers are the same.
53+
/// Determine if two [`Receiver`]s are connected to the same [`Sender`].
5254
pub fn same_receiver(this: &Self, other: &Self) -> bool {
5355
Arc::ptr_eq(&this.inner, &other.inner)
5456
}
@@ -99,6 +101,7 @@ impl Drop for Sender {
99101
}
100102
}
101103

104+
/// An error generated if the [`Sender`] was dropped before it was triggered.
102105
#[derive(Debug, PartialEq, Eq)]
103106
pub struct TriggerError;
104107

@@ -139,6 +142,7 @@ impl Future for Receiver {
139142
}
140143

141144
impl Receiver {
145+
/// Check if the [`Sender`] has been dropped (regardless of whether it has been triggered).
142146
pub fn check_state(&self) -> Option<Result<(), TriggerError>> {
143147
let Receiver { inner, .. } = self;
144148
let flag = inner.flag.load(Ordering::Acquire);

0 commit comments

Comments
 (0)