Skip to content

Commit e3208ae

Browse files
committed
Resolves PR comments
1 parent 787962b commit e3208ae

File tree

5 files changed

+136
-113
lines changed

5 files changed

+136
-113
lines changed

example_apps/aggregations/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2021"
55

66
[dependencies]
77
swimos = { path = "../../swimos", features = ["server", "agent"] }
8+
swimos_form = { path = "../../api/swimos_form" }
89
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
910
example-util = { path = "../example_util" }
1011
rand = { workspace = true }

example_apps/aggregations/src/aggregate.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::collections::HashMap;
1616
use swimos::{
1717
agent::agent_lifecycle::utility::HandlerContext,
1818
agent::event_handler::EventHandler,
19-
agent::event_handler::HandlerActionExt,
2019
agent::lanes::ValueLane,
2120
agent::lanes::{CommandLane, JoinValueLane},
2221
agent::AgentLaneModel,
@@ -45,10 +44,8 @@ impl AggregateLifecycle {
4544
_prev: Option<f64>,
4645
_new_value: &f64,
4746
) -> impl EventHandler<AggregateAgent> {
48-
let averages = averages.clone();
49-
context
50-
.effect(move || averages.values().sum::<f64>() / averages.len() as f64)
51-
.and_then(move |average: f64| context.set_value(AggregateAgent::AVERAGE_SPEED, average))
47+
let average = averages.values().sum::<f64>() / averages.len() as f64;
48+
context.set_value(AggregateAgent::AVERAGE_SPEED, average)
5249
}
5350

5451
#[on_command(register)]

example_apps/aggregations/src/area.rs

Lines changed: 82 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -13,110 +13,137 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::fmt::{Display, Formatter};
1617
use std::str::FromStr;
17-
use std::sync::{Arc, Mutex};
1818

1919
use rand::seq::SliceRandom;
2020

21+
use swimos::agent::event_handler::HandlerActionExt;
2122
use swimos::{
2223
agent::agent_lifecycle::utility::HandlerContext,
2324
agent::event_handler::EventHandler,
24-
agent::event_handler::HandlerActionExt,
2525
agent::lanes::{CommandLane, JoinValueLane, ValueLane},
2626
agent::lifecycle,
2727
agent::projections,
2828
agent::AgentLaneModel,
2929
};
30+
use swimos_form::Form;
3031

31-
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
32+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Form)]
3233
pub enum Area {
33-
A,
34-
B,
35-
C,
36-
D,
34+
Arbury,
35+
CherryHinton,
36+
KingsHedges,
37+
Petersfield,
38+
}
39+
40+
impl Area {
41+
pub fn universe() -> [Area; 4] {
42+
[
43+
Area::Arbury,
44+
Area::CherryHinton,
45+
Area::KingsHedges,
46+
Area::Petersfield,
47+
]
48+
}
49+
}
50+
51+
impl Default for Area {
52+
fn default() -> Self {
53+
Area::select_random()
54+
}
3755
}
3856

3957
impl FromStr for Area {
4058
type Err = ();
4159

4260
fn from_str(s: &str) -> Result<Self, Self::Err> {
4361
match s {
44-
"A" => Ok(Area::A),
45-
"B" => Ok(Area::B),
46-
"C" => Ok(Area::C),
47-
"D" => Ok(Area::D),
62+
"arbury" => Ok(Area::Arbury),
63+
"cherryhinton" => Ok(Area::CherryHinton),
64+
"kingshedges" => Ok(Area::KingsHedges),
65+
"petersfield" => Ok(Area::Petersfield),
4866
_ => Err(()),
4967
}
5068
}
5169
}
5270

71+
impl Display for Area {
72+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73+
match self {
74+
Area::Arbury => write!(f, "arbury"),
75+
Area::CherryHinton => write!(f, "cherryhinton"),
76+
Area::KingsHedges => write!(f, "kingshedges"),
77+
Area::Petersfield => write!(f, "petersfield"),
78+
}
79+
}
80+
}
81+
5382
impl Area {
54-
pub fn random() -> Self {
55-
[Area::A, Area::B, Area::C, Area::D]
56-
.choose(&mut rand::thread_rng())
57-
.copied()
58-
.expect("Slice was not empty")
83+
pub fn select_random() -> Self {
84+
[
85+
Area::Arbury,
86+
Area::CherryHinton,
87+
Area::KingsHedges,
88+
Area::Petersfield,
89+
]
90+
.choose(&mut rand::thread_rng())
91+
.copied()
92+
.expect("Slice was not empty")
5993
}
6094
}
6195

96+
#[derive(PartialEq, Copy, Clone, Form)]
97+
pub enum Action {
98+
Register(u64),
99+
Deregister(u64),
100+
}
101+
62102
#[derive(AgentLaneModel)]
63103
#[projections]
64104
pub struct AreaAgent {
65-
register: CommandLane<u64>,
66-
deregister: CommandLane<u64>,
105+
registrations: CommandLane<Action>,
67106
cars: JoinValueLane<u64, u64>,
68107
average_speed: ValueLane<f64>,
69108
}
70109

71110
#[derive(Clone, Default)]
72111
pub struct AreaLifecycle {
73-
area: Arc<Mutex<Option<Area>>>,
112+
area: Area,
113+
}
114+
115+
impl AreaLifecycle {
116+
pub fn new(area: Area) -> AreaLifecycle {
117+
AreaLifecycle { area }
118+
}
74119
}
75120

76121
#[lifecycle(AreaAgent)]
77122
impl AreaLifecycle {
78123
#[on_start]
79124
pub fn on_start(&self, context: HandlerContext<AreaAgent>) -> impl EventHandler<AreaAgent> {
80-
let area = self.area.clone();
81-
context
82-
.get_parameter("area")
83-
.and_then(move |area_id: Option<String>| {
84-
context.effect(move || {
85-
let assigned_area = &mut *area.lock().expect("Mutex poisoned");
86-
let area_str = area_id.expect("Missing area URI");
87-
*assigned_area = Area::from_str(area_str.as_str()).ok();
88-
area_str
89-
})
90-
})
91-
.and_then(move |area: String| {
92-
context.send_command(None, "/aggregate", "register", area)
93-
})
94-
}
95-
96-
#[on_command(register)]
97-
pub fn register(
98-
&self,
99-
context: HandlerContext<AreaAgent>,
100-
car_id: &u64,
101-
) -> impl EventHandler<AreaAgent> {
102-
context.add_downlink(
103-
AreaAgent::CARS,
104-
*car_id,
105-
None,
106-
format!("/cars/{car_id}").as_str(),
107-
"speed",
108-
)
125+
context.send_command(None, "/aggregate", "register", self.area.to_string())
109126
}
110127

111-
#[on_command(deregister)]
112-
pub fn deregister(
128+
#[on_command(registrations)]
129+
pub fn registrations(
113130
&self,
114131
context: HandlerContext<AreaAgent>,
115-
car_id: &u64,
132+
action: &Action,
116133
) -> impl EventHandler<AreaAgent> {
117-
context.remove_downlink(AreaAgent::CARS, *car_id)
134+
match action {
135+
Action::Register(car_id) => context
136+
.add_downlink(
137+
AreaAgent::CARS,
138+
*car_id,
139+
None,
140+
format!("/cars/{car_id}").as_str(),
141+
"speed",
142+
)
143+
.boxed(),
144+
Action::Deregister(car_id) => context.remove_downlink(AreaAgent::CARS, *car_id).boxed(),
145+
}
118146
}
119-
120147
#[on_update(cars)]
121148
fn cars(
122149
&self,
@@ -126,9 +153,7 @@ impl AreaLifecycle {
126153
_prev: Option<u64>,
127154
_new_value: &u64,
128155
) -> impl EventHandler<AreaAgent> {
129-
let speeds = speeds.clone();
130-
context
131-
.effect(move || speeds.values().sum::<u64>() as f64 / speeds.len() as f64)
132-
.and_then(move |average: f64| context.set_value(AreaAgent::AVERAGE_SPEED, average))
156+
let average = speeds.values().sum::<u64>() as f64 / speeds.len() as f64;
157+
context.set_value(AreaAgent::AVERAGE_SPEED, average)
133158
}
134159
}

example_apps/aggregations/src/car.rs

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,80 +12,71 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::mem::replace;
1615
use std::str::FromStr;
17-
use std::sync::{Arc, Mutex};
1816
use std::time::Duration;
1917

2018
use rand::Rng;
2119

20+
use swimos::agent::stores::ValueStore;
2221
use swimos::{
2322
agent::agent_lifecycle::utility::HandlerContext,
2423
agent::event_handler::{EventHandler, HandlerActionExt},
2524
agent::lanes::ValueLane,
2625
agent::{lifecycle, projections, AgentLaneModel},
2726
};
2827

29-
use crate::area::Area;
28+
use crate::area::{Action, Area};
3029

3130
#[derive(AgentLaneModel)]
3231
#[projections]
3332
pub struct CarAgent {
3433
speed: ValueLane<u64>,
34+
#[item(transient)]
35+
area: ValueStore<Area>,
3536
}
3637

3738
#[derive(Debug, Clone)]
38-
pub struct CarLifecycle {
39-
area: Arc<Mutex<Area>>,
40-
}
41-
42-
impl Default for CarLifecycle {
43-
fn default() -> Self {
44-
CarLifecycle {
45-
area: Arc::new(Mutex::new(Area::random())),
46-
}
47-
}
48-
}
39+
pub struct CarLifecycle;
4940

5041
#[lifecycle(CarAgent)]
5142
impl CarLifecycle {
5243
#[on_start]
5344
pub fn on_start(&self, context: HandlerContext<CarAgent>) -> impl EventHandler<CarAgent> {
54-
let area = self.area.clone();
55-
5645
let speed_handler = context.schedule_repeatedly(Duration::from_secs(5), move || {
5746
let mut rng = rand::rngs::OsRng;
5847
Some(context.set_value(CarAgent::SPEED, rng.gen_range(10..=70)))
5948
});
6049
let area_handler = move |car_id: u64| {
6150
context.schedule_repeatedly(Duration::from_secs(5), move || {
62-
let area = area.clone();
63-
let assigned_area = &mut *area.lock().expect("Mutex poisoned");
64-
let old_area = replace(assigned_area, Area::random());
65-
66-
let handler = if old_area != *assigned_area {
67-
// deregister this car with its current area
68-
let deregister_handler = context.send_command(
69-
None,
70-
format!("/area/{old_area:?}"),
71-
"deregister".to_string(),
72-
car_id,
73-
);
74-
// register this car with its new assigned area
75-
let register_handler = context.send_command(
76-
None,
77-
format!("/area/{:?}", *assigned_area),
78-
"register".to_string(),
79-
car_id,
80-
);
51+
Some(context.get_value(CarAgent::AREA).and_then(move |old_area| {
52+
let new_area = Area::select_random();
53+
let handler = if old_area != new_area {
54+
// deregister this car with its current area
55+
let deregister_handler = context.send_command(
56+
None,
57+
format!("/area/{old_area}"),
58+
"registrations".to_string(),
59+
Action::Deregister(car_id),
60+
);
61+
// register this car with its new assigned area
62+
let register_handler = context.send_command(
63+
None,
64+
format!("/area/{new_area}"),
65+
"registrations".to_string(),
66+
Action::Register(car_id),
67+
);
8168

82-
Some(deregister_handler.followed_by(register_handler))
83-
} else {
84-
// noop handler as the car didn't switch area
85-
None
86-
};
69+
let handler = deregister_handler
70+
.followed_by(register_handler)
71+
.followed_by(context.set_value(CarAgent::AREA, new_area));
72+
Some(handler)
73+
} else {
74+
// noop handler as the car didn't switch area
75+
None
76+
};
8777

88-
Some(handler.discard())
78+
handler.discard()
79+
}))
8980
})
9081
};
9182

example_apps/aggregations/src/main.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::error::Error;
1616
use std::str::FromStr;
1717
use std::time::Duration;
1818

19+
use crate::area::Area;
1920
use crate::{
2021
aggregate::{AggregateAgent, AggregateLifecycle},
2122
area::{AreaAgent, AreaLifecycle},
@@ -38,23 +39,31 @@ mod car;
3839
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3940
example_logging()?;
4041

41-
let car_agent = AgentModel::new(CarAgent::default, CarLifecycle::default().into_lifecycle());
42-
let area_agent = AgentModel::new(
43-
AreaAgent::default,
44-
AreaLifecycle::default().into_lifecycle(),
45-
);
42+
let car_agent = AgentModel::new(CarAgent::default, CarLifecycle.into_lifecycle());
43+
let area_agent = move |name| {
44+
AgentModel::new(
45+
AreaAgent::default,
46+
AreaLifecycle::new(name).into_lifecycle(),
47+
)
48+
};
4649
let aggregate_agent =
4750
AgentModel::new(AggregateAgent::default, AggregateLifecycle.into_lifecycle());
4851

49-
let server = ServerBuilder::with_plane_name("Example Plane")
52+
let mut builder = ServerBuilder::with_plane_name("Example Plane")
5053
.add_route(RoutePattern::parse_str("/cars/:car_id")?, car_agent)
51-
.add_route(RoutePattern::parse_str("/area/:area")?, area_agent)
5254
.add_route(RoutePattern::parse_str("/aggregate")?, aggregate_agent)
5355
.update_config(|config| {
5456
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
55-
})
56-
.build()
57-
.await?;
57+
});
58+
59+
for area in Area::universe() {
60+
builder = builder.add_route(
61+
RoutePattern::parse_str(format!("/area/{}", area).as_str())?,
62+
area_agent(area),
63+
);
64+
}
65+
66+
let server = builder.build().await?;
5867

5968
let (task, handle) = server.run();
6069
let _task = tokio::spawn(task);

0 commit comments

Comments
 (0)