Skip to content

Commit 20202c3

Browse files
authored
Merge pull request #655 from swimos/aggregations
2 parents 7c61b96 + e3208ae commit 20202c3

File tree

6 files changed

+413
-1
lines changed

6 files changed

+413
-1
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ members = [
3838
"example_apps/tutorial_app/generator",
3939
"example_apps/join_map",
4040
"example_apps/join_value",
41+
"example_apps/aggregations",
4142
]
4243

4344
exclude = [
@@ -126,4 +127,4 @@ httparse = "1.8"
126127
sha1 = "0.10"
127128
waker-fn = "1.1.0"
128129
num = "0.4"
129-
smol_str = "0.2.0"
130+
smol_str = "0.2.0"

example_apps/aggregations/Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "aggregations"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
swimos = { path = "../../swimos", features = ["server", "agent"] }
8+
swimos_form = { path = "../../api/swimos_form" }
9+
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
10+
example-util = { path = "../example_util" }
11+
rand = { workspace = true }
12+
futures-util = { workspace = true }
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use swimos::{
17+
agent::agent_lifecycle::utility::HandlerContext,
18+
agent::event_handler::EventHandler,
19+
agent::lanes::ValueLane,
20+
agent::lanes::{CommandLane, JoinValueLane},
21+
agent::AgentLaneModel,
22+
agent::{lifecycle, projections},
23+
};
24+
25+
#[derive(AgentLaneModel)]
26+
#[projections]
27+
pub struct AggregateAgent {
28+
aggregated: JoinValueLane<String, f64>,
29+
average_speed: ValueLane<f64>,
30+
register: CommandLane<String>,
31+
}
32+
33+
#[derive(Clone, Default)]
34+
pub struct AggregateLifecycle;
35+
36+
#[lifecycle(AggregateAgent)]
37+
impl AggregateLifecycle {
38+
#[on_update(aggregated)]
39+
fn aggregated(
40+
&self,
41+
context: HandlerContext<AggregateAgent>,
42+
averages: &HashMap<String, f64>,
43+
_key: String,
44+
_prev: Option<f64>,
45+
_new_value: &f64,
46+
) -> impl EventHandler<AggregateAgent> {
47+
let average = averages.values().sum::<f64>() / averages.len() as f64;
48+
context.set_value(AggregateAgent::AVERAGE_SPEED, average)
49+
}
50+
51+
#[on_command(register)]
52+
pub fn register(
53+
&self,
54+
context: HandlerContext<AggregateAgent>,
55+
area_id: &String,
56+
) -> impl EventHandler<AggregateAgent> {
57+
context.add_downlink(
58+
AggregateAgent::AGGREGATED,
59+
area_id.clone(),
60+
None,
61+
format!("/area/{}", area_id).as_str(),
62+
"average_speed",
63+
)
64+
}
65+
}

example_apps/aggregations/src/area.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::fmt::{Display, Formatter};
17+
use std::str::FromStr;
18+
19+
use rand::seq::SliceRandom;
20+
21+
use swimos::agent::event_handler::HandlerActionExt;
22+
use swimos::{
23+
agent::agent_lifecycle::utility::HandlerContext,
24+
agent::event_handler::EventHandler,
25+
agent::lanes::{CommandLane, JoinValueLane, ValueLane},
26+
agent::lifecycle,
27+
agent::projections,
28+
agent::AgentLaneModel,
29+
};
30+
use swimos_form::Form;
31+
32+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Form)]
33+
pub enum Area {
34+
Arbury,
35+
CherryHinton,
36+
KingsHedges,
37+
Petersfield,
38+
}
39+
40+
impl Area {
41+
pub fn universe() -> [Area; 4] {
42+
[
43+
Area::Arbury,
44+
Area::CherryHinton,
45+
Area::KingsHedges,
46+
Area::Petersfield,
47+
]
48+
}
49+
}
50+
51+
impl Default for Area {
52+
fn default() -> Self {
53+
Area::select_random()
54+
}
55+
}
56+
57+
impl FromStr for Area {
58+
type Err = ();
59+
60+
fn from_str(s: &str) -> Result<Self, Self::Err> {
61+
match s {
62+
"arbury" => Ok(Area::Arbury),
63+
"cherryhinton" => Ok(Area::CherryHinton),
64+
"kingshedges" => Ok(Area::KingsHedges),
65+
"petersfield" => Ok(Area::Petersfield),
66+
_ => Err(()),
67+
}
68+
}
69+
}
70+
71+
impl Display for Area {
72+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73+
match self {
74+
Area::Arbury => write!(f, "arbury"),
75+
Area::CherryHinton => write!(f, "cherryhinton"),
76+
Area::KingsHedges => write!(f, "kingshedges"),
77+
Area::Petersfield => write!(f, "petersfield"),
78+
}
79+
}
80+
}
81+
82+
impl Area {
83+
pub fn select_random() -> Self {
84+
[
85+
Area::Arbury,
86+
Area::CherryHinton,
87+
Area::KingsHedges,
88+
Area::Petersfield,
89+
]
90+
.choose(&mut rand::thread_rng())
91+
.copied()
92+
.expect("Slice was not empty")
93+
}
94+
}
95+
96+
#[derive(PartialEq, Copy, Clone, Form)]
97+
pub enum Action {
98+
Register(u64),
99+
Deregister(u64),
100+
}
101+
102+
#[derive(AgentLaneModel)]
103+
#[projections]
104+
pub struct AreaAgent {
105+
registrations: CommandLane<Action>,
106+
cars: JoinValueLane<u64, u64>,
107+
average_speed: ValueLane<f64>,
108+
}
109+
110+
#[derive(Clone, Default)]
111+
pub struct AreaLifecycle {
112+
area: Area,
113+
}
114+
115+
impl AreaLifecycle {
116+
pub fn new(area: Area) -> AreaLifecycle {
117+
AreaLifecycle { area }
118+
}
119+
}
120+
121+
#[lifecycle(AreaAgent)]
122+
impl AreaLifecycle {
123+
#[on_start]
124+
pub fn on_start(&self, context: HandlerContext<AreaAgent>) -> impl EventHandler<AreaAgent> {
125+
context.send_command(None, "/aggregate", "register", self.area.to_string())
126+
}
127+
128+
#[on_command(registrations)]
129+
pub fn registrations(
130+
&self,
131+
context: HandlerContext<AreaAgent>,
132+
action: &Action,
133+
) -> impl EventHandler<AreaAgent> {
134+
match action {
135+
Action::Register(car_id) => context
136+
.add_downlink(
137+
AreaAgent::CARS,
138+
*car_id,
139+
None,
140+
format!("/cars/{car_id}").as_str(),
141+
"speed",
142+
)
143+
.boxed(),
144+
Action::Deregister(car_id) => context.remove_downlink(AreaAgent::CARS, *car_id).boxed(),
145+
}
146+
}
147+
#[on_update(cars)]
148+
fn cars(
149+
&self,
150+
context: HandlerContext<AreaAgent>,
151+
speeds: &HashMap<u64, u64>,
152+
_key: u64,
153+
_prev: Option<u64>,
154+
_new_value: &u64,
155+
) -> impl EventHandler<AreaAgent> {
156+
let average = speeds.values().sum::<u64>() as f64 / speeds.len() as f64;
157+
context.set_value(AreaAgent::AVERAGE_SPEED, average)
158+
}
159+
}

example_apps/aggregations/src/car.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::str::FromStr;
16+
use std::time::Duration;
17+
18+
use rand::Rng;
19+
20+
use swimos::agent::stores::ValueStore;
21+
use swimos::{
22+
agent::agent_lifecycle::utility::HandlerContext,
23+
agent::event_handler::{EventHandler, HandlerActionExt},
24+
agent::lanes::ValueLane,
25+
agent::{lifecycle, projections, AgentLaneModel},
26+
};
27+
28+
use crate::area::{Action, Area};
29+
30+
#[derive(AgentLaneModel)]
31+
#[projections]
32+
pub struct CarAgent {
33+
speed: ValueLane<u64>,
34+
#[item(transient)]
35+
area: ValueStore<Area>,
36+
}
37+
38+
#[derive(Debug, Clone)]
39+
pub struct CarLifecycle;
40+
41+
#[lifecycle(CarAgent)]
42+
impl CarLifecycle {
43+
#[on_start]
44+
pub fn on_start(&self, context: HandlerContext<CarAgent>) -> impl EventHandler<CarAgent> {
45+
let speed_handler = context.schedule_repeatedly(Duration::from_secs(5), move || {
46+
let mut rng = rand::rngs::OsRng;
47+
Some(context.set_value(CarAgent::SPEED, rng.gen_range(10..=70)))
48+
});
49+
let area_handler = move |car_id: u64| {
50+
context.schedule_repeatedly(Duration::from_secs(5), move || {
51+
Some(context.get_value(CarAgent::AREA).and_then(move |old_area| {
52+
let new_area = Area::select_random();
53+
let handler = if old_area != new_area {
54+
// deregister this car with its current area
55+
let deregister_handler = context.send_command(
56+
None,
57+
format!("/area/{old_area}"),
58+
"registrations".to_string(),
59+
Action::Deregister(car_id),
60+
);
61+
// register this car with its new assigned area
62+
let register_handler = context.send_command(
63+
None,
64+
format!("/area/{new_area}"),
65+
"registrations".to_string(),
66+
Action::Register(car_id),
67+
);
68+
69+
let handler = deregister_handler
70+
.followed_by(register_handler)
71+
.followed_by(context.set_value(CarAgent::AREA, new_area));
72+
Some(handler)
73+
} else {
74+
// noop handler as the car didn't switch area
75+
None
76+
};
77+
78+
handler.discard()
79+
}))
80+
})
81+
};
82+
83+
context
84+
.get_parameter("car_id")
85+
.map(|param: Option<String>| {
86+
let car_id = param.expect("Missing car_id URI parameter");
87+
u64::from_str(car_id.as_str()).expect("Failed to parse car ID into u64")
88+
})
89+
.and_then(area_handler)
90+
.followed_by(speed_handler)
91+
}
92+
}

0 commit comments

Comments
 (0)