Skip to content

Commit 7c61b96

Browse files
authored
Merge pull request #652 from swimos/downlink_stop
Adds stop functionality for downlinks in Join Map and Join Value lanes
2 parents 079f398 + 6a91a7b commit 7c61b96

File tree

13 files changed

+434
-64
lines changed

13 files changed

+434
-64
lines changed

server/swimos_agent/src/agent_lifecycle/utility/mod.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ use std::{collections::HashMap, marker::PhantomData};
2020

2121
use futures::stream::unfold;
2222
use futures::{Future, FutureExt, Stream, StreamExt};
23+
use tokio::time::Instant;
24+
2325
use swimos_api::address::Address;
2426
use swimos_form::read::RecognizerReadable;
2527
use swimos_form::write::StructuralWritable;
2628
use swimos_form::Form;
2729
use swimos_utilities::routing::RouteUri;
28-
use tokio::time::Instant;
2930

3031
use crate::agent_model::downlink::hosted::{
3132
EventDownlinkHandle, MapDownlinkHandle, ValueDownlinkHandle,
@@ -43,7 +44,8 @@ use crate::event_handler::{
4344
};
4445
use crate::event_handler::{GetAgentUri, HandlerAction, SideEffect};
4546
use crate::item::{
46-
MapLikeItem, MutableMapLikeItem, MutableValueLikeItem, TransformableMapLikeItem, ValueLikeItem,
47+
JoinLikeItem, MapLikeItem, MutableMapLikeItem, MutableValueLikeItem, TransformableMapLikeItem,
48+
ValueLikeItem,
4749
};
4850
use crate::lanes::command::{CommandLane, DoCommand};
4951
use crate::lanes::demand::{Cue, DemandLane};
@@ -649,7 +651,7 @@ impl<Agent: 'static> HandlerContext<Agent> {
649651
///
650652
/// # Arguments
651653
/// * `lane` - Projection to the lane.
652-
/// * `key - The key for the downlink.
654+
/// * `key` - The key for the downlink.
653655
/// * `host` - The remote host at which the agent resides (a local agent if not specified).
654656
/// * `node` - The node URI of the agent.
655657
/// * `lane_uri` - The lane to downlink from.
@@ -670,12 +672,30 @@ impl<Agent: 'static> HandlerContext<Agent> {
670672
JoinValueAddDownlink::new(lane, key, address)
671673
}
672674

675+
/// Removes a downlink from a Join lane. Removing any associated values the downlink holds in
676+
/// the underlying map.
677+
///
678+
/// # Arguments
679+
/// * `lane` - Projection to the lane.
680+
/// * `link_key` - The link key for the downlink.
681+
pub fn remove_downlink<L, K>(
682+
&self,
683+
lane: fn(&Agent) -> &L,
684+
link_key: K,
685+
) -> impl HandlerAction<Agent, Completion = ()> + Send + 'static
686+
where
687+
K: Clone + Send + Eq + PartialEq + Hash + 'static,
688+
L: JoinLikeItem<K>,
689+
{
690+
L::remove_downlink_handler(lane, link_key)
691+
}
692+
673693
/// Add a downlink to a Join Map lane. All key-value pairs received on the downlink will be set into the
674694
/// map state of the lane.
675695
///
676696
/// # Arguments
677697
/// * `lane` - Projection to the lane.
678-
/// * `link_key - A key to identify the link.
698+
/// * `link_key` - A key to identify the link.
679699
/// * `host` - The remote host at which the agent resides (a local agent if not specified).
680700
/// * `node` - The node URI of the agent.
681701
/// * `lane_uri` - The lane to downlink from.

server/swimos_agent/src/agent_model/downlink/hosted/event/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,4 +355,9 @@ impl EventDownlinkHandle {
355355
pub fn is_linked(&self) -> bool {
356356
matches!(self.observer.get(), DlState::Linked | DlState::Synced)
357357
}
358+
359+
/// Consumes this handle and returns the stop handle, if it exists.
360+
pub fn into_stop_rx(self) -> Option<trigger::Sender> {
361+
self.stop_tx
362+
}
358363
}

server/swimos_agent/src/item.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,22 @@ pub trait MutableValueLikeItem<T> {
107107

108108
fn set_handler<C: 'static>(projection: fn(&C) -> &Self, value: T) -> Self::SetHandler<C>;
109109
}
110+
111+
/// Trait for abstracting over common functionality between Join Map and Join Value lanes.
112+
pub trait JoinLikeItem<L> {
113+
/// Handler action for removing a downlink from a join lane.
114+
type RemoveDownlinkHandler<C>: HandlerAction<C, Completion = ()> + Send + 'static
115+
where
116+
C: 'static;
117+
118+
/// Create a handler that will remove a downlink from the lane and clear any entries in the
119+
/// underlying map.
120+
///
121+
/// # Arguments
122+
/// * `projection`: a projection to the join lane.
123+
/// * `link_key`: a key that signifies the downlink to remove.
124+
fn remove_downlink_handler<C: 'static>(
125+
projection: fn(&C) -> &Self,
126+
link_key: L,
127+
) -> Self::RemoveDownlinkHandler<C>;
128+
}

server/swimos_agent/src/lanes/join/map/downlink/tests.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -264,15 +264,19 @@ where
264264
fn state_for(
265265
lane: &JoinMapLane<String, i32, String>,
266266
link_key: &str,
267-
) -> (Option<Link<i32>>, HashSet<i32>) {
267+
) -> (Option<(DownlinkStatus, HashSet<i32>)>, HashSet<i32>) {
268268
let JoinMapLane { link_tracker, .. } = lane;
269269
let guard = link_tracker.borrow();
270270
let owned = guard
271271
.ownership
272272
.iter()
273273
.filter_map(|(k, v)| if v == link_key { Some(*k) } else { None })
274274
.collect();
275-
(guard.links.get(link_key).cloned(), owned)
275+
let state = guard
276+
.links
277+
.get(link_key)
278+
.map(|link| (link.status, link.keys.clone()));
279+
(state, owned)
276280
}
277281

278282
fn set_state_for(
@@ -324,7 +328,7 @@ fn run_on_linked() {
324328
} else {
325329
panic!("Events incorrect: {:?}", events);
326330
}
327-
if let (Some(Link { status, keys }), owned) = state_for(&agent.lane, "link") {
331+
if let (Some((status, keys)), owned) = state_for(&agent.lane, "link") {
328332
assert_eq!(status, DownlinkStatus::Linked);
329333
assert!(keys.is_empty());
330334
assert!(owned.is_empty());
@@ -522,7 +526,7 @@ fn run_update_linked() {
522526
assert_eq!(value, Some("a".to_string()));
523527
let (state, owned) = state_for(&agent.lane, "link");
524528
let expected = [1].into_iter().collect();
525-
if let Some(Link { status, keys }) = state {
529+
if let Some((status, keys)) = state {
526530
assert_eq!(status, DownlinkStatus::Linked);
527531
assert_eq!(keys, expected);
528532
}
@@ -572,7 +576,7 @@ fn run_remove_linked() {
572576

573577
let (state, owned) = state_for(&agent.lane, "link");
574578
let expected = [2].into_iter().collect();
575-
if let Some(Link { status, keys }) = state {
579+
if let Some((status, keys)) = state {
576580
assert_eq!(status, DownlinkStatus::Linked);
577581
assert_eq!(keys, expected);
578582
}
@@ -621,7 +625,7 @@ fn run_clear_linked() {
621625
assert!(value.is_none());
622626
let (state, owned) = state_for(&agent.lane, "link");
623627

624-
if let Some(Link { status, keys }) = state {
628+
if let Some((status, keys)) = state {
625629
assert_eq!(status, DownlinkStatus::Linked);
626630
assert!(keys.is_empty());
627631
}
@@ -672,7 +676,7 @@ fn run_take_linked() {
672676
let expected = [1].into_iter().collect();
673677
let (state, owned) = state_for(&agent.lane, "link");
674678

675-
if let Some(Link { status, keys }) = state {
679+
if let Some((status, keys)) = state {
676680
assert_eq!(status, DownlinkStatus::Linked);
677681
assert_eq!(keys, expected);
678682
}
@@ -723,7 +727,7 @@ fn run_drop_linked() {
723727
let expected = [2].into_iter().collect();
724728
let (state, owned) = state_for(&agent.lane, "link");
725729

726-
if let Some(Link { status, keys }) = state {
730+
if let Some((status, keys)) = state {
727731
assert_eq!(status, DownlinkStatus::Linked);
728732
assert_eq!(keys, expected);
729733
}
@@ -782,7 +786,7 @@ fn run_on_synced() {
782786

783787
let (state, owned) = state_for(&agent.lane, "link");
784788

785-
if let Some(Link { status, keys }) = state {
789+
if let Some((status, keys)) = state {
786790
assert_eq!(status, DownlinkStatus::Linked);
787791
assert_eq!(keys, expected);
788792
}

server/swimos_agent/src/lanes/join/map/init/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::event_handler::{DowncastError, EventHandler, JoinLaneInitializer};
2323
use crate::lanes::JoinLaneKind;
2424

2525
use super::AddDownlinkAction;
26-
//use super::AddDownlinkAction;
2726
use super::{lifecycle::JoinMapLaneLifecycle, JoinMapLane};
2827

2928
/// Uses a [`JoinMapLaneLifecycle`] to create a handler action that will open a new downlink

0 commit comments

Comments
 (0)