Skip to content

Commit a34e203

Browse files
committed
Adds aggregations example application
1 parent 83fcb25 commit a34e203

File tree

4 files changed

+186
-24
lines changed

4 files changed

+186
-24
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use swimos::{
16+
agent::agent_lifecycle::utility::HandlerContext,
17+
agent::event_handler::EventHandler,
18+
agent::lanes::{CommandLane, JoinValueLane},
19+
agent::AgentLaneModel,
20+
agent::{lifecycle, projections},
21+
};
22+
23+
#[derive(AgentLaneModel)]
24+
#[projections]
25+
pub struct AggregateAgent {
26+
aggregated: JoinValueLane<String, f64>,
27+
register: CommandLane<String>,
28+
}
29+
30+
#[derive(Clone, Default)]
31+
pub struct AggregateLifecycle;
32+
33+
#[lifecycle(AggregateAgent)]
34+
impl AggregateLifecycle {
35+
#[on_command(register)]
36+
pub fn register(
37+
&self,
38+
context: HandlerContext<AggregateAgent>,
39+
area_id: &String,
40+
) -> impl EventHandler<AggregateAgent> {
41+
context.add_downlink(
42+
AggregateAgent::AGGREGATED,
43+
area_id.clone(),
44+
None,
45+
format!("/area/{}", area_id).as_str(),
46+
"average_speed",
47+
)
48+
}
49+
}

example_apps/aggregations/src/area.rs

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,27 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::str::FromStr;
17+
use std::sync::{Arc, Mutex};
18+
119
use rand::seq::SliceRandom;
220

321
use swimos::{
422
agent::agent_lifecycle::utility::HandlerContext,
523
agent::event_handler::EventHandler,
24+
agent::event_handler::HandlerActionExt,
625
agent::lanes::{CommandLane, JoinValueLane, ValueLane},
726
agent::lifecycle,
827
agent::projections,
@@ -17,6 +36,20 @@ pub enum Area {
1736
D,
1837
}
1938

39+
impl FromStr for Area {
40+
type Err = ();
41+
42+
fn from_str(s: &str) -> Result<Self, Self::Err> {
43+
match s {
44+
"A" => Ok(Area::A),
45+
"B" => Ok(Area::B),
46+
"C" => Ok(Area::C),
47+
"D" => Ok(Area::D),
48+
_ => Err(()),
49+
}
50+
}
51+
}
52+
2053
impl Area {
2154
pub fn random() -> Self {
2255
[Area::A, Area::B, Area::C, Area::D]
@@ -36,23 +69,42 @@ pub struct AreaAgent {
3669
average_speed: ValueLane<f64>,
3770
}
3871

39-
#[derive(Clone)]
40-
pub struct AreaLifecycle;
72+
#[derive(Clone, Default)]
73+
pub struct AreaLifecycle {
74+
area: Arc<Mutex<Option<Area>>>,
75+
}
4176

4277
#[lifecycle(AreaAgent)]
4378
impl AreaLifecycle {
79+
#[on_start]
80+
pub fn on_start(&self, context: HandlerContext<AreaAgent>) -> impl EventHandler<AreaAgent> {
81+
let area = self.area.clone();
82+
context
83+
.get_parameter("area")
84+
.and_then(move |area_id: Option<String>| {
85+
context.effect(move || {
86+
let assigned_area = &mut *area.lock().expect("Mutex poisoned");
87+
let area_str = area_id.expect("Missing area URI");
88+
*assigned_area = Area::from_str(area_str.as_str()).ok();
89+
area_str
90+
})
91+
})
92+
.and_then(move |area: String| {
93+
context.send_command(None, "/aggregate", "register", area)
94+
})
95+
}
96+
4497
#[on_command(register)]
4598
pub fn register(
4699
&self,
47100
context: HandlerContext<AreaAgent>,
48101
car_id: &u64,
49102
) -> impl EventHandler<AreaAgent> {
50-
let car_id = *car_id;
51103
context.add_downlink(
52-
AreaAgent::REGISTER,
53-
car_id,
104+
AreaAgent::CARS,
105+
*car_id,
54106
None,
55-
car_id.to_string().as_str(),
107+
format!("/cars/{car_id}").as_str(),
56108
"speed",
57109
)
58110
}
@@ -61,8 +113,23 @@ impl AreaLifecycle {
61113
pub fn deregister(
62114
&self,
63115
context: HandlerContext<AreaAgent>,
64-
_car_id: &u64,
116+
car_id: &u64,
65117
) -> impl EventHandler<AreaAgent> {
118+
context.remove_downlink(AreaAgent::CARS, *car_id)
119+
}
66120

121+
#[on_update(cars)]
122+
fn cars(
123+
&self,
124+
context: HandlerContext<AreaAgent>,
125+
speeds: &HashMap<u64, u64>,
126+
_key: u64,
127+
_prev: Option<u64>,
128+
_new_value: &u64,
129+
) -> impl EventHandler<AreaAgent> {
130+
let speeds = speeds.clone();
131+
context
132+
.effect(move || speeds.values().sum::<u64>() as f64 / speeds.len() as f64)
133+
.and_then(move |average: f64| context.set_value(AreaAgent::AVERAGE_SPEED, average))
67134
}
68135
}

example_apps/aggregations/src/car.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
use std::mem::replace;
216
use std::str::FromStr;
317
use std::sync::{Arc, Mutex};
@@ -39,13 +53,12 @@ impl CarLifecycle {
3953
pub fn on_start(&self, context: HandlerContext<CarAgent>) -> impl EventHandler<CarAgent> {
4054
let area = self.area.clone();
4155

42-
let speed_handler = context.schedule_repeatedly(Duration::from_secs(10), move || {
56+
let speed_handler = context.schedule_repeatedly(Duration::from_secs(5), move || {
4357
let mut rng = rand::rngs::OsRng::default();
4458
Some(context.set_value(CarAgent::SPEED, rng.gen_range(10..=30)))
4559
});
46-
4760
let car_handler = move |car_id: u64| {
48-
context.schedule_repeatedly(Duration::from_secs(5), move || {
61+
context.schedule_repeatedly(Duration::from_secs(1), move || {
4962
let area = area.clone();
5063
let car_id = car_id.clone();
5164

@@ -54,21 +67,21 @@ impl CarLifecycle {
5467

5568
let handler = if old_area != *assigned_area {
5669
// deregister this car with its current area
57-
let register_handler = context.send_command(
70+
let deregister_handler = context.send_command(
5871
None,
5972
format!("/area/{old_area:?}"),
6073
"deregister".to_string(),
6174
car_id.clone(),
6275
);
6376
// register this car with its new assigned area
64-
let deregister_handler = context.send_command(
77+
let register_handler = context.send_command(
6578
None,
6679
format!("/area/{:?}", *assigned_area),
6780
"register".to_string(),
6881
car_id,
6982
);
7083

71-
Some(register_handler.followed_by(deregister_handler))
84+
Some(deregister_handler.followed_by(register_handler))
7285
} else {
7386
// noop handler as the car didn't switch area
7487
None

example_apps/aggregations/src/main.rs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,36 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
use std::error::Error;
16+
use std::str::FromStr;
217
use std::time::Duration;
318

19+
use crate::{
20+
aggregate::{AggregateAgent, AggregateLifecycle},
21+
area::{AreaAgent, AreaLifecycle},
22+
car::CarAgent,
23+
car::CarLifecycle,
24+
};
425
use example_util::{example_logging, manage_handle};
26+
use swimos::route::RouteUri;
527
use swimos::{
628
agent::agent_model::AgentModel,
729
route::RoutePattern,
830
server::{Server, ServerBuilder},
931
};
1032

11-
use crate::car::{CarAgent, CarLifecycle};
12-
33+
mod aggregate;
1334
mod area;
1435
mod car;
1536

@@ -18,26 +39,38 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1839
example_logging()?;
1940

2041
let car_agent = AgentModel::new(CarAgent::default, CarLifecycle::default().into_lifecycle());
42+
let area_agent = AgentModel::new(
43+
AreaAgent::default,
44+
AreaLifecycle::default().into_lifecycle(),
45+
);
46+
let aggregate_agent = AgentModel::new(
47+
AggregateAgent::default,
48+
AggregateLifecycle::default().into_lifecycle(),
49+
);
2150

2251
let server = ServerBuilder::with_plane_name("Example Plane")
23-
.add_route(RoutePattern::parse_str("/car/:car_id")?, car_agent)
52+
.add_route(RoutePattern::parse_str("/cars/:car_id")?, car_agent)
53+
.add_route(RoutePattern::parse_str("/area/:area")?, area_agent)
54+
.add_route(RoutePattern::parse_str("/aggregate")?, aggregate_agent)
2455
.update_config(|config| {
2556
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
2657
})
2758
.build()
2859
.await?;
2960

3061
let (task, handle) = server.run();
31-
// handle
32-
// .start_agent(RouteUri::from_str("/generator")?)
33-
// .await
34-
// .expect("Failed to start generator agent");
62+
let _task = tokio::spawn(task);
3563

36-
let shutdown = manage_handle(handle);
64+
for i in 0..1000 {
65+
let route = format!("/cars/{i}");
66+
handle
67+
.start_agent(RouteUri::from_str(route.as_str())?)
68+
.await
69+
.expect("Failed to start agent");
70+
}
3771

38-
let (_, result) = tokio::join!(shutdown, task);
39-
40-
result?;
72+
manage_handle(handle).await;
4173
println!("Server stopped successfully.");
74+
4275
Ok(())
4376
}

0 commit comments

Comments
 (0)