Skip to content

Commit ff8f5ca

Browse files
committed
Resolves PR comments
1 parent 8ab22cc commit ff8f5ca

File tree

4 files changed

+210
-164
lines changed

4 files changed

+210
-164
lines changed

example_apps/time_series/src/agent.rs

Lines changed: 0 additions & 158 deletions
This file was deleted.

example_apps/time_series/src/count.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::cell::RefCell;
16+
use std::collections::{HashMap, VecDeque};
17+
use std::time::{SystemTime, UNIX_EPOCH};
18+
19+
use swimos::{
20+
agent::event_handler::Sequentially,
21+
agent::lanes::CommandLane,
22+
agent::lanes::MapLane,
23+
agent::{
24+
agent_lifecycle::HandlerContext,
25+
event_handler::{EventHandler, HandlerActionExt},
26+
lifecycle, projections, AgentLaneModel,
27+
},
28+
};
29+
30+
#[derive(AgentLaneModel)]
31+
#[projections]
32+
pub struct CountAgent {
33+
history: MapLane<u64, String>,
34+
add: CommandLane<String>,
35+
}
36+
37+
pub struct CountLifecycle {
38+
max: usize,
39+
keys: RefCell<VecDeque<u64>>,
40+
}
41+
42+
impl CountLifecycle {
43+
pub fn new(max: usize) -> CountLifecycle {
44+
CountLifecycle {
45+
max,
46+
keys: Default::default(),
47+
}
48+
}
49+
}
50+
51+
#[lifecycle(CountAgent, no_clone)]
52+
impl CountLifecycle {
53+
#[on_command(add)]
54+
pub fn add(
55+
&self,
56+
context: HandlerContext<CountAgent>,
57+
cmd: &str,
58+
) -> impl EventHandler<CountAgent> {
59+
let now = SystemTime::now()
60+
.duration_since(UNIX_EPOCH)
61+
.unwrap()
62+
.as_millis() as u64;
63+
context.update(CountAgent::HISTORY, now, cmd.to_string())
64+
}
65+
66+
#[on_update(history)]
67+
pub fn on_update(
68+
&self,
69+
context: HandlerContext<CountAgent>,
70+
_map: &HashMap<u64, String>,
71+
key: u64,
72+
_prev: Option<String>,
73+
_new_value: &str,
74+
) -> impl EventHandler<CountAgent> {
75+
let CountLifecycle { max, keys } = self;
76+
let timestamps = &mut *keys.borrow_mut();
77+
timestamps.push_front(key);
78+
79+
let len = timestamps.len();
80+
let to_drop = if len > *max { len - *max } else { 0 };
81+
82+
let handler = if to_drop > 0 {
83+
let keys = timestamps
84+
.split_off(to_drop)
85+
.into_iter()
86+
.take(to_drop)
87+
.map(move |key| context.remove(CountAgent::HISTORY, key));
88+
Some(Sequentially::new(keys))
89+
} else {
90+
None
91+
};
92+
93+
handler.discard()
94+
}
95+
}

example_apps/time_series/src/main.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,29 @@ use swimos::{
2121
server::{Server, ServerBuilder},
2222
};
2323

24-
use crate::agent::{ExampleAgent, ExampleLifecycle, RetentionPolicy};
24+
use crate::count::{CountAgent, CountLifecycle};
25+
use crate::time::{TimeAgent, TimeLifecycle};
2526

26-
mod agent;
27+
mod count;
28+
mod time;
2729

2830
#[tokio::main]
2931
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3032
example_logging()?;
31-
let route = RoutePattern::parse_str("/example/:name}")?;
3233

33-
let agent = AgentModel::from_fn(ExampleAgent::default, move || {
34-
ExampleLifecycle::new(RetentionPolicy::count(5)).into_lifecycle()
34+
let count_route = RoutePattern::parse_str("/count/:name}")?;
35+
let count_agent = AgentModel::from_fn(CountAgent::default, move || {
36+
CountLifecycle::new(5).into_lifecycle()
37+
});
38+
39+
let time_route = RoutePattern::parse_str("/time/:name}")?;
40+
let time_agent = AgentModel::from_fn(TimeAgent::default, move || {
41+
TimeLifecycle::new(13).into_lifecycle()
3542
});
3643

3744
let server = ServerBuilder::with_plane_name("Example Plane")
38-
.add_route(route, agent)
45+
.add_route(count_route, count_agent)
46+
.add_route(time_route, time_agent)
3947
.update_config(|config| {
4048
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
4149
})

example_apps/time_series/src/time.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::cell::RefCell;
16+
use std::collections::{HashMap, VecDeque};
17+
use std::time::{SystemTime, UNIX_EPOCH};
18+
19+
use swimos::{
20+
agent::event_handler::Sequentially,
21+
agent::lanes::CommandLane,
22+
agent::lanes::MapLane,
23+
agent::{
24+
agent_lifecycle::HandlerContext,
25+
event_handler::{EventHandler, HandlerActionExt},
26+
lifecycle, projections, AgentLaneModel,
27+
},
28+
};
29+
30+
#[derive(AgentLaneModel)]
31+
#[projections]
32+
pub struct TimeAgent {
33+
history: MapLane<u64, String>,
34+
add: CommandLane<String>,
35+
}
36+
37+
pub struct TimeLifecycle {
38+
interval: u64,
39+
keys: RefCell<VecDeque<u64>>,
40+
}
41+
42+
impl TimeLifecycle {
43+
pub fn new(interval: u64) -> TimeLifecycle {
44+
TimeLifecycle {
45+
interval,
46+
keys: Default::default(),
47+
}
48+
}
49+
}
50+
51+
#[lifecycle(TimeAgent, no_clone)]
52+
impl TimeLifecycle {
53+
#[on_command(add)]
54+
pub fn add(
55+
&self,
56+
context: HandlerContext<TimeAgent>,
57+
cmd: &str,
58+
) -> impl EventHandler<TimeAgent> {
59+
let now = SystemTime::now()
60+
.duration_since(UNIX_EPOCH)
61+
.unwrap()
62+
.as_millis() as u64;
63+
context.update(TimeAgent::HISTORY, now, cmd.to_string())
64+
}
65+
66+
#[on_update(history)]
67+
pub fn on_update(
68+
&self,
69+
context: HandlerContext<TimeAgent>,
70+
_map: &HashMap<u64, String>,
71+
key: u64,
72+
_prev: Option<String>,
73+
_new_value: &str,
74+
) -> impl EventHandler<TimeAgent> {
75+
let TimeLifecycle { interval, keys } = self;
76+
let timestamps = &mut *keys.borrow_mut();
77+
timestamps.push_back(key);
78+
let mut to_remove = Vec::new();
79+
80+
let start = SystemTime::now()
81+
.duration_since(UNIX_EPOCH)
82+
.unwrap()
83+
.as_millis() as u64;
84+
85+
timestamps.retain(|timestamp| {
86+
if start - *timestamp > *interval {
87+
to_remove.push(context.remove(TimeAgent::HISTORY, *timestamp));
88+
false
89+
} else {
90+
true
91+
}
92+
});
93+
let handler = if to_remove.is_empty() {
94+
None
95+
} else {
96+
Some(Sequentially::new(to_remove))
97+
};
98+
99+
handler.discard()
100+
}
101+
}

0 commit comments

Comments
 (0)