From 9505a432ab91fe5b6d632af931a4375c251f74cf Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Sun, 27 Aug 2023 14:50:00 +0200 Subject: [PATCH 1/2] Efficient merging with empty batches This commit makes merging of batches with empty batches efficient. Previously this operation would be handled like a regular merge, by allocating a new batch and merging the input batches into it. However, if one of the batches is empty, there is no need to allocate a new batch, we can simply extend the bounds of the existing non-empty batch accordingly. The implementation of this optimization is complicated by the fact that there are `Batch` implementations for `Rc` and `Abomonated`. Both of these only provide an immutable reference to the underlying batch, making it impossible to adjust the batch description to extend the batch's bounds. This commit solves this by introducing new `RcBatch` and `AbomonatedBatch` wrappers that, in addition to the `Rc`/`Abomonated` batch, hold their own `Description`s that override the inner batch's description. --- src/trace/implementations/abomonated.rs | 294 +++++++++++++++++++++ src/trace/implementations/mod.rs | 2 + src/trace/implementations/ord.rs | 62 +++-- src/trace/implementations/rc.rs | 301 ++++++++++++++++++++++ src/trace/implementations/spine_fueled.rs | 12 +- src/trace/mod.rs | 256 +----------------- tests/trace.rs | 7 +- 7 files changed, 666 insertions(+), 268 deletions(-) create mode 100644 src/trace/implementations/abomonated.rs create mode 100644 src/trace/implementations/rc.rs diff --git a/src/trace/implementations/abomonated.rs b/src/trace/implementations/abomonated.rs new file mode 100644 index 000000000..f8f90ebba --- /dev/null +++ b/src/trace/implementations/abomonated.rs @@ -0,0 +1,294 @@ +//! Types for abomonated batch. + +use std::ops::Deref; + +use abomonation::abomonated::Abomonated; +use abomonation::{measure, Abomonation}; +use timely::communication::message::RefOrMut; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Antichain, Timestamp}; +use timely::PartialOrder; + +use crate::trace::{Batch, BatchReader, Batcher, Builder, Cursor, Description, Merger}; + +/// A batch implementation that wraps underlying batches in `Abomonated`. +/// +/// Keeps a description separate from that of the wrapped batch, to enable efficient merging with +/// empty batches by extending the reported lower/upper bounds. +pub struct AbomonatedBatch { + inner: Abomonated>, + desc: Description, +} + +impl AbomonatedBatch +where + B: BatchReader + Abomonation, + B::Time: Timestamp, +{ + fn new(inner: B) -> Self { + let mut bytes = Vec::with_capacity(measure(&inner)); + unsafe { abomonation::encode(&inner, &mut bytes).unwrap() }; + let inner = unsafe { Abomonated::::new(bytes).unwrap() }; + inner.into() + } +} + +impl Deref for AbomonatedBatch +where + B: BatchReader, +{ + type Target = B; + + fn deref(&self) -> &B { + &self.inner + } +} + +impl From>> for AbomonatedBatch +where + B: BatchReader + Abomonation, + B::Time: Timestamp, +{ + fn from(inner: Abomonated>) -> Self { + let desc = inner.description().clone(); + Self { inner, desc } + } +} + +impl BatchReader for AbomonatedBatch +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = AbomonatedBatchCursor; + + fn cursor(&self) -> Self::Cursor { + AbomonatedBatchCursor { + inner: self.inner.cursor(), + } + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn description(&self) -> &Description { + &self.desc + } +} + +impl Batch for AbomonatedBatch +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + type Batcher = AbomonatedBatcher; + type Builder = AbomonatedBuilder; + type Merger = AbomonatedMerger; + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } +} + +/// A cursor for navigating `AbomonatedBatch`es. +pub struct AbomonatedBatchCursor { + inner: B::Cursor, +} + +impl Cursor for AbomonatedBatchCursor +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Storage = AbomonatedBatch; + + #[inline] + fn key_valid(&self, storage: &Self::Storage) -> bool { + self.inner.key_valid(&storage.inner) + } + + #[inline] + fn val_valid(&self, storage: &Self::Storage) -> bool { + self.inner.val_valid(&storage.inner) + } + + #[inline] + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { + self.inner.key(&storage.inner) + } + + #[inline] + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { + self.inner.val(&storage.inner) + } + + #[inline] + fn map_times(&mut self, storage: &Self::Storage, logic: L) + where + L: FnMut(&Self::Time, &Self::R), + { + self.inner.map_times(&storage.inner, logic) + } + + #[inline] + fn step_key(&mut self, storage: &Self::Storage) { + self.inner.step_key(&storage.inner) + } + + #[inline] + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + self.inner.seek_key(&storage.inner, key) + } + + #[inline] + fn step_val(&mut self, storage: &Self::Storage) { + self.inner.step_val(&storage.inner) + } + + #[inline] + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + self.inner.seek_val(&storage.inner, val) + } + + #[inline] + fn rewind_keys(&mut self, storage: &Self::Storage) { + self.inner.rewind_keys(&storage.inner) + } + + #[inline] + fn rewind_vals(&mut self, storage: &Self::Storage) { + self.inner.rewind_vals(&storage.inner) + } +} + +/// A type used to assemble `AbomonatedBatch`es from unordered updates. +pub struct AbomonatedBatcher { + inner: B::Batcher, +} + +impl Batcher> for AbomonatedBatcher +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Batcher::new(), + } + } + + fn push_batch(&mut self, batch: RefOrMut>) { + self.inner.push_batch(batch); + } + + fn seal(&mut self, upper: Antichain) -> AbomonatedBatch { + AbomonatedBatch::new(self.inner.seal(upper)) + } + + fn frontier(&mut self) -> AntichainRef { + self.inner.frontier() + } +} + +/// A type used to assemble `AbomonatedBatch`es from ordered update sequences. +pub struct AbomonatedBuilder { + inner: B::Builder, +} + +impl Builder> for AbomonatedBuilder +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Builder::new(), + } + } + + fn with_capacity(cap: usize) -> Self { + Self { + inner: B::Builder::with_capacity(cap), + } + } + + fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { + self.inner.push(element); + } + + fn done( + self, + lower: Antichain, + upper: Antichain, + since: Antichain, + ) -> AbomonatedBatch { + AbomonatedBatch::new(self.inner.done(lower, upper, since)) + } +} + +/// A type used to progressively merge `AbomonatedBatch`es. +pub struct AbomonatedMerger { + inner: B::Merger, + lower: Antichain, + upper: Antichain, +} + +impl Merger> for AbomonatedMerger +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + fn new( + source1: &AbomonatedBatch, + source2: &AbomonatedBatch, + compaction_frontier: Option>, + ) -> Self { + assert!(PartialOrder::less_equal(source1.upper(), source2.lower())); + + let lower = source1.lower().clone(); + let upper = source2.upper().clone(); + + Self { + inner: B::Merger::new(&source1.inner, &source2.inner, compaction_frontier), + lower, + upper, + } + } + + fn work( + &mut self, + source1: &AbomonatedBatch, + source2: &AbomonatedBatch, + fuel: &mut isize, + ) { + self.inner.work(&source1.inner, &source2.inner, fuel); + } + + fn done(self) -> AbomonatedBatch { + let inner = self.inner.done(); + let since = inner.description().since().clone(); + let mut batch = AbomonatedBatch::new(inner); + batch.desc = Description::new(self.lower, self.upper, since); + batch + } +} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 4eee120de..734f94c18 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -45,3 +45,5 @@ mod merge_batcher; pub use self::merge_batcher::MergeBatcher as Batcher; pub mod ord; +pub mod rc; +pub mod abomonated; diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 5c995e048..cc5338823 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -8,12 +8,12 @@ //! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation //! and should consume fewer resources (computation and memory) when it applies. -use std::rc::Rc; use std::convert::{TryFrom, TryInto}; use std::marker::PhantomData; use std::fmt::Debug; use std::ops::Deref; +use timely::PartialOrder; use timely::container::columnation::TimelyStack; use timely::container::columnation::Columnation; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -34,25 +34,25 @@ use trace::layers::MergeBuilder; // use super::spine::Spine; use super::spine_fueled::Spine; use super::merge_batcher::MergeBatcher; - -use abomonation::abomonated::Abomonated; +use super::rc::RcBatch; +use super::abomonated::AbomonatedBatch; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine, Vec>>>; +pub type OrdValSpineAbom = Spine>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>; +pub type OrdKeySpine = Spine>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec>>>; +pub type OrdKeySpineAbom = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine, TimelyStack>>>; +pub type ColValSpine = Spine, TimelyStack>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine>>>; /// A container that can retain/discard from some offset onward. @@ -100,7 +100,7 @@ where /// Where all the dataz is. pub layer: OrderedLayer, O, CV>, O, CK>, /// Description of the update times this layer represents. - pub desc: Description, + desc: Description, } impl BatchReader for OrdValBatch @@ -141,6 +141,21 @@ where fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } } impl OrdValBatch @@ -275,8 +290,9 @@ where CV: BatchContainer+Deref+RetainFrom, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { - - assert!(batch1.upper() == batch2.lower()); + // We cannot assert equality here, as that would break bounds-extending + // batch wrappers, like `RcBatch`. + assert!(PartialOrder::less_equal(batch1.upper(), batch2.lower())); let mut since = batch1.description().since().join(batch2.description().since()); if let Some(compaction_frontier) = compaction_frontier { @@ -473,7 +489,7 @@ where /// Where all the dataz is. pub layer: OrderedLayer, O, CK>, /// Description of the update times this layer represents. - pub desc: Description, + desc: Description, } impl BatchReader for OrdKeyBatch @@ -516,6 +532,21 @@ where fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } } impl OrdKeyBatch @@ -618,8 +649,9 @@ where CK: BatchContainer+Deref+RetainFrom, { fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { - - assert!(batch1.upper() == batch2.lower()); + // We cannot assert equality here, as that would break bounds-extending + // batch wrappers, like `RcBatch`. + assert!(PartialOrder::less_equal(batch1.upper(), batch2.lower())); let mut since = batch1.description().since().join(batch2.description().since()); if let Some(compaction_frontier) = compaction_frontier { diff --git a/src/trace/implementations/rc.rs b/src/trace/implementations/rc.rs new file mode 100644 index 000000000..20b71c334 --- /dev/null +++ b/src/trace/implementations/rc.rs @@ -0,0 +1,301 @@ +//! Types for reference-counted batches. + +use std::ops::Deref; +use std::rc::Rc; + +use timely::communication::message::RefOrMut; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Antichain, Timestamp}; +use timely::PartialOrder; + +use crate::trace::{Batch, BatchReader, Batcher, Builder, Cursor, Description, Merger}; + +/// A batch implementation that wraps underlying batches in `Rc`. +/// +/// Keeps a description separate from that of the wrapped batch, to enable efficient merging with +/// empty batches by extending the reported lower/upper bounds. +pub struct RcBatch +where + B: BatchReader, +{ + inner: Rc, + desc: Description, +} + +impl RcBatch +where + B: BatchReader, + B::Time: Timestamp, +{ + fn new(inner: B) -> Self { + Rc::new(inner).into() + } +} + +impl Deref for RcBatch +where + B: BatchReader, +{ + type Target = B; + + fn deref(&self) -> &B { + &self.inner + } +} + +impl Clone for RcBatch +where + B: BatchReader, + B::Time: Timestamp, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + desc: self.desc.clone(), + } + } +} + +impl From> for RcBatch +where + B: BatchReader, + B::Time: Timestamp, +{ + fn from(inner: Rc) -> Self { + let desc = inner.description().clone(); + Self { inner, desc } + } +} + +impl BatchReader for RcBatch +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = RcBatchCursor; + + fn cursor(&self) -> Self::Cursor { + RcBatchCursor { + inner: self.inner.cursor(), + } + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn description(&self) -> &Description { + &self.desc + } +} + +impl Batch for RcBatch +where + B: Batch, + B::Time: Timestamp, +{ + type Batcher = RcBatcher; + type Builder = RcBuilder; + type Merger = RcMerger; + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } +} + +/// A cursor for navigating `RcBatch`es. +pub struct RcBatchCursor { + inner: B::Cursor, +} + +impl Cursor for RcBatchCursor +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Storage = RcBatch; + + #[inline] + fn key_valid(&self, storage: &Self::Storage) -> bool { + self.inner.key_valid(&storage.inner) + } + + #[inline] + fn val_valid(&self, storage: &Self::Storage) -> bool { + self.inner.val_valid(&storage.inner) + } + + #[inline] + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { + self.inner.key(&storage.inner) + } + + #[inline] + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { + self.inner.val(&storage.inner) + } + + #[inline] + fn map_times(&mut self, storage: &Self::Storage, logic: L) + where + L: FnMut(&Self::Time, &Self::R), + { + self.inner.map_times(&storage.inner, logic) + } + + #[inline] + fn step_key(&mut self, storage: &Self::Storage) { + self.inner.step_key(&storage.inner) + } + + #[inline] + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + self.inner.seek_key(&storage.inner, key) + } + + #[inline] + fn step_val(&mut self, storage: &Self::Storage) { + self.inner.step_val(&storage.inner) + } + + #[inline] + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + self.inner.seek_val(&storage.inner, val) + } + + #[inline] + fn rewind_keys(&mut self, storage: &Self::Storage) { + self.inner.rewind_keys(&storage.inner) + } + + #[inline] + fn rewind_vals(&mut self, storage: &Self::Storage) { + self.inner.rewind_vals(&storage.inner) + } +} + +/// A type used to assemble `RcBatch`es from unordered updates. +pub struct RcBatcher { + inner: B::Batcher, +} + +impl Batcher> for RcBatcher +where + B: Batch, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Batcher::new(), + } + } + + fn push_batch(&mut self, batch: RefOrMut>) { + self.inner.push_batch(batch); + } + + fn seal(&mut self, upper: Antichain) -> RcBatch { + RcBatch::new(self.inner.seal(upper)) + } + + fn frontier(&mut self) -> AntichainRef { + self.inner.frontier() + } +} + +/// A type used to assemble `RcBatch`es from ordered update sequences. +pub struct RcBuilder { + inner: B::Builder, +} + +impl Builder> for RcBuilder +where + B: Batch, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Builder::new(), + } + } + + fn with_capacity(cap: usize) -> Self { + Self { + inner: B::Builder::with_capacity(cap), + } + } + + fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { + self.inner.push(element); + } + + fn done( + self, + lower: Antichain, + upper: Antichain, + since: Antichain, + ) -> RcBatch { + RcBatch::new(self.inner.done(lower, upper, since)) + } +} + +/// A type used to progressively merge `RcBatch`es. +pub struct RcMerger { + inner: B::Merger, + lower: Antichain, + upper: Antichain, +} + +impl Merger> for RcMerger +where + B: Batch, + B::Time: Timestamp, +{ + fn new( + source1: &RcBatch, + source2: &RcBatch, + compaction_frontier: Option>, + ) -> Self { + assert!(PartialOrder::less_equal(source1.upper(), source2.lower())); + + let lower = source1.lower().clone(); + let upper = source2.upper().clone(); + + Self { + inner: B::Merger::new(&source1.inner, &source2.inner, compaction_frontier), + lower, + upper, + } + } + + fn work(&mut self, source1: &RcBatch, source2: &RcBatch, fuel: &mut isize) { + self.inner.work(&source1.inner, &source2.inner, fuel); + } + + fn done(self) -> RcBatch { + let inner = self.inner.done(); + let since = inner.description().since().clone(); + let mut batch = RcBatch::new(inner); + batch.desc = Description::new(self.lower, self.upper, since); + batch + } +} diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index f3b61ec94..10717de20 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -867,8 +867,16 @@ impl MergeState where B::Time: Eq { match (batch1, batch2) { (Some(batch1), Some(batch2)) => { assert!(batch1.upper() == batch2.lower()); - let begin_merge = ::begin_merge(&batch1, &batch2, compaction_frontier); - MergeVariant::InProgress(batch1, batch2, begin_merge) + if batch1.is_empty() { + let batch = batch2.merge_empty(&batch1); + MergeVariant::Complete(Some((batch, None))) + } else if batch2.is_empty() { + let batch = batch1.merge_empty(&batch2); + MergeVariant::Complete(Some((batch, None))) + } else { + let merger = batch1.begin_merge(&batch2, compaction_frontier); + MergeVariant::InProgress(batch1, batch2, merger) + } } (None, Some(x)) => MergeVariant::Complete(Some((x, None))), (Some(x), None) => MergeVariant::Complete(Some((x, None))), diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 6e411475e..e84906562 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -285,6 +285,15 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { ::new().done(lower, upper, since) } + + /// Merges this batch with a consecutive empty batch. + /// + /// This method places the following to be true: + /// * `other` is empty. + /// * Either `self.lower() == other.upper()` or `self.upper() == other.lower()`. + /// + /// Implementers should panic if any of these requirements is violated. + fn merge_empty(self, other: &Self) -> Self; } /// Functionality for collecting and batching updates. @@ -332,250 +341,3 @@ pub trait Merger { /// progress. fn done(self) -> Output; } - - -/// Blanket implementations for reference counted batches. -pub mod rc_blanket_impls { - - use std::rc::Rc; - use timely::communication::message::RefOrMut; - - use timely::progress::{Antichain, frontier::AntichainRef}; - use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - - impl BatchReader for Rc { - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - /// The type used to enumerate the batch's contents. - type Cursor = RcBatchCursor; - /// Acquires a cursor to the batch's contents. - fn cursor(&self) -> Self::Cursor { - RcBatchCursor::new((&**self).cursor()) - } - - /// The number of updates in the batch. - fn len(&self) -> usize { (&**self).len() } - /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } - } - - /// Wrapper to provide cursor to nested scope. - pub struct RcBatchCursor { - cursor: B::Cursor, - } - - impl RcBatchCursor { - fn new(cursor: B::Cursor) -> Self { - RcBatchCursor { - cursor, - } - } - } - - impl Cursor for RcBatchCursor { - - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - type Storage = Rc; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } - - #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { - self.cursor.map_times(storage, logic) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } - } - - /// An immutable collection of updates. - impl Batch for Rc { - type Batcher = RcBatcher; - type Builder = RcBuilder; - type Merger = RcMerger; - } - - /// Wrapper type for batching reference counted batches. - pub struct RcBatcher { batcher: B::Batcher } - - /// Functionality for collecting and batching updates. - impl Batcher> for RcBatcher { - fn new() -> Self { RcBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } - } - - /// Wrapper type for building reference counted batches. - pub struct RcBuilder { builder: B::Builder } - - /// Functionality for building batches from ordered update sequences. - impl Builder> for RcBuilder { - fn new() -> Self { RcBuilder { builder: >::new() } } - fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } - } - - /// Wrapper type for merging reference counted batches. - pub struct RcMerger { merger: B::Merger } - - /// Represents a merge in progress. - impl Merger> for RcMerger { - fn new(source1: &Rc, source2: &Rc, compaction_frontier: Option>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } - fn work(&mut self, source1: &Rc, source2: &Rc, fuel: &mut isize) { self.merger.work(source1, source2, fuel) } - fn done(self) -> Rc { Rc::new(self.merger.done()) } - } -} - - -/// Blanket implementations for reference counted batches. -pub mod abomonated_blanket_impls { - - extern crate abomonation; - - use abomonation::{Abomonation, measure}; - use abomonation::abomonated::Abomonated; - use timely::communication::message::RefOrMut; - use timely::progress::{Antichain, frontier::AntichainRef}; - - use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - - impl BatchReader for Abomonated> { - - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - /// The type used to enumerate the batch's contents. - type Cursor = AbomonatedBatchCursor; - /// Acquires a cursor to the batch's contents. - fn cursor(&self) -> Self::Cursor { - AbomonatedBatchCursor::new((&**self).cursor()) - } - - /// The number of updates in the batch. - fn len(&self) -> usize { (&**self).len() } - /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } - } - - /// Wrapper to provide cursor to nested scope. - pub struct AbomonatedBatchCursor { - cursor: B::Cursor, - } - - impl AbomonatedBatchCursor { - fn new(cursor: B::Cursor) -> Self { - AbomonatedBatchCursor { - cursor, - } - } - } - - impl Cursor for AbomonatedBatchCursor { - - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - type Storage = Abomonated>; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } - - #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { - self.cursor.map_times(storage, logic) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } - } - - /// An immutable collection of updates. - impl Batch for Abomonated> { - type Batcher = AbomonatedBatcher; - type Builder = AbomonatedBuilder; - type Merger = AbomonatedMerger; - } - - /// Wrapper type for batching reference counted batches. - pub struct AbomonatedBatcher { batcher: B::Batcher } - - /// Functionality for collecting and batching updates. - impl Batcher>> for AbomonatedBatcher { - fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Abomonated> { - let batch = self.batcher.seal(upper); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } - } - - /// Wrapper type for building reference counted batches. - pub struct AbomonatedBuilder { builder: B::Builder } - - /// Functionality for building batches from ordered update sequences. - impl Builder>> for AbomonatedBuilder { - fn new() -> Self { AbomonatedBuilder { builder: >::new() } } - fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Abomonated> { - let batch = self.builder.done(lower, upper, since); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } - } - - /// Wrapper type for merging reference counted batches. - pub struct AbomonatedMerger { merger: B::Merger } - - /// Represents a merge in progress. - impl Merger>> for AbomonatedMerger { - fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: Option>) -> Self { - AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } - } - fn work(&mut self, source1: &Abomonated>, source2: &Abomonated>, fuel: &mut isize) { - self.merger.work(source1, source2, fuel) - } - fn done(self) -> Abomonated> { - let batch = self.merger.done(); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } - } -} diff --git a/tests/trace.rs b/tests/trace.rs index d00c4497e..d23cb213f 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -1,21 +1,20 @@ extern crate timely; extern crate differential_dataflow; -use std::rc::Rc; - use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; use differential_dataflow::trace::implementations::ord::OrdValBatch; +use differential_dataflow::trace::implementations::rc::RcBatch; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::implementations::spine_fueled::Spine; -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>; type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine>> { +fn get_trace() -> Spine>> { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); { From 301e496db518550521bc6583616ef80416c8bf72 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Sun, 27 Aug 2023 17:07:19 +0200 Subject: [PATCH 2/2] spine: fix logging of completed merges This commit makes sure completed merges are logged also for merges that used the empty batch optimization. Also, if we log the dropping of a merged batch, we shouldn't forget to also log the merge event, so consumers don't become confused about the whereabouts of the input batches. --- src/trace/implementations/spine_fueled.rs | 38 ++++++++++++++++------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 10717de20..62b5628ab 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -343,7 +343,7 @@ where /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { if let Some(logger) = &self.logger { - for batch in self.merging.drain(..) { + for (index, batch) in self.merging.drain(..).enumerate() { match batch { MergeState::Single(Some(batch)) => { logger.log(::logging::DropEvent { @@ -361,7 +361,16 @@ where length: batch2.len(), }); }, - MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { + MergeState::Double(MergeVariant::Complete(Some((batch, input_lengths)))) => { + if let Some((length1, length2)) = input_lengths { + logger.log(::logging::MergeEvent { + operator: self.operator.global_id, + scale: index, + length1, + length2, + complete: Some(batch.len()), + }); + } logger.log(::logging::DropEvent { operator: self.operator.global_id, length: batch.len(), @@ -679,15 +688,15 @@ where /// Completes and extracts what ever is at layer `index`. fn complete_at(&mut self, index: usize) -> Option { - if let Some((merged, inputs)) = self.merging[index].complete() { - if let Some((input1, input2)) = inputs { + if let Some((merged, input_lengths)) = self.merging[index].complete() { + if let Some((length1, length2)) = input_lengths { // Log the completion of a merge from existing parts. self.logger.as_ref().map(|l| l.log( ::logging::MergeEvent { operator: self.operator.global_id, scale: index, - length1: input1.len(), - length2: input2.len(), + length1, + length2, complete: Some(merged.len()), } )); @@ -816,7 +825,7 @@ impl MergeState where B::Time: Eq { /// with the `is_complete()` method. /// /// There is the addional option of input batches. - fn complete(&mut self) -> Option<(B, Option<(B, B)>)> { + fn complete(&mut self) -> Option<(B, Option<(usize, usize)>)> { match std::mem::replace(self, MergeState::Vacant) { MergeState::Vacant => None, MergeState::Single(batch) => batch.map(|b| (b, None)), @@ -867,12 +876,13 @@ impl MergeState where B::Time: Eq { match (batch1, batch2) { (Some(batch1), Some(batch2)) => { assert!(batch1.upper() == batch2.lower()); + let input_lengths = Some((batch1.len(), batch2.len())); if batch1.is_empty() { let batch = batch2.merge_empty(&batch1); - MergeVariant::Complete(Some((batch, None))) + MergeVariant::Complete(Some((batch, input_lengths))) } else if batch2.is_empty() { let batch = batch1.merge_empty(&batch2); - MergeVariant::Complete(Some((batch, None))) + MergeVariant::Complete(Some((batch, input_lengths))) } else { let merger = batch1.begin_merge(&batch2, compaction_frontier); MergeVariant::InProgress(batch1, batch2, merger) @@ -891,7 +901,10 @@ enum MergeVariant { /// Describes an actual in-progress merge between two non-trivial batches. InProgress(B, B, ::Merger), /// A merge that requires no further work. May or may not represent a non-trivial batch. - Complete(Option<(B, Option<(B, B)>)>), + /// + /// In the case of a non-trivial batch that is the result of merging two input batches, the + /// second component contains the lengths of those input batches. + Complete(Option<(B, Option<(usize, usize)>)>), } impl MergeVariant { @@ -900,7 +913,7 @@ impl MergeVariant { /// /// The result is either `None`, for structurally empty batches, /// or a batch and optionally input batches from which it derived. - fn complete(mut self) -> Option<(B, Option<(B, B)>)> { + fn complete(mut self) -> Option<(B, Option<(usize, usize)>)> { let mut fuel = isize::max_value(); self.work(&mut fuel); if let MergeVariant::Complete(batch) = self { batch } @@ -916,7 +929,8 @@ impl MergeVariant { if let MergeVariant::InProgress(b1,b2,mut merge) = variant { merge.work(&b1,&b2,fuel); if *fuel > 0 { - *self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2))))); + let input_lengths = Some((b1.len(), b2.len())); + *self = MergeVariant::Complete(Some((merge.done(), input_lengths))); } else { *self = MergeVariant::InProgress(b1,b2,merge);