diff --git a/src/trace/implementations/abomonated.rs b/src/trace/implementations/abomonated.rs new file mode 100644 index 000000000..f8f90ebba --- /dev/null +++ b/src/trace/implementations/abomonated.rs @@ -0,0 +1,294 @@ +//! Types for abomonated batch. + +use std::ops::Deref; + +use abomonation::abomonated::Abomonated; +use abomonation::{measure, Abomonation}; +use timely::communication::message::RefOrMut; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Antichain, Timestamp}; +use timely::PartialOrder; + +use crate::trace::{Batch, BatchReader, Batcher, Builder, Cursor, Description, Merger}; + +/// A batch implementation that wraps underlying batches in `Abomonated`. +/// +/// Keeps a description separate from that of the wrapped batch, to enable efficient merging with +/// empty batches by extending the reported lower/upper bounds. +pub struct AbomonatedBatch { + inner: Abomonated>, + desc: Description, +} + +impl AbomonatedBatch +where + B: BatchReader + Abomonation, + B::Time: Timestamp, +{ + fn new(inner: B) -> Self { + let mut bytes = Vec::with_capacity(measure(&inner)); + unsafe { abomonation::encode(&inner, &mut bytes).unwrap() }; + let inner = unsafe { Abomonated::::new(bytes).unwrap() }; + inner.into() + } +} + +impl Deref for AbomonatedBatch +where + B: BatchReader, +{ + type Target = B; + + fn deref(&self) -> &B { + &self.inner + } +} + +impl From>> for AbomonatedBatch +where + B: BatchReader + Abomonation, + B::Time: Timestamp, +{ + fn from(inner: Abomonated>) -> Self { + let desc = inner.description().clone(); + Self { inner, desc } + } +} + +impl BatchReader for AbomonatedBatch +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = AbomonatedBatchCursor; + + fn cursor(&self) -> Self::Cursor { + AbomonatedBatchCursor { + inner: self.inner.cursor(), + } + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn description(&self) -> &Description { + &self.desc + } +} + +impl Batch for AbomonatedBatch +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + type Batcher = AbomonatedBatcher; + type Builder = AbomonatedBuilder; + type Merger = AbomonatedMerger; + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } +} + +/// A cursor for navigating `AbomonatedBatch`es. +pub struct AbomonatedBatchCursor { + inner: B::Cursor, +} + +impl Cursor for AbomonatedBatchCursor +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Storage = AbomonatedBatch; + + #[inline] + fn key_valid(&self, storage: &Self::Storage) -> bool { + self.inner.key_valid(&storage.inner) + } + + #[inline] + fn val_valid(&self, storage: &Self::Storage) -> bool { + self.inner.val_valid(&storage.inner) + } + + #[inline] + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { + self.inner.key(&storage.inner) + } + + #[inline] + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { + self.inner.val(&storage.inner) + } + + #[inline] + fn map_times(&mut self, storage: &Self::Storage, logic: L) + where + L: FnMut(&Self::Time, &Self::R), + { + self.inner.map_times(&storage.inner, logic) + } + + #[inline] + fn step_key(&mut self, storage: &Self::Storage) { + self.inner.step_key(&storage.inner) + } + + #[inline] + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + self.inner.seek_key(&storage.inner, key) + } + + #[inline] + fn step_val(&mut self, storage: &Self::Storage) { + self.inner.step_val(&storage.inner) + } + + #[inline] + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + self.inner.seek_val(&storage.inner, val) + } + + #[inline] + fn rewind_keys(&mut self, storage: &Self::Storage) { + self.inner.rewind_keys(&storage.inner) + } + + #[inline] + fn rewind_vals(&mut self, storage: &Self::Storage) { + self.inner.rewind_vals(&storage.inner) + } +} + +/// A type used to assemble `AbomonatedBatch`es from unordered updates. +pub struct AbomonatedBatcher { + inner: B::Batcher, +} + +impl Batcher> for AbomonatedBatcher +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Batcher::new(), + } + } + + fn push_batch(&mut self, batch: RefOrMut>) { + self.inner.push_batch(batch); + } + + fn seal(&mut self, upper: Antichain) -> AbomonatedBatch { + AbomonatedBatch::new(self.inner.seal(upper)) + } + + fn frontier(&mut self) -> AntichainRef { + self.inner.frontier() + } +} + +/// A type used to assemble `AbomonatedBatch`es from ordered update sequences. +pub struct AbomonatedBuilder { + inner: B::Builder, +} + +impl Builder> for AbomonatedBuilder +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Builder::new(), + } + } + + fn with_capacity(cap: usize) -> Self { + Self { + inner: B::Builder::with_capacity(cap), + } + } + + fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { + self.inner.push(element); + } + + fn done( + self, + lower: Antichain, + upper: Antichain, + since: Antichain, + ) -> AbomonatedBatch { + AbomonatedBatch::new(self.inner.done(lower, upper, since)) + } +} + +/// A type used to progressively merge `AbomonatedBatch`es. +pub struct AbomonatedMerger { + inner: B::Merger, + lower: Antichain, + upper: Antichain, +} + +impl Merger> for AbomonatedMerger +where + B: Batch + Abomonation, + B::Time: Timestamp, +{ + fn new( + source1: &AbomonatedBatch, + source2: &AbomonatedBatch, + compaction_frontier: Option>, + ) -> Self { + assert!(PartialOrder::less_equal(source1.upper(), source2.lower())); + + let lower = source1.lower().clone(); + let upper = source2.upper().clone(); + + Self { + inner: B::Merger::new(&source1.inner, &source2.inner, compaction_frontier), + lower, + upper, + } + } + + fn work( + &mut self, + source1: &AbomonatedBatch, + source2: &AbomonatedBatch, + fuel: &mut isize, + ) { + self.inner.work(&source1.inner, &source2.inner, fuel); + } + + fn done(self) -> AbomonatedBatch { + let inner = self.inner.done(); + let since = inner.description().since().clone(); + let mut batch = AbomonatedBatch::new(inner); + batch.desc = Description::new(self.lower, self.upper, since); + batch + } +} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 4eee120de..734f94c18 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -45,3 +45,5 @@ mod merge_batcher; pub use self::merge_batcher::MergeBatcher as Batcher; pub mod ord; +pub mod rc; +pub mod abomonated; diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 5c995e048..cc5338823 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -8,12 +8,12 @@ //! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation //! and should consume fewer resources (computation and memory) when it applies. -use std::rc::Rc; use std::convert::{TryFrom, TryInto}; use std::marker::PhantomData; use std::fmt::Debug; use std::ops::Deref; +use timely::PartialOrder; use timely::container::columnation::TimelyStack; use timely::container::columnation::Columnation; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -34,25 +34,25 @@ use trace::layers::MergeBuilder; // use super::spine::Spine; use super::spine_fueled::Spine; use super::merge_batcher::MergeBatcher; - -use abomonation::abomonated::Abomonated; +use super::rc::RcBatch; +use super::abomonated::AbomonatedBatch; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine, Vec>>>; +pub type OrdValSpineAbom = Spine>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>; +pub type OrdKeySpine = Spine>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec>>>; +pub type OrdKeySpineAbom = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine, TimelyStack>>>; +pub type ColValSpine = Spine, TimelyStack>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine>>>; /// A container that can retain/discard from some offset onward. @@ -100,7 +100,7 @@ where /// Where all the dataz is. pub layer: OrderedLayer, O, CV>, O, CK>, /// Description of the update times this layer represents. - pub desc: Description, + desc: Description, } impl BatchReader for OrdValBatch @@ -141,6 +141,21 @@ where fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } } impl OrdValBatch @@ -275,8 +290,9 @@ where CV: BatchContainer+Deref+RetainFrom, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { - - assert!(batch1.upper() == batch2.lower()); + // We cannot assert equality here, as that would break bounds-extending + // batch wrappers, like `RcBatch`. + assert!(PartialOrder::less_equal(batch1.upper(), batch2.lower())); let mut since = batch1.description().since().join(batch2.description().since()); if let Some(compaction_frontier) = compaction_frontier { @@ -473,7 +489,7 @@ where /// Where all the dataz is. pub layer: OrderedLayer, O, CK>, /// Description of the update times this layer represents. - pub desc: Description, + desc: Description, } impl BatchReader for OrdKeyBatch @@ -516,6 +532,21 @@ where fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } } impl OrdKeyBatch @@ -618,8 +649,9 @@ where CK: BatchContainer+Deref+RetainFrom, { fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { - - assert!(batch1.upper() == batch2.lower()); + // We cannot assert equality here, as that would break bounds-extending + // batch wrappers, like `RcBatch`. + assert!(PartialOrder::less_equal(batch1.upper(), batch2.lower())); let mut since = batch1.description().since().join(batch2.description().since()); if let Some(compaction_frontier) = compaction_frontier { diff --git a/src/trace/implementations/rc.rs b/src/trace/implementations/rc.rs new file mode 100644 index 000000000..20b71c334 --- /dev/null +++ b/src/trace/implementations/rc.rs @@ -0,0 +1,301 @@ +//! Types for reference-counted batches. + +use std::ops::Deref; +use std::rc::Rc; + +use timely::communication::message::RefOrMut; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Antichain, Timestamp}; +use timely::PartialOrder; + +use crate::trace::{Batch, BatchReader, Batcher, Builder, Cursor, Description, Merger}; + +/// A batch implementation that wraps underlying batches in `Rc`. +/// +/// Keeps a description separate from that of the wrapped batch, to enable efficient merging with +/// empty batches by extending the reported lower/upper bounds. +pub struct RcBatch +where + B: BatchReader, +{ + inner: Rc, + desc: Description, +} + +impl RcBatch +where + B: BatchReader, + B::Time: Timestamp, +{ + fn new(inner: B) -> Self { + Rc::new(inner).into() + } +} + +impl Deref for RcBatch +where + B: BatchReader, +{ + type Target = B; + + fn deref(&self) -> &B { + &self.inner + } +} + +impl Clone for RcBatch +where + B: BatchReader, + B::Time: Timestamp, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + desc: self.desc.clone(), + } + } +} + +impl From> for RcBatch +where + B: BatchReader, + B::Time: Timestamp, +{ + fn from(inner: Rc) -> Self { + let desc = inner.description().clone(); + Self { inner, desc } + } +} + +impl BatchReader for RcBatch +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = RcBatchCursor; + + fn cursor(&self) -> Self::Cursor { + RcBatchCursor { + inner: self.inner.cursor(), + } + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn description(&self) -> &Description { + &self.desc + } +} + +impl Batch for RcBatch +where + B: Batch, + B::Time: Timestamp, +{ + type Batcher = RcBatcher; + type Builder = RcBuilder; + type Merger = RcMerger; + + fn merge_empty(mut self, other: &Self) -> Self { + assert!(other.is_empty()); + + let (lower, upper) = if self.lower() == other.upper() { + (other.lower().clone(), self.upper().clone()) + } else if self.upper() == other.lower() { + (self.lower().clone(), other.upper().clone()) + } else { + panic!("trying to merge non-consecutive batches"); + }; + + self.desc = Description::new(lower, upper, self.desc.since().clone()); + self + } +} + +/// A cursor for navigating `RcBatch`es. +pub struct RcBatchCursor { + inner: B::Cursor, +} + +impl Cursor for RcBatchCursor +where + B: BatchReader, +{ + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Storage = RcBatch; + + #[inline] + fn key_valid(&self, storage: &Self::Storage) -> bool { + self.inner.key_valid(&storage.inner) + } + + #[inline] + fn val_valid(&self, storage: &Self::Storage) -> bool { + self.inner.val_valid(&storage.inner) + } + + #[inline] + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { + self.inner.key(&storage.inner) + } + + #[inline] + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { + self.inner.val(&storage.inner) + } + + #[inline] + fn map_times(&mut self, storage: &Self::Storage, logic: L) + where + L: FnMut(&Self::Time, &Self::R), + { + self.inner.map_times(&storage.inner, logic) + } + + #[inline] + fn step_key(&mut self, storage: &Self::Storage) { + self.inner.step_key(&storage.inner) + } + + #[inline] + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + self.inner.seek_key(&storage.inner, key) + } + + #[inline] + fn step_val(&mut self, storage: &Self::Storage) { + self.inner.step_val(&storage.inner) + } + + #[inline] + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + self.inner.seek_val(&storage.inner, val) + } + + #[inline] + fn rewind_keys(&mut self, storage: &Self::Storage) { + self.inner.rewind_keys(&storage.inner) + } + + #[inline] + fn rewind_vals(&mut self, storage: &Self::Storage) { + self.inner.rewind_vals(&storage.inner) + } +} + +/// A type used to assemble `RcBatch`es from unordered updates. +pub struct RcBatcher { + inner: B::Batcher, +} + +impl Batcher> for RcBatcher +where + B: Batch, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Batcher::new(), + } + } + + fn push_batch(&mut self, batch: RefOrMut>) { + self.inner.push_batch(batch); + } + + fn seal(&mut self, upper: Antichain) -> RcBatch { + RcBatch::new(self.inner.seal(upper)) + } + + fn frontier(&mut self) -> AntichainRef { + self.inner.frontier() + } +} + +/// A type used to assemble `RcBatch`es from ordered update sequences. +pub struct RcBuilder { + inner: B::Builder, +} + +impl Builder> for RcBuilder +where + B: Batch, + B::Time: Timestamp, +{ + fn new() -> Self { + Self { + inner: B::Builder::new(), + } + } + + fn with_capacity(cap: usize) -> Self { + Self { + inner: B::Builder::with_capacity(cap), + } + } + + fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { + self.inner.push(element); + } + + fn done( + self, + lower: Antichain, + upper: Antichain, + since: Antichain, + ) -> RcBatch { + RcBatch::new(self.inner.done(lower, upper, since)) + } +} + +/// A type used to progressively merge `RcBatch`es. +pub struct RcMerger { + inner: B::Merger, + lower: Antichain, + upper: Antichain, +} + +impl Merger> for RcMerger +where + B: Batch, + B::Time: Timestamp, +{ + fn new( + source1: &RcBatch, + source2: &RcBatch, + compaction_frontier: Option>, + ) -> Self { + assert!(PartialOrder::less_equal(source1.upper(), source2.lower())); + + let lower = source1.lower().clone(); + let upper = source2.upper().clone(); + + Self { + inner: B::Merger::new(&source1.inner, &source2.inner, compaction_frontier), + lower, + upper, + } + } + + fn work(&mut self, source1: &RcBatch, source2: &RcBatch, fuel: &mut isize) { + self.inner.work(&source1.inner, &source2.inner, fuel); + } + + fn done(self) -> RcBatch { + let inner = self.inner.done(); + let since = inner.description().since().clone(); + let mut batch = RcBatch::new(inner); + batch.desc = Description::new(self.lower, self.upper, since); + batch + } +} diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index f3b61ec94..62b5628ab 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -343,7 +343,7 @@ where /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { if let Some(logger) = &self.logger { - for batch in self.merging.drain(..) { + for (index, batch) in self.merging.drain(..).enumerate() { match batch { MergeState::Single(Some(batch)) => { logger.log(::logging::DropEvent { @@ -361,7 +361,16 @@ where length: batch2.len(), }); }, - MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { + MergeState::Double(MergeVariant::Complete(Some((batch, input_lengths)))) => { + if let Some((length1, length2)) = input_lengths { + logger.log(::logging::MergeEvent { + operator: self.operator.global_id, + scale: index, + length1, + length2, + complete: Some(batch.len()), + }); + } logger.log(::logging::DropEvent { operator: self.operator.global_id, length: batch.len(), @@ -679,15 +688,15 @@ where /// Completes and extracts what ever is at layer `index`. fn complete_at(&mut self, index: usize) -> Option { - if let Some((merged, inputs)) = self.merging[index].complete() { - if let Some((input1, input2)) = inputs { + if let Some((merged, input_lengths)) = self.merging[index].complete() { + if let Some((length1, length2)) = input_lengths { // Log the completion of a merge from existing parts. self.logger.as_ref().map(|l| l.log( ::logging::MergeEvent { operator: self.operator.global_id, scale: index, - length1: input1.len(), - length2: input2.len(), + length1, + length2, complete: Some(merged.len()), } )); @@ -816,7 +825,7 @@ impl MergeState where B::Time: Eq { /// with the `is_complete()` method. /// /// There is the addional option of input batches. - fn complete(&mut self) -> Option<(B, Option<(B, B)>)> { + fn complete(&mut self) -> Option<(B, Option<(usize, usize)>)> { match std::mem::replace(self, MergeState::Vacant) { MergeState::Vacant => None, MergeState::Single(batch) => batch.map(|b| (b, None)), @@ -867,8 +876,17 @@ impl MergeState where B::Time: Eq { match (batch1, batch2) { (Some(batch1), Some(batch2)) => { assert!(batch1.upper() == batch2.lower()); - let begin_merge = ::begin_merge(&batch1, &batch2, compaction_frontier); - MergeVariant::InProgress(batch1, batch2, begin_merge) + let input_lengths = Some((batch1.len(), batch2.len())); + if batch1.is_empty() { + let batch = batch2.merge_empty(&batch1); + MergeVariant::Complete(Some((batch, input_lengths))) + } else if batch2.is_empty() { + let batch = batch1.merge_empty(&batch2); + MergeVariant::Complete(Some((batch, input_lengths))) + } else { + let merger = batch1.begin_merge(&batch2, compaction_frontier); + MergeVariant::InProgress(batch1, batch2, merger) + } } (None, Some(x)) => MergeVariant::Complete(Some((x, None))), (Some(x), None) => MergeVariant::Complete(Some((x, None))), @@ -883,7 +901,10 @@ enum MergeVariant { /// Describes an actual in-progress merge between two non-trivial batches. InProgress(B, B, ::Merger), /// A merge that requires no further work. May or may not represent a non-trivial batch. - Complete(Option<(B, Option<(B, B)>)>), + /// + /// In the case of a non-trivial batch that is the result of merging two input batches, the + /// second component contains the lengths of those input batches. + Complete(Option<(B, Option<(usize, usize)>)>), } impl MergeVariant { @@ -892,7 +913,7 @@ impl MergeVariant { /// /// The result is either `None`, for structurally empty batches, /// or a batch and optionally input batches from which it derived. - fn complete(mut self) -> Option<(B, Option<(B, B)>)> { + fn complete(mut self) -> Option<(B, Option<(usize, usize)>)> { let mut fuel = isize::max_value(); self.work(&mut fuel); if let MergeVariant::Complete(batch) = self { batch } @@ -908,7 +929,8 @@ impl MergeVariant { if let MergeVariant::InProgress(b1,b2,mut merge) = variant { merge.work(&b1,&b2,fuel); if *fuel > 0 { - *self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2))))); + let input_lengths = Some((b1.len(), b2.len())); + *self = MergeVariant::Complete(Some((merge.done(), input_lengths))); } else { *self = MergeVariant::InProgress(b1,b2,merge); diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 6e411475e..e84906562 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -285,6 +285,15 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { ::new().done(lower, upper, since) } + + /// Merges this batch with a consecutive empty batch. + /// + /// This method places the following to be true: + /// * `other` is empty. + /// * Either `self.lower() == other.upper()` or `self.upper() == other.lower()`. + /// + /// Implementers should panic if any of these requirements is violated. + fn merge_empty(self, other: &Self) -> Self; } /// Functionality for collecting and batching updates. @@ -332,250 +341,3 @@ pub trait Merger { /// progress. fn done(self) -> Output; } - - -/// Blanket implementations for reference counted batches. -pub mod rc_blanket_impls { - - use std::rc::Rc; - use timely::communication::message::RefOrMut; - - use timely::progress::{Antichain, frontier::AntichainRef}; - use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - - impl BatchReader for Rc { - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - /// The type used to enumerate the batch's contents. - type Cursor = RcBatchCursor; - /// Acquires a cursor to the batch's contents. - fn cursor(&self) -> Self::Cursor { - RcBatchCursor::new((&**self).cursor()) - } - - /// The number of updates in the batch. - fn len(&self) -> usize { (&**self).len() } - /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } - } - - /// Wrapper to provide cursor to nested scope. - pub struct RcBatchCursor { - cursor: B::Cursor, - } - - impl RcBatchCursor { - fn new(cursor: B::Cursor) -> Self { - RcBatchCursor { - cursor, - } - } - } - - impl Cursor for RcBatchCursor { - - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - type Storage = Rc; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } - - #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { - self.cursor.map_times(storage, logic) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } - } - - /// An immutable collection of updates. - impl Batch for Rc { - type Batcher = RcBatcher; - type Builder = RcBuilder; - type Merger = RcMerger; - } - - /// Wrapper type for batching reference counted batches. - pub struct RcBatcher { batcher: B::Batcher } - - /// Functionality for collecting and batching updates. - impl Batcher> for RcBatcher { - fn new() -> Self { RcBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } - } - - /// Wrapper type for building reference counted batches. - pub struct RcBuilder { builder: B::Builder } - - /// Functionality for building batches from ordered update sequences. - impl Builder> for RcBuilder { - fn new() -> Self { RcBuilder { builder: >::new() } } - fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } - } - - /// Wrapper type for merging reference counted batches. - pub struct RcMerger { merger: B::Merger } - - /// Represents a merge in progress. - impl Merger> for RcMerger { - fn new(source1: &Rc, source2: &Rc, compaction_frontier: Option>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } - fn work(&mut self, source1: &Rc, source2: &Rc, fuel: &mut isize) { self.merger.work(source1, source2, fuel) } - fn done(self) -> Rc { Rc::new(self.merger.done()) } - } -} - - -/// Blanket implementations for reference counted batches. -pub mod abomonated_blanket_impls { - - extern crate abomonation; - - use abomonation::{Abomonation, measure}; - use abomonation::abomonated::Abomonated; - use timely::communication::message::RefOrMut; - use timely::progress::{Antichain, frontier::AntichainRef}; - - use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - - impl BatchReader for Abomonated> { - - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - /// The type used to enumerate the batch's contents. - type Cursor = AbomonatedBatchCursor; - /// Acquires a cursor to the batch's contents. - fn cursor(&self) -> Self::Cursor { - AbomonatedBatchCursor::new((&**self).cursor()) - } - - /// The number of updates in the batch. - fn len(&self) -> usize { (&**self).len() } - /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } - } - - /// Wrapper to provide cursor to nested scope. - pub struct AbomonatedBatchCursor { - cursor: B::Cursor, - } - - impl AbomonatedBatchCursor { - fn new(cursor: B::Cursor) -> Self { - AbomonatedBatchCursor { - cursor, - } - } - } - - impl Cursor for AbomonatedBatchCursor { - - type Key = B::Key; - type Val = B::Val; - type Time = B::Time; - type R = B::R; - - type Storage = Abomonated>; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { self.cursor.val(storage) } - - #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { - self.cursor.map_times(storage, logic) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } - } - - /// An immutable collection of updates. - impl Batch for Abomonated> { - type Batcher = AbomonatedBatcher; - type Builder = AbomonatedBuilder; - type Merger = AbomonatedMerger; - } - - /// Wrapper type for batching reference counted batches. - pub struct AbomonatedBatcher { batcher: B::Batcher } - - /// Functionality for collecting and batching updates. - impl Batcher>> for AbomonatedBatcher { - fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } - fn seal(&mut self, upper: Antichain) -> Abomonated> { - let batch = self.batcher.seal(upper); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } - } - - /// Wrapper type for building reference counted batches. - pub struct AbomonatedBuilder { builder: B::Builder } - - /// Functionality for building batches from ordered update sequences. - impl Builder>> for AbomonatedBuilder { - fn new() -> Self { AbomonatedBuilder { builder: >::new() } } - fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } - fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Abomonated> { - let batch = self.builder.done(lower, upper, since); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } - } - - /// Wrapper type for merging reference counted batches. - pub struct AbomonatedMerger { merger: B::Merger } - - /// Represents a merge in progress. - impl Merger>> for AbomonatedMerger { - fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: Option>) -> Self { - AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } - } - fn work(&mut self, source1: &Abomonated>, source2: &Abomonated>, fuel: &mut isize) { - self.merger.work(source1, source2, fuel) - } - fn done(self) -> Abomonated> { - let batch = self.merger.done(); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } - } -} diff --git a/tests/trace.rs b/tests/trace.rs index d00c4497e..d23cb213f 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -1,21 +1,20 @@ extern crate timely; extern crate differential_dataflow; -use std::rc::Rc; - use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; use differential_dataflow::trace::implementations::ord::OrdValBatch; +use differential_dataflow::trace::implementations::rc::RcBatch; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::implementations::spine_fueled::Spine; -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>; type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine>> { +fn get_trace() -> Spine>> { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); {