diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 52d5e141f..9c59871be 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -25,7 +25,7 @@ fn main() { let size: usize = std::env::args().nth(2).expect("missing argument 2").parse().unwrap(); let timer1 = ::std::time::Instant::now(); - let timer2 = timer1.clone(); + let timer2 = timer1; // initializes and runs a timely dataflow. // timely::execute(_config, move |worker| { @@ -33,7 +33,7 @@ fn main() { let mut data_input = >>::new(); let mut keys_input = >>::new(); - let mut probe = ProbeHandle::new(); + let probe = ProbeHandle::new(); // create a new input, exchange data, and inspect its output worker.dataflow::(|scope| { @@ -41,14 +41,14 @@ fn main() { let data = data_input.to_stream(scope); let keys = keys_input.to_stream(scope); - let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); + let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::()); + let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::()); let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); + .probe_with(&probe); }); @@ -142,7 +142,7 @@ mod container { match self { Column::Typed(t) => Column::Typed(t.clone()), Column::Bytes(b) => { - assert!(b.len() % 8 == 0); + assert_eq!(b.len() % 8, 0); let mut alloc: Vec = vec![0; b.len() / 8]; bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]); Self::Align(alloc.into()) @@ -225,7 +225,7 @@ mod container { use timely::container::PushInto; impl>> PushInto for Column { - #[inline] + #[inline(always)] fn push_into(&mut self, item: T) { use columnar::Push; match self { @@ -245,8 +245,8 @@ mod container { // Our expectation / hope is that `bytes` is `u64` aligned and sized. // If the alignment is borked, we can relocate. IF the size is borked, // not sure what we do in that case. - assert!(bytes.len() % 8 == 0); - if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { + assert_eq!(bytes.len() % 8, 0); + if bytemuck::try_cast_slice::<_, u64>(&bytes).is_ok() { Self::Bytes(bytes) } else { @@ -298,7 +298,7 @@ mod builder { use timely::container::PushInto; impl>> PushInto for ColumnBuilder { - #[inline] + #[inline(always)] fn push_into(&mut self, item: T) { self.current.push(item); // If there is less than 10% slop with 2MB backing allocations, mint a container. @@ -306,10 +306,33 @@ mod builder { let words = Indexed::length_in_words(&self.current.borrow()); let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); if round - words < round / 10 { - let mut alloc = Vec::with_capacity(words); - Indexed::encode(&mut alloc, &self.current.borrow()); - self.pending.push_back(Column::Align(alloc.into_boxed_slice())); - self.current.clear(); + /// Move the contents from `current` to an aligned allocation, and push it to `pending`. + /// The contents must fit in `round` words (u64). + #[cold] + fn outlined_align( + current: &mut C::Container, + round: usize, + pending: &mut VecDeque>, + empty: Option>, + ) where + C: Columnar, + { + let mut alloc = if let Some(Column::Align(allocation)) = empty { + let mut alloc: Vec<_> = allocation.into(); + alloc.clear(); + if alloc.capacity() < round { + alloc.reserve(round - alloc.len()); + } + alloc + } else { + Vec::with_capacity(round) + }; + Indexed::encode(&mut alloc, ¤t.borrow()); + pending.push_back(Column::Align(alloc.into_boxed_slice())); + current.clear(); + } + + outlined_align(&mut self.current, round, &mut self.pending, self.empty.take()); } } } @@ -328,7 +351,7 @@ mod builder { impl> ContainerBuilder for ColumnBuilder { type Container = Column; - #[inline] + #[inline(always)] fn extract(&mut self) -> Option<&mut Self::Container> { if let Some(container) = self.pending.pop_front() { self.empty = Some(container); @@ -338,18 +361,26 @@ mod builder { } } - #[inline] + #[inline(always)] fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { use columnar::Container; let words = Indexed::length_in_words(&self.current.borrow()); - let mut alloc = Vec::with_capacity(words); + let mut alloc = if let Some(Column::Align(allocation)) = self.empty.take() { + let mut alloc: Vec<_> = allocation.into(); + alloc.clear(); + if alloc.capacity() < words { + alloc.reserve(words - alloc.len()); + } + alloc + } else { + Vec::with_capacity(words) + }; Indexed::encode(&mut alloc, &self.current.borrow()); self.pending.push_back(Column::Align(alloc.into_boxed_slice())); self.current.clear(); } - self.empty = self.pending.pop_front(); - self.empty.as_mut() + self.extract() } } @@ -361,193 +392,157 @@ use batcher::Col2KeyBatcher; /// Types for consolidating, merging, and extracting columnar update collections. pub mod batcher { - use std::collections::VecDeque; use columnar::Columnar; use timely::Container; use timely::container::{ContainerBuilder, PushInto}; use differential_dataflow::difference::Semigroup; - use crate::Column; - + use differential_dataflow::trace::implementations::merge_batcher::container::ContainerMerger; use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; + use crate::Column; + use crate::builder::ColumnBuilder; + /// A batcher for columnar storage. - pub type Col2ValBatcher = MergeBatcher, Chunker>, merger::ColumnMerger<(K,V),T,R>>; + pub type Col2ValBatcher = MergeBatcher, Chunker>, ContainerMerger>>; pub type Col2KeyBatcher = Col2ValBatcher; // First draft: build a "chunker" and a "merger". #[derive(Default)] - pub struct Chunker { - /// Buffer into which we'll consolidate. - /// - /// Also the buffer where we'll stage responses to `extract` and `finish`. - /// When these calls return, the buffer is available for reuse. - empty: C, - /// Consolidated buffers ready to go. - ready: VecDeque, + pub struct Chunker { + /// Builder to absorb sorted data. + builder: CB, } - impl ContainerBuilder for Chunker { - type Container = C; + impl ContainerBuilder for Chunker { + type Container = CB::Container; fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.empty = ready; - Some(&mut self.empty) - } else { - None - } + self.builder.extract() } fn finish(&mut self) -> Option<&mut Self::Container> { - self.extract() + self.builder.finish() } } - impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker + impl<'a, D, T, R, CB> PushInto<&'a mut Column<(D, T, R)>> for Chunker where D: for<'b> Columnar: Ord>, T: for<'b> Columnar: Ord>, R: for<'b> Columnar: Ord> + for<'b> Semigroup>, - C2: Container + for<'b, 'c> PushInto<(D::Ref<'b>, T::Ref<'b>, &'c R)>, + CB: ContainerBuilder + for<'b, 'c> PushInto<(D::Ref<'b>, T::Ref<'b>, &'c R)>, { fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) { - - // Scoped to let borrow through `permutation` drop. - { - // Sort input data - // TODO: consider `Vec` that we retain, containing indexes. - let mut permutation = Vec::with_capacity(container.len()); - permutation.extend(container.drain()); - permutation.sort(); - - self.empty.clear(); - // Iterate over the data, accumulating diffs for like keys. - let mut iter = permutation.drain(..); - if let Some((data, time, diff)) = iter.next() { - - let mut prev_data = data; - let mut prev_time = time; - let mut prev_diff = ::into_owned(diff); - - for (data, time, diff) in iter { - if (&prev_data, &prev_time) == (&data, &time) { - prev_diff.plus_equals(&diff); - } - else { - if !prev_diff.is_zero() { - let tuple = (prev_data, prev_time, &prev_diff); - self.empty.push_into(tuple); - } - prev_data = data; - prev_time = time; - prev_diff = ::into_owned(diff); - } + // Sort input data + // TODO: consider `Vec` that we retain, containing indexes. + let mut permutation = Vec::with_capacity(container.len()); + permutation.extend(container.drain()); + permutation.sort(); + + // Iterate over the data, accumulating diffs for like keys. + let mut iter = permutation.drain(..); + if let Some((data, time, diff)) = iter.next() { + + let mut prev_data = data; + let mut prev_time = time; + let mut prev_diff = ::into_owned(diff); + + for (data, time, diff) in iter { + if (&prev_data, &prev_time) == (&data, &time) { + prev_diff.plus_equals(&diff); } - - if !prev_diff.is_zero() { - let tuple = (prev_data, prev_time, &prev_diff); - self.empty.push_into(tuple); + else { + if !prev_diff.is_zero() { + let tuple = (prev_data, prev_time, &prev_diff); + self.builder.push_into(tuple); + } + prev_data = data; + prev_time = time; + prev_diff = ::into_owned(diff); } } - } - if !self.empty.is_empty() { - self.ready.push_back(std::mem::take(&mut self.empty)); + if !prev_diff.is_zero() { + let tuple = (prev_data, prev_time, &prev_diff); + self.builder.push_into(tuple); + } } } } /// Implementations of `ContainerQueue` and `MergerChunk` for `Column` containers (columnar). pub mod merger { - - use timely::progress::{Antichain, frontier::AntichainRef}; - use columnar::Columnar; - + use timely::progress::{Antichain, frontier::AntichainRef, Timestamp}; + use columnar::{Columnar, Index}; + use crate::builder::ColumnBuilder; use crate::container::Column; use differential_dataflow::difference::Semigroup; - use differential_dataflow::trace::implementations::merge_batcher::container::{ContainerQueue, MergerChunk}; - use differential_dataflow::trace::implementations::merge_batcher::container::ContainerMerger; - - /// A `Merger` implementation backed by `Column` containers (Columnar). - pub type ColumnMerger = ContainerMerger,ColumnQueue<(D, T, R)>>; + use differential_dataflow::trace::implementations::merge_batcher::container::{ContainerQueue, MergerChunk, PushAndAdd}; - - /// TODO - pub struct ColumnQueue { - list: Column, + /// A queue for extracting items from a `Column` container. + pub struct ColumnQueue<'a, T: Columnar> { + list: >::Borrowed<'a>, head: usize, } - impl ContainerQueue> for ColumnQueue<(D, T, R)> + impl<'a, D, T, R> ContainerQueue<'a, Column<(D, T, R)>> for ColumnQueue<'a, (D, T, R)> where - D: for<'a> Columnar: Ord>, - T: for<'a> Columnar: Ord>, + D: for<'b> Columnar: Ord>, + T: for<'b> Columnar: Ord>, R: Columnar, { - fn next_or_alloc(&mut self) -> Result<<(D, T, R) as Columnar>::Ref<'_>, Column<(D, T, R)>> { - if self.is_empty() { - Err(std::mem::take(&mut self.list)) - } - else { - Ok(self.pop()) - } + type Item = <(D, T, R) as Columnar>::Ref<'a>; + type SelfGAT<'b> = ColumnQueue<'b, (D, T, R)>; + fn pop(&mut self) -> Self::Item { + self.head += 1; + self.list.get(self.head - 1) } - fn is_empty(&self) -> bool { - use timely::Container; + fn is_empty(&mut self) -> bool { + use columnar::Len; self.head == self.list.len() } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { + fn cmp_heads<'b>(&mut self, other: &mut ColumnQueue<'b, (D, T, R)>) -> std::cmp::Ordering { let (data1, time1, _) = self.peek(); let (data2, time2, _) = other.peek(); + let data1 = D::reborrow(data1); + let time1 = T::reborrow(time1); + let data2 = D::reborrow(data2); + let time2 = T::reborrow(time2); + (data1, time1).cmp(&(data2, time2)) } - fn from(list: Column<(D, T, R)>) -> Self { - ColumnQueue { list, head: 0 } + fn new(list: &'a mut Column<(D, T, R)>) -> ColumnQueue<'a, (D, T, R)>{ + ColumnQueue { list: list.borrow(), head: 0 } } } - impl ColumnQueue { - fn pop(&mut self) -> T::Ref<'_> { - self.head += 1; - self.list.get(self.head - 1) - } - - fn peek(&self) -> T::Ref<'_> { + impl<'a, T: Columnar> ColumnQueue<'a, T> { + fn peek(&self) -> T::Ref<'a> { self.list.get(self.head) } } impl MergerChunk for Column<(D, T, R)> where - D: Columnar + 'static, - T: timely::PartialOrder + Clone + Columnar + 'static, - R: Default + Semigroup + Columnar + 'static + D: for<'a> Columnar: Ord>, + T: for<'a> Columnar: Ord> + Timestamp, + for<'a> ::Ref<'a> : Copy, + R: Default + Semigroup + Columnar, { + type ContainerQueue<'a> = ColumnQueue<'a, (D, T, R)>; type TimeOwned = T; - type DiffOwned = R; - fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { - let time = T::into_owned(*time); - if upper.less_equal(&time) { - frontier.insert(time); + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain, stash: &mut T) -> bool { + stash.copy_from(*time); + if upper.less_equal(stash) { + frontier.insert_ref(stash); true } else { false } } - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { - let (data, time, diff1) = item1; - let (_data, _time, diff2) = item2; - stash.copy_from(diff1); - let stash2: R = R::into_owned(diff2); - stash.plus_equals(&stash2); - if !stash.is_zero() { - use timely::Container; - self.push((data, time, &*stash)); - } - } fn account(&self) -> (usize, usize, usize, usize) { (0, 0, 0, 0) // unimplemented!() @@ -562,6 +557,26 @@ pub mod batcher { // (self.len(), size, capacity, allocations) } } + + impl PushAndAdd for ColumnBuilder<(D, T, R)> + where + D: Columnar, + T: timely::PartialOrder + Columnar, + R: Default + Semigroup + Columnar, + { + type Item<'a> = <(D, T, R) as Columnar>::Ref<'a>; + type DiffOwned = R; + + fn push_and_add(&mut self, (data, time, diff1): Self::Item<'_>, (_, _, diff2): Self::Item<'_>, stash: &mut Self::DiffOwned) { + stash.copy_from(diff1); + let stash2: R = R::into_owned(diff2); + stash.plus_equals(&stash2); + if !stash.is_zero() { + use timely::container::PushInto; + self.push_into((data, time, &*stash)); + } + } + } } } @@ -591,6 +606,17 @@ pub mod dd_builder { pub type ColValBuilder = RcBuilder>>; pub type ColKeyBuilder = RcBuilder>>; + // Utility types to save some typing and avoid visual noise. Extract the owned type and + // the batch's read items from a layout. + type OwnedKey = <::KeyContainer as BatchContainer>::Owned; + type ReadItemKey<'a, L> = <::KeyContainer as BatchContainer>::ReadItem<'a>; + type OwnedVal = <::ValContainer as BatchContainer>::Owned; + type ReadItemVal<'a, L> = <::ValContainer as BatchContainer>::ReadItem<'a>; + type OwnedTime = <::TimeContainer as BatchContainer>::Owned; + type ReadItemTime<'a, L> = <::TimeContainer as BatchContainer>::ReadItem<'a>; + type OwnedDiff = <::DiffContainer as BatchContainer>::Owned; + type ReadItemDiff<'a, L> = <::DiffContainer as BatchContainer>::ReadItem<'a>; + /// A builder for creating layers from unsorted update tuples. pub struct OrdValBuilder { /// The in-progress result. @@ -619,8 +645,8 @@ pub mod dd_builder { /// to recover the singleton to push it into `updates` to join the second update. fn push_update(&mut self, time: ::Time, diff: ::Diff) { // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - if self.result.times.last().map(|t| t == <::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) && - self.result.diffs.last().map(|d| d == <::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true) + if self.result.times.last().map(|t| t == ReadItemTime::::borrow_as(&time)) == Some(true) && + self.result.diffs.last().map(|d| d == ReadItemDiff::::borrow_as(&diff)) == Some(true) { assert!(self.singleton.is_none()); self.singleton = Some((time, diff)); @@ -641,17 +667,17 @@ pub mod dd_builder { impl Builder for OrdValBuilder where L: Layout, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, + OwnedKey: Columnar, + OwnedVal: Columnar, + OwnedTime: Columnar, + OwnedDiff: Columnar, // These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`. - for<'a> L::KeyContainer: PushInto<&'a ::Owned>, - for<'a> L::ValContainer: PushInto<&'a ::Owned>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, + for<'a> L::KeyContainer: PushInto<&'a OwnedKey>, + for<'a> L::ValContainer: PushInto<&'a OwnedVal>, + for<'a> ReadItemTime<'a, L> : IntoOwned<'a, Owned = ::Time>, + for<'a> ReadItemDiff<'a, L> : IntoOwned<'a, Owned = ::Diff>, { - type Input = Column<((::Owned,::Owned),::Owned,::Owned)>; + type Input = Column<((OwnedKey,OwnedVal),OwnedTime,OwnedDiff)>; type Time = ::Time; type Output = OrdValBatch; @@ -681,25 +707,37 @@ pub mod dd_builder { // Owned key and val would need to be members of `self`, as this method can be called multiple times, // and we need to correctly cache last for reasons of correctness, not just performance. + let mut owned_key = None; + let mut owned_val = None; + for ((key,val),time,diff) in chunk.drain() { - // It would be great to avoid. - let key = <::Owned as Columnar>::into_owned(key); - let val = <::Owned as Columnar>::into_owned(val); - // These feel fine (wrt the other versions) - let time = <::Owned as Columnar>::into_owned(time); - let diff = <::Owned as Columnar>::into_owned(diff); + let key = if let Some(owned_key) = owned_key.as_mut() { + OwnedKey::::copy_from(owned_key, key); + owned_key + } else { + owned_key.insert(OwnedKey::::into_owned(key)) + }; + let val = if let Some(owned_val) = owned_val.as_mut() { + OwnedVal::::copy_from(owned_val, val); + owned_val + } else { + owned_val.insert(OwnedVal::::into_owned(val)) + }; + + let time = OwnedTime::::into_owned(time); + let diff = OwnedDiff::::into_owned(diff); // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| <::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) { + if self.result.keys.last().map(|k| ReadItemKey::::borrow_as(key).eq(&k)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| <::ReadItem<'_> as IntoOwned>::borrow_as(&val).eq(&v)).unwrap_or(false) { + if self.result.vals.last().map(|v| ReadItemVal::::borrow_as(val).eq(&v)).unwrap_or(false) { self.push_update(time, diff); } else { // New value; complete representation of prior value. self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); - self.result.vals.push(&val); + self.result.vals.push(val); } } else { // New key; complete representation of prior key. @@ -707,8 +745,8 @@ pub mod dd_builder { if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(&val); - self.result.keys.push(&key); + self.result.vals.push(val); + self.result.keys.push(key); } } } @@ -767,8 +805,8 @@ pub mod dd_builder { /// to recover the singleton to push it into `updates` to join the second update. fn push_update(&mut self, time: ::Time, diff: ::Diff) { // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - if self.result.times.last().map(|t| t == <::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) && - self.result.diffs.last().map(|d| d == <::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true) + if self.result.times.last().map(|t| t == ReadItemTime::::borrow_as(&time)) == Some(true) && + self.result.diffs.last().map(|d| d == ReadItemDiff::::borrow_as(&diff)) == Some(true) { assert!(self.singleton.is_none()); self.singleton = Some((time, diff)); @@ -789,17 +827,17 @@ pub mod dd_builder { impl Builder for OrdKeyBuilder where L: Layout, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, + OwnedKey: Columnar, + OwnedVal: Columnar, + OwnedTime: Columnar, + OwnedDiff: Columnar, // These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`. - for<'a> L::KeyContainer: PushInto<&'a ::Owned>, - for<'a> L::ValContainer: PushInto<&'a ::Owned>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, + for<'a> L::KeyContainer: PushInto<&'a OwnedKey>, + for<'a> L::ValContainer: PushInto<&'a OwnedVal>, + for<'a> ReadItemTime<'a, L> : IntoOwned<'a, Owned = ::Time>, + for<'a> ReadItemDiff<'a, L> : IntoOwned<'a, Owned = ::Diff>, { - type Input = Column<((::Owned,::Owned),::Owned,::Owned)>; + type Input = Column<((OwnedKey,OwnedVal),OwnedTime,OwnedDiff)>; type Time = ::Time; type Output = OrdKeyBatch; @@ -827,22 +865,28 @@ pub mod dd_builder { // Owned key and val would need to be members of `self`, as this method can be called multiple times, // and we need to correctly cache last for reasons of correctness, not just performance. + let mut owned_key = None; + for ((key,_val),time,diff) in chunk.drain() { - // It would be great to avoid. - let key = <::Owned as Columnar>::into_owned(key); - // These feel fine (wrt the other versions) - let time = <::Owned as Columnar>::into_owned(time); - let diff = <::Owned as Columnar>::into_owned(diff); + let key = if let Some(owned_key) = owned_key.as_mut() { + OwnedKey::::copy_from(owned_key, key); + owned_key + } else { + owned_key.insert(OwnedKey::::into_owned(key)) + }; + + let time = OwnedTime::::into_owned(time); + let diff = OwnedDiff::::into_owned(diff); // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| <::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) { + if self.result.keys.last().map(|k| ReadItemKey::::borrow_as(key).eq(&k)).unwrap_or(false) { self.push_update(time, diff); } else { // New key; complete representation of prior key. self.result.keys_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); - self.result.keys.push(&key); + self.result.keys.push(key); } } } diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 47c26e3c7..b7f0312b3 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -50,7 +50,7 @@ pub struct MergeBatcher { impl Batcher for MergeBatcher where C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>, - M: Merger, + M: Merger, { type Input = Input; type Time = M::Time; @@ -74,10 +74,12 @@ where /// needed. fn push_container(&mut self, container: &mut Input) { self.chunker.push_into(container); - while let Some(chunk) = self.chunker.extract() { + let mut chain = Vec::new(); + while let Some(chunk) = self.chunker.finish() { let chunk = std::mem::take(chunk); - self.insert_chain(vec![chunk]); + chain.push(chunk); } + self.insert_chain(chain); } // Sealing a batch means finding those updates with times not greater or equal to any time @@ -120,7 +122,7 @@ where } /// The frontier of elements remaining after the most recent call to `self.seal`. - #[inline] + #[inline(always)] fn frontier(&mut self) -> AntichainRef { self.frontier.borrow() } @@ -202,7 +204,7 @@ pub trait Merger: Default { /// The internal representation of chunks of data. type Chunk: Container; /// The type of time in frontiers to extract updates. - type Time; + type Time: Timestamp; /// Merge chains into an output chain. fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec); /// Extract ready updates based on the `upper` frontier. @@ -226,59 +228,57 @@ pub mod container { //! A general purpose `Merger` implementation for arbitrary containers. //! - //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`. + //! The implementation requires implementations of three traits, `ContainerQueue`, `PushAndAdd` and `MergerChunk`. //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as //! well as the ability to return the container when iteration is complete. + //! The `PushAndAdd` trait is meant to provide a way to push two items together, if their summed + //! diff is non-zero. //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container //! items should be interpreted with respect to times, and with respect to differences. //! These two traits exist instead of a stack of constraints on the structure of the associated items //! of the containers, allowing them to perform their functions without destructuring their guts. //! - //! Standard implementations exist in the `vec`, `columnation`, and `flat_container` modules. + //! Standard implementations exist in the `vec`, `columnation` modules. use std::cmp::Ordering; - use std::marker::PhantomData; - use timely::{Container, container::{PushInto, SizableContainer}}; + + use timely::Container; use timely::progress::frontier::{Antichain, AntichainRef}; use timely::{Data, PartialOrder}; - + use timely::container::{ContainerBuilder, PushInto}; + use timely::progress::Timestamp; use crate::trace::implementations::merge_batcher::Merger; /// An abstraction for a container that can be iterated over, and conclude by returning itself. - pub trait ContainerQueue { - /// Returns either the next item in the container, or the container itself. - fn next_or_alloc(&mut self) -> Result, C>; - /// Indicates whether `next_or_alloc` will return `Ok`, and whether `peek` will return `Some`. - fn is_empty(&self) -> bool; + pub trait ContainerQueue<'a, C> { + /// Items exposed by this queue. + type Item; + /// The same type as `Self`, but with a different lifetime. + type SelfGAT<'b>: ContainerQueue<'b, C>; + /// Returns the next item in the container. Might panic if the container is empty. + fn pop(&mut self) -> Self::Item; + /// Indicates whether `pop` will succeed. + fn is_empty(&mut self) -> bool; /// Compare the heads of two queues, where empty queues come last. - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering; + fn cmp_heads(&mut self, other: &mut Self::SelfGAT<'_>) -> Ordering; /// Create a new queue from an existing container. - fn from(container: C) -> Self; + fn new(container: &'a mut C) -> Self; } - /// Behavior to dissect items of chunks in the merge batcher - pub trait MergerChunk : SizableContainer { + /// Behavior to observe the time of an item and account for a container's size. + pub trait MergerChunk : Container { + /// The queue type that can be used to iterate over the container. + type ContainerQueue<'a>: ContainerQueue<'a, Self>; /// An owned time type. /// /// This type is provided so that users can maintain antichains of something, in order to track /// the forward movement of time and extract intervals from chains of updates. - type TimeOwned; - /// The owned diff type. - /// - /// This type is provided so that users can provide an owned instance to the `push_and_add` method, - /// to act as a scratch space when the type is substantial and could otherwise require allocations. - type DiffOwned: Default; + type TimeOwned: Timestamp; /// Relates a borrowed time to antichains of owned times. /// /// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`. - fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool; - - /// Push an entry that adds together two diffs. - /// - /// This is only called when two items are deemed mergeable by the container queue. - /// If the two diffs added together is zero do not push anything. - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned); + fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain, stash: &mut Self::TimeOwned) -> bool; /// Account the allocations behind the chunk. // TODO: Find a more universal home for this: `Container`? @@ -288,122 +288,140 @@ pub mod container { } } + /// Push and add two items together, if their summed diff is non-zero. + pub trait PushAndAdd { + /// The item type that can be pushed into the container. + type Item<'a>; + /// The owned diff type. + /// + /// This type is provided so that users can provide an owned instance to the `push_and_add` method, + /// to act as a scratch space when the type is substantial and could otherwise require allocations. + type DiffOwned: Default; + + /// Push an entry that adds together two diffs. + /// + /// This is only called when two items are deemed mergeable by the container queue. + /// If the two diffs added together is zero do not push anything. + fn push_and_add(&mut self, item1: Self::Item<'_>, item2: Self::Item<'_>, stash: &mut Self::DiffOwned); + } + /// A merger for arbitrary containers. /// /// `MC` is a [`Container`] that implements [`MergerChunk`]. /// `CQ` is a [`ContainerQueue`] supporting `MC`. - pub struct ContainerMerger { - _marker: PhantomData<(MC, CQ)>, + #[derive(Default, Debug)] + pub struct ContainerMerger { + builder: MCB, } - impl Default for ContainerMerger { - fn default() -> Self { - Self { _marker: PhantomData, } + impl ContainerMerger { + /// Helper to extract chunks from the builder and push them into the output. + #[inline(always)] + fn extract_chunks(builder: &mut MCB, output: &mut Vec, stash: &mut Vec) { + while let Some(chunk) = builder.extract() { + let chunk = std::mem::replace(chunk, stash.pop().unwrap_or_default()); + output.push_into(chunk); + } } - } - impl ContainerMerger { - /// Helper to get pre-sized vector from the stash. + /// Helper to finish the builder and push the chunks into the output. #[inline] - fn empty(&self, stash: &mut Vec) -> MC { - stash.pop().unwrap_or_else(|| { - let mut container = MC::default(); - container.ensure_capacity(&mut None); - container - }) - } - /// Helper to return a chunk to the stash. - #[inline] - fn recycle(&self, mut chunk: MC, stash: &mut Vec) { - // TODO: Should we only retain correctly sized containers? - chunk.clear(); - stash.push(chunk); + fn finish_chunks(builder: &mut MCB, output: &mut Vec, stash: &mut Vec) { + while let Some(chunk) = builder.finish() { + let chunk = std::mem::replace(chunk, stash.pop().unwrap_or_default()); + output.push_into(chunk); + } } } - impl Merger for ContainerMerger + /// The container queue for a merger chunk builder. + type CQ<'a, MCB> = <::Container as MergerChunk>::ContainerQueue<'a>; + /// The container queue's item for a merger chunk builder. + type CQI<'a, MCB> = as ContainerQueue<'a, ::Container>>::Item; + + /// The core of the algorithm, implementing the `Merger` trait for `ContainerMerger`. + /// + /// The type bounds look intimidating, but they are not too complex: + /// * `MCB` must be a container builder that can absorb its own container's items, + /// and the items produced by the merger chunk's container queue. + /// * The `MCB::Container` must implement `MergerChunk`, which defines the container queue. + /// * The container queue has a `SelfGAT` type that is a lifetime-dependent version of itself, + /// which allows it to be used in the `cmp_heads` function. + impl Merger for ContainerMerger where - for<'a> MC: MergerChunk + Clone + PushInto<::Item<'a>> + 'static, - CQ: ContainerQueue, + MCB: ContainerBuilder + + for<'a> PushInto<::Item<'a>> + + for<'a> PushInto> + + for<'a> PushAndAdd=CQI<'a, MCB>>, + for<'a, 'b> MCB::Container: MergerChunk, + for<'a, 'b> CQ<'a, MCB>: ContainerQueue<'a, MCB::Container, SelfGAT<'b>=CQ<'b, MCB>>, { - type Time = MC::TimeOwned; - type Chunk = MC; + type Time = ::TimeOwned; + type Chunk = MCB::Container; - // TODO: Consider integrating with `ConsolidateLayout`. fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = CQ::from(list1.next().unwrap_or_default()); - let mut head2 = CQ::from(list2.next().unwrap_or_default()); - - let mut result = self.empty(stash); + let mut head1 = list1.next().unwrap_or_default(); + let mut borrow1 = CQ::::new(&mut head1); + let mut head2 = list2.next().unwrap_or_default(); + let mut borrow2 = CQ::::new(&mut head2); let mut diff_owned = Default::default(); // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() { - let cmp = head1.cmp_heads(&head2); + while !borrow1.is_empty() && !borrow2.is_empty() { + while !borrow1.is_empty() && !borrow2.is_empty() { + let cmp = borrow1.cmp_heads(&mut borrow2); // TODO: The following less/greater branches could plausibly be a good moment for // `copy_range`, on account of runs of records that might benefit more from a // `memcpy`. match cmp { Ordering::Less => { - result.push_into(head1.next_or_alloc().ok().unwrap()); + self.builder.push_into(borrow1.pop()); } Ordering::Greater => { - result.push_into(head2.next_or_alloc().ok().unwrap()); + self.builder.push_into(borrow2.pop()); } Ordering::Equal => { - let item1 = head1.next_or_alloc().ok().unwrap(); - let item2 = head2.next_or_alloc().ok().unwrap(); - result.push_and_add(item1, item2, &mut diff_owned); + let item1 = borrow1.pop(); + let item2 = borrow2.pop(); + self.builder.push_and_add(item1, item2, &mut diff_owned); } } } - if result.at_capacity() { - output.push_into(result); - result = self.empty(stash); - } + Self::extract_chunks(&mut self.builder, output, stash); - if head1.is_empty() { - self.recycle(head1.next_or_alloc().err().unwrap(), stash); - head1 = CQ::from(list1.next().unwrap_or_default()); + if borrow1.is_empty() { + drop(borrow1); + let chunk = head1; + stash.push(chunk); + head1 = list1.next().unwrap_or_default(); + borrow1 = CQ::::new(&mut head1); } - if head2.is_empty() { - self.recycle(head2.next_or_alloc().err().unwrap(), stash); - head2 = CQ::from(list2.next().unwrap_or_default()); + if borrow2.is_empty() { + drop(borrow2); + let chunk = head2; + stash.push(chunk); + head2 = list2.next().unwrap_or_default(); + borrow2 = CQ::::new(&mut head2); } } - // TODO: recycle `head1` rather than discarding. - while let Ok(next) = head1.next_or_alloc() { - result.push_into(next); - if result.at_capacity() { - output.push_into(result); - result = self.empty(stash); - } - } - if !result.is_empty() { - output.push_into(result); - result = self.empty(stash); + while !borrow1.is_empty() { + self.builder.push_into(borrow1.pop()); + Self::extract_chunks(&mut self.builder, output, stash); } + Self::finish_chunks(&mut self.builder, output, stash); output.extend(list1); - // TODO: recycle `head2` rather than discarding. - while let Ok(next) = head2.next_or_alloc() { - result.push_into(next); - if result.at_capacity() { - output.push(result); - result = self.empty(stash); - } - } - if !result.is_empty() { - output.push_into(result); - // result = self.empty(stash); + while !borrow2.is_empty() { + self.builder.push_into(borrow2.pop()); + Self::extract_chunks(&mut self.builder, output, stash); } + Self::finish_chunks(&mut self.builder, output, stash); output.extend(list2); } @@ -416,35 +434,26 @@ pub mod container { kept: &mut Vec, stash: &mut Vec, ) { - let mut keep = self.empty(stash); - let mut ready = self.empty(stash); + let mut keep = MCB::default(); + + let mut time_stash = Self::Time::minimum(); for mut buffer in merged { for item in buffer.drain() { - if MC::time_kept(&item, &upper, frontier) { - if keep.at_capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.empty(stash); - } + if ::time_kept(&item, &upper, frontier, &mut time_stash) { keep.push_into(item); + Self::extract_chunks(&mut keep, kept, stash); } else { - if ready.at_capacity() && !ready.is_empty() { - readied.push(ready); - ready = self.empty(stash); - } - ready.push_into(item); + self.builder.push_into(item); + Self::extract_chunks(&mut self.builder, readied, stash); } } // Recycling buffer. - self.recycle(buffer, stash); - } - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !ready.is_empty() { - readied.push(ready); + stash.push(buffer); } + // Finish the kept and readied data. + Self::finish_chunks(&mut keep, kept, stash); + Self::finish_chunks(&mut self.builder, readied, stash); } /// Account the allocations behind the chunk. @@ -456,56 +465,61 @@ pub mod container { pub use vec::VecMerger; /// Implementations of `ContainerQueue` and `MergerChunk` for `Vec` containers. pub mod vec { - - use std::collections::VecDeque; - use timely::progress::{Antichain, frontier::AntichainRef}; + use timely::container::{CapacityContainerBuilder, PushInto}; + use timely::progress::{Antichain, frontier::AntichainRef, Timestamp}; use crate::difference::Semigroup; - use super::{ContainerQueue, MergerChunk}; + use super::{ContainerQueue, MergerChunk, PushAndAdd}; /// A `Merger` implementation backed by vector containers. - pub type VecMerger = super::ContainerMerger, std::collections::VecDeque<(D, T, R)>>; + pub type VecMerger = super::ContainerMerger>>; - impl ContainerQueue> for VecDeque<(D, T, R)> { - fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> { - if self.is_empty() { - Err(Vec::from(std::mem::take(self))) - } - else { - Ok(self.pop_front().unwrap()) + impl PushAndAdd for CapacityContainerBuilder> { + type Item<'a> = (D, T, R); + type DiffOwned = (); + + fn push_and_add<'a>(&mut self, (data, time, mut diff1): (D, T, R), (_, _, diff2): (D, T, R), _stash: &mut Self::DiffOwned) { + diff1.plus_equals(&diff2); + if !diff1.is_zero() { + self.push_into((data, time, diff1)); } } - fn is_empty(&self) -> bool { - self.is_empty() + } + + impl<'a, D, T, R> ContainerQueue<'a, Vec<(D, T, R)>> for std::iter::Peekable> + where + D: Ord + 'static, + T: Ord + 'static, + R: 'static + { + type Item = (D, T, R); + type SelfGAT<'b> = std::iter::Peekable>; + fn pop(&mut self) -> Self::Item { + self.next().expect("ContainerQueue: pop called on empty queue") + } + fn is_empty(&mut self) -> bool { + self.peek().is_none() } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { - let (data1, time1, _) = self.front().unwrap(); - let (data2, time2, _) = other.front().unwrap(); + fn cmp_heads(&mut self, other: &mut Self::SelfGAT<'_>) -> std::cmp::Ordering { + let (data1, time1, _) = self.peek().unwrap(); + let (data2, time2, _) = other.peek().unwrap(); (data1, time1).cmp(&(data2, time2)) } - fn from(list: Vec<(D, T, R)>) -> Self { - >::from(list) + fn new(list: &'a mut Vec<(D, T, R)>) -> Self { + list.drain(..).peekable() } } - impl MergerChunk for Vec<(D, T, R)> { + impl MergerChunk for Vec<(D, T, R)> { + type ContainerQueue<'a> = std::iter::Peekable>; type TimeOwned = T; - type DiffOwned = (); - fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain, _stash: &mut T) -> bool { if upper.less_equal(time) { - frontier.insert_with(&time, |time| time.clone()); + frontier.insert_with(time, |time| time.clone()); true } else { false } } - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) { - let (data, time, mut diff1) = item1; - let (_data, _time, diff2) = item2; - diff1.plus_equals(&diff2); - if !diff1.is_zero() { - self.push((data, time, diff1)); - } - } fn account(&self) -> (usize, usize, usize, usize) { let (size, capacity, allocations) = (0, 0, 0); (self.len(), size, capacity, allocations) @@ -516,77 +530,47 @@ pub mod container { pub use columnation::ColMerger; /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation). pub mod columnation { - - use timely::progress::{Antichain, frontier::AntichainRef}; + use std::collections::VecDeque; + use timely::progress::{Antichain, frontier::AntichainRef, Timestamp}; use columnation::Columnation; - + use timely::container::{ContainerBuilder, PushInto, SizableContainer}; use crate::containers::TimelyStack; use crate::difference::Semigroup; - - use super::{ContainerQueue, MergerChunk}; + use super::{ContainerQueue, MergerChunk, PushAndAdd}; /// A `Merger` implementation backed by `TimelyStack` containers (columnation). - pub type ColMerger = super::ContainerMerger,TimelyStackQueue<(D, T, R)>>; + pub type ColMerger = super::ContainerMerger>; - /// TODO - pub struct TimelyStackQueue { - list: TimelyStack, - head: usize, - } + impl<'a, D: Ord + Columnation + 'static, T: Ord + Columnation + 'static, R: Columnation + 'static> ContainerQueue<'a, TimelyStack<(D, T, R)>> for std::iter::Peekable> { + type Item = &'a (D, T, R); + type SelfGAT<'b> = std::iter::Peekable>; - impl ContainerQueue> for TimelyStackQueue<(D, T, R)> { - fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> { - if self.is_empty() { - Err(std::mem::take(&mut self.list)) - } - else { - Ok(self.pop()) - } + fn pop(&mut self) -> Self::Item { + self.next().expect("ContainerQueue: pop called on empty queue") } - fn is_empty(&self) -> bool { - self.head == self.list[..].len() + fn is_empty(&mut self) -> bool { + self.peek().is_none() } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { - let (data1, time1, _) = self.peek(); - let (data2, time2, _) = other.peek(); + fn cmp_heads(&mut self, other: &mut Self::SelfGAT<'_>) -> std::cmp::Ordering { + let (data1, time1, _) = self.peek().unwrap(); + let (data2, time2, _) = other.peek().unwrap(); (data1, time1).cmp(&(data2, time2)) } - fn from(list: TimelyStack<(D, T, R)>) -> Self { - TimelyStackQueue { list, head: 0 } - } - } - - impl TimelyStackQueue { - fn pop(&mut self) -> &T { - self.head += 1; - &self.list[self.head - 1] - } - - fn peek(&self) -> &T { - &self.list[self.head] + fn new(list: &'a mut TimelyStack<(D, T, R)>) -> Self { + list.iter().peekable() } } - impl MergerChunk for TimelyStack<(D, T, R)> { + impl MergerChunk for TimelyStack<(D, T, R)> { + type ContainerQueue<'a> = std::iter::Peekable>; type TimeOwned = T; - type DiffOwned = R; - - fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain, _stash: &mut T) -> bool { if upper.less_equal(time) { - frontier.insert_with(&time, |time| time.clone()); + frontier.insert_with(time, |time| time.clone()); true } else { false } } - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { - let (data, time, diff1) = item1; - let (_data, _time, diff2) = item2; - stash.clone_from(diff1); - stash.plus_equals(&diff2); - if !stash.is_zero() { - self.copy_destructured(data, time, stash); - } - } fn account(&self) -> (usize, usize, usize, usize) { let (mut size, mut capacity, mut allocations) = (0, 0, 0); let cb = |siz, cap| { @@ -598,5 +582,101 @@ pub mod container { (self.len(), size, capacity, allocations) } } + + /// A capacity container builder for TimelyStack where we have access to the internals. + /// We need to have a different `push_into` implementation that can call `copy_destructured` + /// to avoid unnecessary allocations. + /// + /// A container builder that uses length and preferred capacity to chunk data for [`TimelyStack`]. + /// + /// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not + /// across [`Self::finish`] to maintain a low memory footprint. + /// + /// Maintains FIFO order. + #[derive(Debug)] + pub struct TimelyStackBuilder{ + /// Container that we're writing to. + pub(crate) current: TimelyStack, + /// Empty allocation. + pub(crate) empty: Option>, + /// Completed containers pending to be sent. + pub(crate) pending: VecDeque>, + } + + impl Default for TimelyStackBuilder { + fn default() -> Self { + Self { + current: TimelyStack::default(), + empty: None, + pending: VecDeque::new(), + } + } + } + + impl PushInto<&T> for TimelyStackBuilder { + #[inline(always)] + fn push_into(&mut self, item: &T) { + // Ensure capacity + self.current.ensure_capacity(&mut self.empty); + + // Push item + self.current.copy(item); + + // Maybe flush + if self.current.at_capacity() { + self.pending.push_back(std::mem::take(&mut self.current)); + } + } + } + + impl ContainerBuilder for TimelyStackBuilder { + type Container = TimelyStack; + + #[inline] + fn extract(&mut self) -> Option<&mut TimelyStack> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + self.empty.as_mut() + } else { + None + } + } + + #[inline] + fn finish(&mut self) -> Option<&mut TimelyStack> { + if !self.current.is_empty() { + self.pending.push_back(std::mem::take(&mut self.current)); + } + self.empty = self.pending.pop_front(); + self.empty.as_mut() + } + } + + impl PushAndAdd for TimelyStackBuilder<(D, T, R)> + where + D: Columnation + 'static, + T: Columnation + 'static, + R: Columnation + Default + Semigroup + 'static, + { + type Item<'a> = &'a (D, T, R); + type DiffOwned = R; + + fn push_and_add(&mut self, (data, time, diff1): &(D, T, R), (_, _, diff2): &(D, T, R), stash: &mut Self::DiffOwned) { + stash.clone_from(diff1); + stash.plus_equals(diff2); + if !stash.is_zero() { + // Ensure capacity + self.current.ensure_capacity(&mut self.empty); + + // Push item + self.current.copy_destructured(data, time, stash); + + // Maybe flush + if self.current.at_capacity() { + self.pending.push_back(std::mem::take(&mut self.current)); + } + } + } + } } }