Skip to content

Commit 36cd081

Browse files
authored
Merge pull request #660 from swimos/time_series
Data windowing example application
2 parents 295d860 + ff8f5ca commit 36cd081

File tree

5 files changed

+270
-0
lines changed

5 files changed

+270
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ members = [
3939
"example_apps/join_map",
4040
"example_apps/join_value",
4141
"example_apps/aggregations",
42+
"example_apps/time_series",
4243
]
4344

4445
exclude = [

example_apps/time_series/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "time_series"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
swimos = { path = "../../swimos", features = ["server", "agent"] }
8+
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
9+
example-util = { path = "../example_util" }
10+
futures-util = { workspace = true }
11+
swimos_form = { path = "../../api/swimos_form" }

example_apps/time_series/src/count.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::cell::RefCell;
16+
use std::collections::{HashMap, VecDeque};
17+
use std::time::{SystemTime, UNIX_EPOCH};
18+
19+
use swimos::{
20+
agent::event_handler::Sequentially,
21+
agent::lanes::CommandLane,
22+
agent::lanes::MapLane,
23+
agent::{
24+
agent_lifecycle::HandlerContext,
25+
event_handler::{EventHandler, HandlerActionExt},
26+
lifecycle, projections, AgentLaneModel,
27+
},
28+
};
29+
30+
#[derive(AgentLaneModel)]
31+
#[projections]
32+
pub struct CountAgent {
33+
history: MapLane<u64, String>,
34+
add: CommandLane<String>,
35+
}
36+
37+
pub struct CountLifecycle {
38+
max: usize,
39+
keys: RefCell<VecDeque<u64>>,
40+
}
41+
42+
impl CountLifecycle {
43+
pub fn new(max: usize) -> CountLifecycle {
44+
CountLifecycle {
45+
max,
46+
keys: Default::default(),
47+
}
48+
}
49+
}
50+
51+
#[lifecycle(CountAgent, no_clone)]
52+
impl CountLifecycle {
53+
#[on_command(add)]
54+
pub fn add(
55+
&self,
56+
context: HandlerContext<CountAgent>,
57+
cmd: &str,
58+
) -> impl EventHandler<CountAgent> {
59+
let now = SystemTime::now()
60+
.duration_since(UNIX_EPOCH)
61+
.unwrap()
62+
.as_millis() as u64;
63+
context.update(CountAgent::HISTORY, now, cmd.to_string())
64+
}
65+
66+
#[on_update(history)]
67+
pub fn on_update(
68+
&self,
69+
context: HandlerContext<CountAgent>,
70+
_map: &HashMap<u64, String>,
71+
key: u64,
72+
_prev: Option<String>,
73+
_new_value: &str,
74+
) -> impl EventHandler<CountAgent> {
75+
let CountLifecycle { max, keys } = self;
76+
let timestamps = &mut *keys.borrow_mut();
77+
timestamps.push_front(key);
78+
79+
let len = timestamps.len();
80+
let to_drop = if len > *max { len - *max } else { 0 };
81+
82+
let handler = if to_drop > 0 {
83+
let keys = timestamps
84+
.split_off(to_drop)
85+
.into_iter()
86+
.take(to_drop)
87+
.map(move |key| context.remove(CountAgent::HISTORY, key));
88+
Some(Sequentially::new(keys))
89+
} else {
90+
None
91+
};
92+
93+
handler.discard()
94+
}
95+
}

example_apps/time_series/src/main.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::{error::Error, time::Duration};
16+
17+
use example_util::{example_logging, manage_handle};
18+
use swimos::{
19+
agent::agent_model::AgentModel,
20+
route::RoutePattern,
21+
server::{Server, ServerBuilder},
22+
};
23+
24+
use crate::count::{CountAgent, CountLifecycle};
25+
use crate::time::{TimeAgent, TimeLifecycle};
26+
27+
mod count;
28+
mod time;
29+
30+
#[tokio::main]
31+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
32+
example_logging()?;
33+
34+
let count_route = RoutePattern::parse_str("/count/:name}")?;
35+
let count_agent = AgentModel::from_fn(CountAgent::default, move || {
36+
CountLifecycle::new(5).into_lifecycle()
37+
});
38+
39+
let time_route = RoutePattern::parse_str("/time/:name}")?;
40+
let time_agent = AgentModel::from_fn(TimeAgent::default, move || {
41+
TimeLifecycle::new(13).into_lifecycle()
42+
});
43+
44+
let server = ServerBuilder::with_plane_name("Example Plane")
45+
.add_route(count_route, count_agent)
46+
.add_route(time_route, time_agent)
47+
.update_config(|config| {
48+
config.agent_runtime.inactive_timeout = Duration::from_secs(5 * 60);
49+
})
50+
.build()
51+
.await?;
52+
53+
let (task, handle) = server.run();
54+
55+
let shutdown = manage_handle(handle);
56+
57+
let (_, result) = tokio::join!(shutdown, task);
58+
59+
result?;
60+
println!("Server stopped successfully.");
61+
Ok(())
62+
}

example_apps/time_series/src/time.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::cell::RefCell;
16+
use std::collections::{HashMap, VecDeque};
17+
use std::time::{SystemTime, UNIX_EPOCH};
18+
19+
use swimos::{
20+
agent::event_handler::Sequentially,
21+
agent::lanes::CommandLane,
22+
agent::lanes::MapLane,
23+
agent::{
24+
agent_lifecycle::HandlerContext,
25+
event_handler::{EventHandler, HandlerActionExt},
26+
lifecycle, projections, AgentLaneModel,
27+
},
28+
};
29+
30+
#[derive(AgentLaneModel)]
31+
#[projections]
32+
pub struct TimeAgent {
33+
history: MapLane<u64, String>,
34+
add: CommandLane<String>,
35+
}
36+
37+
pub struct TimeLifecycle {
38+
interval: u64,
39+
keys: RefCell<VecDeque<u64>>,
40+
}
41+
42+
impl TimeLifecycle {
43+
pub fn new(interval: u64) -> TimeLifecycle {
44+
TimeLifecycle {
45+
interval,
46+
keys: Default::default(),
47+
}
48+
}
49+
}
50+
51+
#[lifecycle(TimeAgent, no_clone)]
52+
impl TimeLifecycle {
53+
#[on_command(add)]
54+
pub fn add(
55+
&self,
56+
context: HandlerContext<TimeAgent>,
57+
cmd: &str,
58+
) -> impl EventHandler<TimeAgent> {
59+
let now = SystemTime::now()
60+
.duration_since(UNIX_EPOCH)
61+
.unwrap()
62+
.as_millis() as u64;
63+
context.update(TimeAgent::HISTORY, now, cmd.to_string())
64+
}
65+
66+
#[on_update(history)]
67+
pub fn on_update(
68+
&self,
69+
context: HandlerContext<TimeAgent>,
70+
_map: &HashMap<u64, String>,
71+
key: u64,
72+
_prev: Option<String>,
73+
_new_value: &str,
74+
) -> impl EventHandler<TimeAgent> {
75+
let TimeLifecycle { interval, keys } = self;
76+
let timestamps = &mut *keys.borrow_mut();
77+
timestamps.push_back(key);
78+
let mut to_remove = Vec::new();
79+
80+
let start = SystemTime::now()
81+
.duration_since(UNIX_EPOCH)
82+
.unwrap()
83+
.as_millis() as u64;
84+
85+
timestamps.retain(|timestamp| {
86+
if start - *timestamp > *interval {
87+
to_remove.push(context.remove(TimeAgent::HISTORY, *timestamp));
88+
false
89+
} else {
90+
true
91+
}
92+
});
93+
let handler = if to_remove.is_empty() {
94+
None
95+
} else {
96+
Some(Sequentially::new(to_remove))
97+
};
98+
99+
handler.discard()
100+
}
101+
}

0 commit comments

Comments
 (0)