Skip to content

Commit c6efb05

Browse files
committed
Time series example application
1 parent 7ce4bb1 commit c6efb05

File tree

3 files changed

+31
-74
lines changed

3 files changed

+31
-74
lines changed

example_apps/time_series/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@ edition = "2021"
77
swimos = { path = "../../swimos", features = ["server", "agent"] }
88
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
99
example-util = { path = "../example_util" }
10+
futures-util = { workspace = true }
11+
swimos_form = { path = "../../api/swimos_form" }

example_apps/time_series/src/agent.rs

Lines changed: 28 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::cell::{Cell, RefCell};
15+
use std::cell::RefCell;
1616
use std::collections::{HashMap, VecDeque};
17-
use std::rc::Rc;
18-
use std::sync::atomic::{AtomicBool, Ordering};
19-
use std::sync::Arc;
20-
use std::time::{Duration, SystemTime, UNIX_EPOCH};
17+
use std::time::{SystemTime, UNIX_EPOCH};
2118

22-
use swimos::agent::event_handler::{ConstHandler, UnitHandler};
2319
use swimos::{
2420
agent::event_handler::Sequentially,
2521
agent::lanes::CommandLane,
@@ -50,14 +46,23 @@ impl ExampleLifecycle {
5046

5147
#[lifecycle(ExampleAgent, no_clone)]
5248
impl ExampleLifecycle {
49+
#[on_command(add)]
50+
pub fn add(
51+
&self,
52+
context: HandlerContext<ExampleAgent>,
53+
cmd: &str,
54+
) -> impl EventHandler<ExampleAgent> {
55+
context.update(ExampleAgent::HISTORY, now(), cmd.to_string())
56+
}
57+
5358
#[on_update(history)]
5459
pub fn on_update(
5560
&self,
5661
context: HandlerContext<ExampleAgent>,
5762
map: &HashMap<u64, String>,
5863
key: u64,
5964
_prev: Option<String>,
60-
_new_value: &String,
65+
_new_value: &str,
6166
) -> impl EventHandler<ExampleAgent> {
6267
self.policy.on_event(context, key, map.clone())
6368
}
@@ -71,19 +76,25 @@ pub enum RetentionPolicy {
7176
max: usize,
7277
},
7378
Time {
74-
start: u64,
7579
interval: u64,
7680
keys: RefCell<VecDeque<u64>>,
7781
},
78-
Recency {
79-
start: u64,
80-
max: u64,
81-
keys: RefCell<VecDeque<u64>>,
82-
scheduled: Arc<AtomicBool>,
83-
},
8482
}
8583

8684
impl RetentionPolicy {
85+
#[allow(dead_code)]
86+
pub fn count(max: usize) -> RetentionPolicy {
87+
RetentionPolicy::Count { max }
88+
}
89+
90+
#[allow(dead_code)]
91+
pub fn time(interval: u64) -> RetentionPolicy {
92+
RetentionPolicy::Time {
93+
interval,
94+
keys: RefCell::new(Default::default()),
95+
}
96+
}
97+
8798
fn on_event(
8899
&self,
89100
context: HandlerContext<ExampleAgent>,
@@ -104,18 +115,15 @@ impl RetentionPolicy {
104115
};
105116
handler.discard().boxed()
106117
}
107-
RetentionPolicy::Time {
108-
start,
109-
interval,
110-
keys,
111-
} => {
118+
RetentionPolicy::Time { interval, keys } => {
112119
let timestamps = &mut *keys.borrow_mut();
113120
timestamps.push_back(key);
114121

115122
let mut to_remove = Vec::new();
123+
let start = now();
116124

117125
timestamps.retain(|timestamp| {
118-
if *start - *timestamp > *interval {
126+
if start - *timestamp > *interval {
119127
to_remove.push(context.remove(ExampleAgent::HISTORY, *timestamp));
120128
false
121129
} else {
@@ -131,59 +139,6 @@ impl RetentionPolicy {
131139

132140
handler.discard().boxed()
133141
}
134-
RetentionPolicy::Recency {
135-
keys,
136-
scheduled,
137-
max,
138-
start,
139-
} => {
140-
let timestamps = &mut *keys.borrow_mut();
141-
timestamps.push_back(key);
142-
143-
loop {
144-
let handler = if !scheduled.swap(true, Ordering::Acquire) {
145-
let handler = context
146-
.effect(move || {
147-
scheduled.store(false, Ordering::Release);
148-
149-
let mut to_remove = Vec::new();
150-
151-
timestamps.retain(|timestamp| {
152-
if *start - *timestamp > *max {
153-
to_remove.push(*timestamp);
154-
false
155-
} else {
156-
true
157-
}
158-
});
159-
160-
to_remove
161-
})
162-
.and_then(|to_remove: Vec<u64>| {
163-
let handler = if to_remove.is_empty() {
164-
None
165-
} else {
166-
Some(Sequentially::new(to_remove.into_iter().map(
167-
|timestamp| {
168-
context.remove(ExampleAgent::HISTORY, timestamp)
169-
},
170-
)))
171-
};
172-
173-
handler.discard().boxed()
174-
});
175-
176-
Some(
177-
context
178-
.run_after(Duration::from_millis(*max), handler)
179-
.discard(),
180-
)
181-
} else {
182-
None
183-
};
184-
handler.boxed()
185-
}
186-
}
187142
}
188143
}
189144
}

example_apps/time_series/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3131
let route = RoutePattern::parse_str("/example/:name}")?;
3232

3333
let agent = AgentModel::from_fn(ExampleAgent::default, move || {
34-
ExampleLifecycle::new(RetentionPolicy::Count { max: 100 }).into_lifecycle()
34+
ExampleLifecycle::new(RetentionPolicy::Count { max: 5 }).into_lifecycle()
3535
});
3636

3737
let server = ServerBuilder::with_plane_name("Example Plane")

0 commit comments

Comments
 (0)