Skip to content

Commit 87172a5

Browse files
authored
Merge pull request #585 from swimos/join-map-lane
Adds support for join map lanes.
2 parents 609e888 + ac91f13 commit 87172a5

File tree

73 files changed

+6083
-693
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+6083
-693
lines changed

api/swim_api/src/downlink/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ pub enum DownlinkKind {
3232
Event,
3333
/// Accepts key-value pairs and maintains a state as a map.
3434
Map,
35+
/// Accepts map updates but does not maintain any state.
36+
MapEvent,
3537
}
3638

3739
#[derive(Debug, Clone, Copy, PartialEq, Eq)]

api/swim_api/src/protocol/map/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,9 +456,9 @@ impl<K: StructuralWritable, V: StructuralWritable> Encoder<MapOperation<K, V>>
456456
}
457457
}
458458

459-
/// Repreesentation of map lane messages (used to form the body of Recon messages when operating)
459+
/// Representation of map lane messages (used to form the body of Recon messages when operating)
460460
/// on downlinks. This extends [`MapOperation`] with `Take` (retain the first `n` items) and `Drop`
461-
/// (remove teh first `n` items). We never use these internally but must support them for communicating
461+
/// (remove the first `n` items). We never use these internally but must support them for communicating
462462
/// with other implementations.
463463
#[derive(Copy, Clone, Debug, PartialEq, Eq, Form, Hash)]
464464
#[form_root(::swim_form)]

client/runtime/src/models.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl DownlinkRuntime {
163163
);
164164
runtime.run().await;
165165
}
166-
DownlinkKind::Map => {
166+
DownlinkKind::Map | DownlinkKind::MapEvent => {
167167
let strategy = if config.abort_on_bad_frames {
168168
ReportStrategy::new(AlwaysAbortStrategy).boxed()
169169
} else {

docs/agent.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Contents
3535
* `ValueLane` events.
3636
* `MapLane` events.
3737
* `JoinValueLane` events.
38+
* `JoinMapLane` events.
3839
* `HttpLane` events.
3940
* Store events.
4041
* Borrowing from lifecycles.

docs/define.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The derive macro for `AgentLaneModel` can be applied to any struct type where al
2727
* Demand Map lanes: `swim::agent::lanes::DemandMapLane`.
2828
* Map Lanes: `swim::agent::lanes::MapLane`.
2929
* Join Value Lanes: `swim::agent::lanes::JoinValueLane`.
30+
* Join Map Lanes: `swim::agent::lanes::JoinMapLane`.
3031
* HTTP Lanes: `swim::agent::lanes::HttpLane` (or the shorthand `swim::agent::lanes::SimpleHttpLane`).
3132

3233
The supported store types are:
@@ -50,12 +51,18 @@ struct ExampleAgent {
5051

5152
As mentioned above, all of the type parameters used in the lane types must be `Send` (which is clearly true for `i32`, `String`, and `u64`). However, to use the derive macro there is a further restriction.
5253

53-
For the macro to be able to generate the implementation, it needs to know how to serialize and deserialize the types use in the lane. This is encoded by the `swim::form::Form` trait which is covered in the following section. Additionally, for a map-like item (`MapLane<K, V>`, `MapStore<K, V>` or `JoinValueLane<K, V>`) the key type `K` must additionally satisfy:
54+
For the macro to be able to generate the implementation, it needs to know how to serialize and deserialize the types use in the lane. This is encoded by the `swim::form::Form` trait which is covered in the following section. Additionally, for a map-like item (`MapLane<K, V>`, `MapStore<K, V>`, `JoinValueLane<K, V>`, `JoinMapLane<L, K, V>`) the key type `K` must additionally satisfy:
5455

5556
```rust
5657
K: Eq + Hash + Ord + Clone
5758
```
5859

60+
For `JoinMapLane`s, the link key type `L` must satisfy:
61+
62+
```rust
63+
L: Eq + Hash + Clone
64+
```
65+
5966
Stores are effectively private alternatives to lanes. The maintain state in exactly the same was as the corresponding lane types but are not exposed externally.
6067

6168
The `Form` trait

docs/lifecycle.md

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct ExampleAgent {
2424
example_value: ValueLane<i32>,
2525
example_map: MapLane<String, i32>,
2626
example_join_value: JoinValueLane<String, i32>,
27+
example_join_map: JoinMapLane<String, String, i32>,
2728
example_http: SimpleHttpLane<String>,
2829
}
2930
```
@@ -212,7 +213,7 @@ Additionally, it is possible to attach events to the downlinks used for each key
212213
fn register_lifecycle(
213214
&self,
214215
context: JoinValueContext<ExampleAgent, String, i32>,
215-
) -> impl JoinValueLaneLifecycle<i32, Text, ExampleAgent> + 'static {
216+
) -> impl JoinValueLaneLifecycle<String, i32, ExampleAgent> + 'static {
216217
context.builder()
217218
.on_linked(|context, key, address| context.effect(move || {
218219
println!("Linked downlink for key: {} from lane at: {}", key, address);
@@ -246,6 +247,52 @@ The `on_unlinked` and `on_failed` handlers return a `LinkClosedResponse`. This i
246247

247248
It is possible to add a shared state to a join value lifecycle in a similar way to other downlinks types (see [Downlinks](downlink.md)). Note that this state is shared between the event handlers of a single instance of the lifecycle and not between all instances. If you have state that needs to be shared between all instances it must be stored inside an `Arc`.
248249

250+
Join map lane events
251+
--------------------
252+
253+
A join map lane supports all of the events supported by a map lane (`on_update`, `on_remove` and `on_clear`).
254+
255+
Additionally, it is possible to attach events to the downlinks used for each key of the join value lane. To do this, add the following to the agent lifecycle implementation:
256+
257+
```rust
258+
#[join_map_lifecycle(example_join_map)]
259+
fn register_lifecycle(
260+
&self,
261+
context: JoinMapContext<ExampleAgent, String, String, i32>,
262+
) -> impl JoinMapLaneLifecycle<String, String, ExampleAgent> + 'static {
263+
context.builder()
264+
.on_linked(|context, link_key, address| context.effect(move || {
265+
println!("Linked downlink for link: {} from lane at: {}", key, address);
266+
}))
267+
.done()
268+
}
269+
```
270+
271+
An instance of the lifecycle will be created for each link that is attached to the lane so it is necessary that the lifecycle is `Clone` and has a `static` lifetime.
272+
273+
Supported handlers are `on_linked`, `on_synced`, `on_unlinked` and `on_failed`. These take closures with the following signatures:
274+
275+
1. `on_linked`:
276+
```rust
277+
(context: HandlerContext<ExampleAgent>, link_key: L, address: Address<&str>) -> impl EventHandler<ExampleAgent>
278+
```
279+
2. `on_synced`:
280+
```rust
281+
(context: HandlerContext<ExampleAgent>, link_key: L, address: Address<&str>, keys: &HashSet<K>) -> impl EventHandler<ExampleAgent>
282+
```
283+
3. `on_unlinked` and `on_failed`:
284+
```rust
285+
(context: HandlerContext<ExampleAgent>, link_key: L, address: Address<&str>, keys: HashSet<K>) -> impl HandlerAction<ExampleAgent, Completion = LinkClosedResponse>
286+
```
287+
288+
The `on_unlinked` and `on_failed` handlers return a `LinkClosedResponse`. This is an enumeration with the values `Retry`, `Abandon` and `Delete`. These have the following effects:
289+
290+
1. `Retry`: An attempt will be made to reconnect the link (TODO: this is not implemented yet.)
291+
2. `Abandon`: The link will be abandoned by the entry will still remain in the map. (This is the default.)
292+
3. `Delete`: The link will be abandoned and the entry deleted from the map.
293+
294+
It is possible to add a shared state to a join value lifecycle in a similar way to other downlinks types (see [Downlinks](downlink.md)). Note that this state is shared between the event handlers of a single instance of the lifecycle and not between all instances. If you have state that needs to be shared between all instances it must be stored inside an `Arc`.
295+
249296
HTTP lane events
250297
----------------
251298

example_apps/map_store/src/agent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,13 @@ impl ExampleLifecycle {
8484
let key = name.clone();
8585
context
8686
.get_value(ExampleAgent::LANE)
87-
.and_then(move |v| context.update_store(ExampleAgent::SAVED, key, v))
87+
.and_then(move |v| context.update(ExampleAgent::SAVED, key, v))
8888
.boxed()
8989
}
9090
Instruction::Restore(name) => {
9191
let key = name.clone();
9292
context
93-
.get_entry_store(ExampleAgent::SAVED, key)
93+
.get_entry(ExampleAgent::SAVED, key)
9494
.map(|maybe: Option<i32>| maybe.unwrap_or_default())
9595
.and_then(move |v| context.set_value(ExampleAgent::LANE, v))
9696
.boxed()

example_apps/map_store_persistence/src/agent.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ impl ExampleLifecycle {
4545
) -> impl EventHandler<ExampleAgent> {
4646
join3(
4747
context.get_agent_uri(),
48-
context.get_map_store(ExampleAgent::VALUE),
49-
context.get_map_store(ExampleAgent::TEMPORARY),
48+
context.get_map(ExampleAgent::VALUE),
49+
context.get_map(ExampleAgent::TEMPORARY),
5050
)
5151
.and_then(move |(uri, m1, m2)| {
5252
context.effect(move || {
@@ -67,8 +67,8 @@ impl ExampleLifecycle {
6767
) -> impl EventHandler<ExampleAgent> {
6868
join3(
6969
context.get_agent_uri(),
70-
context.get_map_store(ExampleAgent::VALUE),
71-
context.get_map_store(ExampleAgent::TEMPORARY),
70+
context.get_map(ExampleAgent::VALUE),
71+
context.get_map(ExampleAgent::TEMPORARY),
7272
)
7373
.and_then(move |(uri, m1, m2)| {
7474
context.effect(move || {
@@ -91,10 +91,10 @@ impl ExampleLifecycle {
9191
match cmd {
9292
Instruction::Wake => UnitHandler::default().boxed(),
9393
Instruction::SetValue { key, value } => context
94-
.update_store(ExampleAgent::VALUE, key.clone(), *value)
94+
.update(ExampleAgent::VALUE, key.clone(), *value)
9595
.boxed(),
9696
Instruction::SetTemp { key, value } => context
97-
.update_store(ExampleAgent::TEMPORARY, key.clone(), *value)
97+
.update(ExampleAgent::TEMPORARY, key.clone(), *value)
9898
.boxed(),
9999
Instruction::Stop => context.stop().boxed(),
100100
}

example_apps/transit/src/agents/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::hash::Hash;
1818
use swim::{
1919
agent::{
2020
agent_lifecycle::utility::JoinValueContext,
21-
lanes::join_value::{lifecycle::JoinValueLaneLifecycle, LinkClosedResponse},
21+
lanes::{join_value::lifecycle::JoinValueLaneLifecycle, LinkClosedResponse},
2222
},
2323
form::Form,
2424
};

example_apps/value_store/src/agent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ impl ExampleLifecycle {
8282
match *cmd {
8383
Instruction::Save => context
8484
.get_value(ExampleAgent::LANE)
85-
.and_then(move |v| context.set_store_value(ExampleAgent::SAVED, v))
85+
.and_then(move |v| context.set_value(ExampleAgent::SAVED, v))
8686
.boxed(),
8787
Instruction::Restore => context
88-
.get_store_value(ExampleAgent::SAVED)
88+
.get_value(ExampleAgent::SAVED)
8989
.and_then(move |v| context.set_value(ExampleAgent::LANE, v))
9090
.boxed(),
9191
}

0 commit comments

Comments
 (0)