diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 13bbb915b..75c3571f1 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -205,6 +205,7 @@ where I::Item: Data, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, { + let mut buffer = Vec::new(); stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { input.for_each(|time, data| { let mut session = output.session(&time); @@ -213,11 +214,19 @@ where let mut cursor = batch.cursor(); while let Some(key) = cursor.get_key(batch) { while let Some(val) = cursor.get_val(batch) { - for datum in logic(key, val) { + let mut iter = logic(key, val).into_iter().peekable(); + if iter.peek().is_some() { cursor.map_times(batch, |time, diff| { - session.give((datum.clone(), time.clone(), diff.clone())); + buffer.push((time.clone(), diff.clone())); }); + crate::consolidation::consolidate(&mut buffer); + } + for datum in iter { + for (time, diff) in buffer.iter() { + session.give((datum.clone(), time.clone(), diff.clone())); + } } + buffer.clear(); cursor.step_val(batch); } cursor.step_key(batch);