Skip to content

Commit 8ab22cc

Browse files
committed
Reworks count-based retention policy
1 parent eded047 commit 8ab22cc

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

example_apps/time_series/src/agent.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ impl ExampleLifecycle {
5959
pub fn on_update(
6060
&self,
6161
context: HandlerContext<ExampleAgent>,
62-
map: &HashMap<u64, String>,
62+
_map: &HashMap<u64, String>,
6363
key: u64,
6464
_prev: Option<String>,
6565
_new_value: &str,
6666
) -> impl EventHandler<ExampleAgent> {
67-
self.policy.on_event(context, key, map.clone())
67+
self.policy.on_event(context, key)
6868
}
6969
}
7070

@@ -74,6 +74,7 @@ impl ExampleLifecycle {
7474
pub enum RetentionPolicy {
7575
Count {
7676
max: usize,
77+
keys: RefCell<VecDeque<u64>>,
7778
},
7879
Time {
7980
interval: u64,
@@ -84,7 +85,10 @@ pub enum RetentionPolicy {
8485
impl RetentionPolicy {
8586
#[allow(dead_code)]
8687
pub fn count(max: usize) -> RetentionPolicy {
87-
RetentionPolicy::Count { max }
88+
RetentionPolicy::Count {
89+
max,
90+
keys: RefCell::new(Default::default()),
91+
}
8892
}
8993

9094
#[allow(dead_code)]
@@ -99,16 +103,19 @@ impl RetentionPolicy {
99103
&self,
100104
context: HandlerContext<ExampleAgent>,
101105
key: u64,
102-
map: HashMap<u64, String>,
103106
) -> impl EventHandler<ExampleAgent> {
104107
match self {
105-
RetentionPolicy::Count { max } => {
106-
let drop = map.len().checked_sub(*max).unwrap_or_default();
108+
RetentionPolicy::Count { max, keys } => {
109+
let timestamps = &mut *keys.borrow_mut();
110+
timestamps.push_front(key);
111+
112+
let drop = timestamps.len().checked_sub(*max).unwrap_or_default();
107113
let handler = if drop > 0 {
108-
let keys = map
114+
let keys = timestamps
115+
.split_off(drop)
109116
.into_iter()
110117
.take(drop)
111-
.map(move |(key, _value)| context.remove(ExampleAgent::HISTORY, key));
118+
.map(move |key| context.remove(ExampleAgent::HISTORY, key));
112119
Some(Sequentially::new(keys))
113120
} else {
114121
None

example_apps/time_series/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3131
let route = RoutePattern::parse_str("/example/:name}")?;
3232

3333
let agent = AgentModel::from_fn(ExampleAgent::default, move || {
34-
ExampleLifecycle::new(RetentionPolicy::Count { max: 5 }).into_lifecycle()
34+
ExampleLifecycle::new(RetentionPolicy::count(5)).into_lifecycle()
3535
});
3636

3737
let server = ServerBuilder::with_plane_name("Example Plane")

0 commit comments

Comments
 (0)