diff --git a/differential-dataflow/src/into_owned.rs b/differential-dataflow/src/into_owned.rs deleted file mode 100644 index a4a91f49c..000000000 --- a/differential-dataflow/src/into_owned.rs +++ /dev/null @@ -1,204 +0,0 @@ -//! Traits for converting between owned and borrowed types. - -/// A reference type corresponding to an owned type, supporting conversion in each direction. -/// -/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT. -/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the -/// requirement that the other trait implement `Borrow`, for which a borrow must result in a -/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type. -pub trait IntoOwned<'a> { - /// Owned type into which this type can be converted. - type Owned; - /// Conversion from an instance of this type to the owned type. - #[must_use] - fn into_owned(self) -> Self::Owned; - /// Clones `self` onto an existing instance of the owned type. - fn clone_onto(self, other: &mut Self::Owned); - /// Borrows an owned instance as oneself. - #[must_use] - fn borrow_as(owned: &'a Self::Owned) -> Self; -} - -impl<'a, T: ToOwned + ?Sized> IntoOwned<'a> for &'a T { - type Owned = T::Owned; - #[inline] - fn into_owned(self) -> Self::Owned { - self.to_owned() - } - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - ::clone_into(self, other) - } - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - std::borrow::Borrow::borrow(owned) - } -} - -impl<'a, T, E> IntoOwned<'a> for Result -where - T: IntoOwned<'a>, - E: IntoOwned<'a>, -{ - type Owned = Result; - - #[inline] - fn into_owned(self) -> Self::Owned { - self.map(T::into_owned).map_err(E::into_owned) - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - match (self, other) { - (Ok(item), Ok(target)) => T::clone_onto(item, target), - (Err(item), Err(target)) => E::clone_onto(item, target), - (Ok(item), target) => *target = Ok(T::into_owned(item)), - (Err(item), target) => *target = Err(E::into_owned(item)), - } - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - owned.as_ref().map(T::borrow_as).map_err(E::borrow_as) - } -} - -impl<'a, T> IntoOwned<'a> for Option -where - T: IntoOwned<'a>, -{ - type Owned = Option; - - #[inline] - fn into_owned(self) -> Self::Owned { - self.map(IntoOwned::into_owned) - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - match (self, other) { - (Some(item), Some(target)) => T::clone_onto(item, target), - (Some(item), target) => *target = Some(T::into_owned(item)), - (None, target) => *target = None, - } - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - owned.as_ref().map(T::borrow_as) - } -} - -mod tuple { - use paste::paste; - - macro_rules! tuple { - ($($name:ident)+) => (paste! { - - #[allow(non_camel_case_types)] - #[allow(non_snake_case)] - impl<'a, $($name),*> crate::IntoOwned<'a> for ($($name,)*) - where - $($name: crate::IntoOwned<'a>),* - { - type Owned = ($($name::Owned,)*); - - #[inline] - fn into_owned(self) -> Self::Owned { - let ($($name,)*) = self; - ( - $($name.into_owned(),)* - ) - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - let ($($name,)*) = self; - let ($([<$name _other>],)*) = other; - $($name.clone_onto([<$name _other>]);)* - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - let ($($name,)*) = owned; - ( - $($name::borrow_as($name),)* - ) - } - } - }) - } - - tuple!(A); - tuple!(A B); - tuple!(A B C); - tuple!(A B C D); - tuple!(A B C D E); - tuple!(A B C D E F); - tuple!(A B C D E F G); - tuple!(A B C D E F G H); - tuple!(A B C D E F G H I); - tuple!(A B C D E F G H I J); - tuple!(A B C D E F G H I J K); - tuple!(A B C D E F G H I J K L); - tuple!(A B C D E F G H I J K L M); - tuple!(A B C D E F G H I J K L M N); - tuple!(A B C D E F G H I J K L M N O); - tuple!(A B C D E F G H I J K L M N O P); -} - -mod primitive { - macro_rules! implement_for { - ($index_type:ty) => { - impl<'a> crate::IntoOwned<'a> for $index_type { - type Owned = $index_type; - - #[inline] - fn into_owned(self) -> Self::Owned { - self - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - *other = self; - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - *owned - } - } - }; -} - - implement_for!(()); - implement_for!(bool); - implement_for!(char); - - implement_for!(u8); - implement_for!(u16); - implement_for!(u32); - implement_for!(u64); - implement_for!(u128); - implement_for!(usize); - - implement_for!(i8); - implement_for!(i16); - implement_for!(i32); - implement_for!(i64); - implement_for!(i128); - implement_for!(isize); - - implement_for!(f32); - implement_for!(f64); - - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - - implement_for!(std::time::Duration); - -} \ No newline at end of file diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index c67b356cf..1431634e4 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -79,7 +79,6 @@ use std::fmt::Debug; pub use collection::{Collection, AsCollection}; pub use hashable::Hashable; pub use difference::Abelian as Diff; -pub use into_owned::IntoOwned; /// Data type usable in differential dataflow. /// @@ -106,7 +105,6 @@ pub mod logging; pub mod consolidation; pub mod capture; pub mod containers; -mod into_owned; /// Configuration options for differential dataflow. #[derive(Default)] diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 336fe6c81..71951f856 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -36,13 +36,12 @@ pub struct TraceAgent { logging: Option, } +use crate::trace::implementations::WithLayout; +impl WithLayout for TraceAgent { + type Layout = Tr::Layout; +} + impl TraceReader for TraceAgent { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = Tr::TimeGat<'a>; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = Tr::Batch; type Storage = Tr::Storage; diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 7b461f816..868f69a93 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -26,7 +26,7 @@ use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; -use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned}; +use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; @@ -285,10 +285,11 @@ where /// A direct implementation of `ReduceCore::reduce_abelian`. pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, + T1: TraceReader, T2: for<'a> Trace< Key<'a>= T1::Key<'a>, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=T1::Time, Diff: Abelian, >+'static, @@ -309,10 +310,11 @@ where /// A direct implementation of `ReduceCore::reduce_core`. pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, + T1: TraceReader, T2: for<'a> Trace< Key<'a>=T1::Key<'a>, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=T1::Time, >+'static, K: Ord + 'static, diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 423f45261..fcf9dfe46 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -59,7 +59,7 @@ //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder, ValSpine>(&stream, &"test"); +//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder, ValSpine>(&stream, &"test"); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -111,7 +111,9 @@ use timely::dataflow::operators::Capability; use crate::operators::arrange::arrangement::Arranged; use crate::trace::{Builder, Description}; use crate::trace::{self, Trace, TraceReader, Cursor}; -use crate::{ExchangeData, Hashable, IntoOwned}; +use crate::{ExchangeData, Hashable}; + +use crate::trace::implementations::containers::BatchContainer; use super::TraceAgent; @@ -125,21 +127,19 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( - stream: &Stream, G::Timestamp)>, +pub fn arrange_from_upsert( + stream: &Stream, G::Timestamp)>, name: &str, ) -> Arranged> where G: Scope, - Tr: Trace + for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = K>, - Val<'a> : IntoOwned<'a, Owned = V>, + Tr: for<'a> Trace< + KeyOwn: ExchangeData+Hashable+std::hash::Hash, + ValOwn: ExchangeData, Time: TotalOrder+ExchangeData, Diff=isize, >+'static, - K: ExchangeData+Hashable+std::hash::Hash, - V: ExchangeData, - Bu: Builder, Output = Tr::Batch>, + Bu: Builder, Output = Tr::Batch>, { let mut reader: Option> = None; @@ -148,7 +148,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -173,7 +173,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |input, output| { @@ -236,11 +236,11 @@ where for (key, mut list) in to_process { // The prior value associated with the key. - let mut prev_value: Option = None; + let mut prev_value: Option = None; // Attempt to find the key in the trace. - trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key)); - if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) { + trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key)); + if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&Tr::KeyContainer::borrow_as(&key))).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; @@ -248,7 +248,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(val.into_owned()); + prev_value = Some(Tr::owned_val(val)); } trace_cursor.step_val(&trace_storage); } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index d60c013db..fb0042766 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -8,7 +8,7 @@ use timely::Container; use timely::container::PushInto; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection, IntoOwned}; +use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; use timely::order::PartialOrder; @@ -24,6 +24,7 @@ use crate::lattice::Lattice; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder}; +use crate::trace::implementations::containers::BatchContainer; use crate::trace::TraceReader; @@ -84,7 +85,7 @@ impl Reduce for Collection impl Reduce for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, Val<'a>=&'a V, Diff=R>+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a V, Diff=R>+Clone+'static, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { @@ -160,7 +161,7 @@ impl, K: ExchangeData+Hashable, R1: ExchangeDat impl Threshold for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R1>+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { self.reduce_abelian::<_,K,(),KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) @@ -207,7 +208,7 @@ impl, K: ExchangeData+Hashable, R: ExchangeData impl Count for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R>+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static, { fn count_core + 'static>(&self) -> Collection { self.reduce_abelian::<_,K,R,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -246,7 +247,8 @@ pub trait ReduceCore, K: ToOwned + ?Sized, V: D where T2: for<'a> Trace< Key<'a>= &'a K, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=G::Timestamp, Diff: Abelian, >+'static, @@ -271,7 +273,8 @@ pub trait ReduceCore, K: ToOwned + ?Sized, V: D where T2: for<'a> Trace< Key<'a>=&'a K, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=G::Timestamp, >+'static, Bu: Builder, Output = T2::Batch>, @@ -292,7 +295,8 @@ where V: Data, T2: for<'a> Trace< Key<'a>=&'a K, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=G::Timestamp, >+'static, Bu: Builder, Output = T2::Batch>, @@ -309,8 +313,8 @@ where pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, - T1: for<'a> TraceReader : IntoOwned<'a, Owned = K>> + Clone + 'static, - T2: for<'a> Trace=T1::Key<'a>, Val<'a> : IntoOwned<'a, Owned = V>, Time=T1::Time> + 'static, + T1: for<'a> TraceReader + Clone + 'static, + T2: for<'a> Trace=T1::Key<'a>, ValOwn = V, Time=T1::Time> + 'static, K: Ord + 'static, V: Data, Bu: Builder>, @@ -472,7 +476,7 @@ where use std::borrow::Borrow; // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0)); + let key1 = exposed.get(exposed_position).map(|x| T1::KeyContainer::borrow_as(&x.0)); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -488,7 +492,7 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(& as IntoOwned>::borrow_as(&k))).unwrap_or(false) { + while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&T1::KeyContainer::borrow_as(&k))).unwrap_or(false) { interesting_times.push(exposed[exposed_position].1.clone()); exposed_position += 1; } @@ -516,7 +520,7 @@ where // Record future warnings about interesting times (and assert they should be "future"). for time in new_interesting_times.drain(..) { debug_assert!(upper_limit.less_equal(&time)); - interesting.push((key.into_owned(), time)); + interesting.push((T1::owned_key(key), time)); } // Sort each buffer by value and move into the corresponding builder. @@ -526,7 +530,7 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - buffer.push_into(((key.into_owned(), val), time, diff)); + buffer.push_into(((T1::owned_key(key), val), time, diff)); builders[index].push(&mut buffer); buffer.clear(); } @@ -626,7 +630,7 @@ fn sort_dedup(list: &mut Vec) { trait PerKeyCompute<'a, C1, C2, C3, V> where C1: Cursor, - C2: for<'b> Cursor = C1::Key<'a>, Val<'b> : IntoOwned<'b, Owned = V>, Time = C1::Time>, + C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, { @@ -661,7 +665,6 @@ mod history_replay { use crate::lattice::Lattice; use crate::trace::Cursor; use crate::operators::ValueHistory; - use crate::IntoOwned; use super::{PerKeyCompute, sort_dedup}; @@ -690,7 +693,7 @@ mod history_replay { impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V> where C1: Cursor, - C2: for<'b> Cursor = C1::Key<'a>, Val<'b> : IntoOwned<'b, Owned = V>, Time = C1::Time>, + C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, { @@ -897,7 +900,7 @@ mod history_replay { meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet)); for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { - self.output_buffer.push((value.into_owned(), diff.clone())); + self.output_buffer.push((C2::owned_val(value), diff.clone())); } else { self.temporary.push(next_time.join(time)); diff --git a/differential-dataflow/src/trace/cursor/cursor_list.rs b/differential-dataflow/src/trace/cursor/cursor_list.rs index 6c57bce4c..6d2d5b57b 100644 --- a/differential-dataflow/src/trace/cursor/cursor_list.rs +++ b/differential-dataflow/src/trace/cursor/cursor_list.rs @@ -94,17 +94,12 @@ impl CursorList { } } +use crate::trace::implementations::WithLayout; +impl WithLayout for CursorList { + type Layout = C::Layout; +} + impl Cursor for CursorList { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } type Storage = Vec; diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index f31cfb75f..d0d973249 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -5,37 +5,14 @@ //! both because it allows navigation on multiple levels (key and val), but also because it //! supports efficient seeking (via the `seek_key` and `seek_val` methods). -use timely::progress::Timestamp; - -use crate::difference::Semigroup; -use crate::lattice::Lattice; - pub mod cursor_list; pub use self::cursor_list::CursorList; -/// A cursor for navigating ordered `(key, val, time, diff)` updates. -pub trait Cursor { +use crate::trace::implementations::LayoutExt; - /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord; - /// Values associated with keys. - type Val<'a>: Copy + Clone + Ord; - /// Timestamps associated with updates - type Time: Timestamp + Lattice + Ord + Clone; - /// Borrowed form of timestamp. - type TimeGat<'a>: Copy; - /// Owned form of update difference. - type Diff: Semigroup + 'static; - /// Borrowed form of update difference. - type DiffGat<'a> : Copy; - - /// An owned copy of a reference to a time. - fn owned_time(time: Self::TimeGat<'_>) -> Self::Time; - /// Clones a reference time onto an owned time. - fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time); - /// An owned copy of a reference to a diff. - fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff; +/// A cursor for navigating ordered `(key, val, time, diff)` updates. +pub trait Cursor : LayoutExt { /// Storage required by the cursor. type Storage; diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 0bc5c2651..69ca3a50f 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -100,60 +100,79 @@ pub trait Layout { /// Container for times. type TimeContainer: BatchContainer; /// Container for diffs. - type DiffContainer: BatchContainer; + type DiffContainer: BatchContainer; /// Container for offsets. type OffsetContainer: for<'a> BatchContainer = usize>; } /// A type bearing a layout. -pub trait LaidOut { +pub trait WithLayout { /// The layout. type Layout: Layout; } /// Automatically implemented trait for types with layouts. -pub trait LayoutExt : LaidOut { +pub trait LayoutExt : WithLayout> { /// Alias for an owned key of a layout. - type Key; + type KeyOwn; /// Alias for an borrowed key of a layout. - type KeyRef<'a>; + type Key<'a>: Copy + Ord; /// Alias for an owned val of a layout. - type Val; + type ValOwn: Clone + Ord; /// Alias for an borrowed val of a layout. - type ValRef<'a>; + type Val<'a>: Copy + Ord; /// Alias for an owned time of a layout. - type Time; + type Time: Lattice + timely::progress::Timestamp; /// Alias for an borrowed time of a layout. - type TimeRef<'a>; + type TimeGat<'a>: Copy + Ord; /// Alias for an owned diff of a layout. - type Diff; + type Diff: Semigroup + 'static; /// Alias for an borrowed diff of a layout. - type DiffRef<'a>; + type DiffGat<'a>: Copy + Ord; + + /// Container for update keys. + type KeyContainer: for<'a> BatchContainer = Self::Key<'a>, Owned = Self::KeyOwn>; + /// Container for update vals. + type ValContainer: for<'a> BatchContainer = Self::Val<'a>, Owned = Self::ValOwn>; + /// Container for times. + type TimeContainer: for<'a> BatchContainer = Self::TimeGat<'a>, Owned = Self::Time>; + /// Container for diffs. + type DiffContainer: for<'a> BatchContainer = Self::DiffGat<'a>, Owned = Self::Diff>; /// Construct an owned key from a reference. - fn owned_key(key: Self::KeyRef<'_>) -> Self::Key; + fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn; /// Construct an owned val from a reference. - fn owned_val(val: Self::ValRef<'_>) -> Self::Val; + fn owned_val(val: Self::Val<'_>) -> Self::ValOwn; /// Construct an owned time from a reference. - fn owned_time(time: Self::TimeRef<'_>) -> Self::Time; + fn owned_time(time: Self::TimeGat<'_>) -> Self::Time; /// Construct an owned diff from a reference. - fn owned_diff(diff: Self::DiffRef<'_>) -> Self::Diff; + fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff; + + /// Clones a reference time onto an owned time. + fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time); } -impl LayoutExt for L { - type Key = <::KeyContainer as BatchContainer>::Owned; - type KeyRef<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; - type Val = <::ValContainer as BatchContainer>::Owned; - type ValRef<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; +impl LayoutExt for L { + type KeyOwn = <::KeyContainer as BatchContainer>::Owned; + type Key<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; + type ValOwn = <::ValContainer as BatchContainer>::Owned; + type Val<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; type Time = <::TimeContainer as BatchContainer>::Owned; - type TimeRef<'a> = <::TimeContainer as BatchContainer>::ReadItem<'a>; + type TimeGat<'a> = <::TimeContainer as BatchContainer>::ReadItem<'a>; type Diff = <::DiffContainer as BatchContainer>::Owned; - type DiffRef<'a> = <::DiffContainer as BatchContainer>::ReadItem<'a>; + type DiffGat<'a> = <::DiffContainer as BatchContainer>::ReadItem<'a>; + + type KeyContainer = ::KeyContainer; + type ValContainer = ::ValContainer; + type TimeContainer = ::TimeContainer; + type DiffContainer = ::DiffContainer; + + #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { ::KeyContainer::into_owned(key) } + #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { ::ValContainer::into_owned(val) } + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { ::TimeContainer::into_owned(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { ::DiffContainer::into_owned(diff) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { ::TimeContainer::clone_onto(time, onto) } - #[inline(always)] fn owned_key(key: Self::KeyRef<'_>) -> Self::Key { ::KeyContainer::into_owned(key) } - #[inline(always)] fn owned_val(val: Self::ValRef<'_>) -> Self::Val { ::ValContainer::into_owned(val) } - #[inline(always)] fn owned_time(time: Self::TimeRef<'_>) -> Self::Time { ::TimeContainer::into_owned(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffRef<'_>) -> Self::Diff { ::DiffContainer::into_owned(diff) } } // An easy way to provide an explicit layout: as a 5-tuple. @@ -503,7 +522,7 @@ pub mod containers { /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { /// An owned instance of `Self::ReadItem<'_>`. - type Owned; + type Owned: Clone + Ord; /// The type that can be read back out of the container. type ReadItem<'a>: Copy + Ord; diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 40a5b0c28..8cde8383f 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -322,13 +322,11 @@ pub mod val_batch { pub updates: usize, } + impl WithLayout for OrdValBatch { + type Layout = L; + } + impl BatchReader for OrdValBatch { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { @@ -594,18 +592,12 @@ pub mod val_batch { phantom: PhantomData, } - impl Cursor for OrdValCursor { - - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; + use crate::trace::implementations::WithLayout; + impl WithLayout for OrdValCursor { + type Layout = L; + } - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } + impl Cursor for OrdValCursor { type Storage = OrdValBatch; @@ -802,14 +794,11 @@ pub mod key_batch { pub updates: usize, } - impl BatchReader for OrdKeyBatch { + impl Layout = &'a ()>>> WithLayout for OrdKeyBatch { + type Layout = L; + } - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = &'a (); - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; + impl Layout = &'a ()>>> BatchReader for OrdKeyBatch { type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { @@ -827,7 +816,7 @@ pub mod key_batch { fn description(&self) -> &Description> { &self.description } } - impl Batch for OrdKeyBatch { + impl Layout = &'a ()>>> Batch for OrdKeyBatch { type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef>) -> Self::Merger { @@ -994,17 +983,12 @@ pub mod key_batch { phantom: PhantomData, } - impl Cursor for OrdKeyCursor { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = &'a (); - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; + use crate::trace::implementations::WithLayout; + impl Layout = &'a ()>>> WithLayout for OrdKeyCursor { + type Layout = L; + } - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } + impl Layout = &'a ()>>> Cursor for OrdKeyCursor { type Storage = OrdKeyBatch; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index e844ebadf..9dcf68ede 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -273,18 +273,19 @@ mod val_batch { pub updates: usize, } - impl BatchReader for RhhValBatch + impl WithLayout for RhhValBatch where layout::Key: Default + HashOrdered, for<'a> layout::KeyRef<'a, L>: HashOrdered, { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; + type Layout = L; + } + impl BatchReader for RhhValBatch + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, + { type Cursor = RhhValCursor; fn cursor(&self) -> Self::Cursor { let mut cursor = RhhValCursor { @@ -650,22 +651,20 @@ mod val_batch { phantom: PhantomData, } - impl Cursor for RhhValCursor + use crate::trace::implementations::WithLayout; + impl WithLayout for RhhValCursor where layout::Key: Default + HashOrdered, for<'a> layout::KeyRef<'a, L>: HashOrdered, { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } + type Layout = L; + } + impl Cursor for RhhValCursor + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, + { type Storage = RhhValBatch; fn get_key<'a>(&self, storage: &'a RhhValBatch) -> Option> { storage.storage.keys.get(self.key_cursor) } diff --git a/differential-dataflow/src/trace/implementations/spine_fueled.rs b/differential-dataflow/src/trace/implementations/spine_fueled.rs index ea83f1f7c..9cdd27443 100644 --- a/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -99,13 +99,12 @@ pub struct Spine { exert_logic: Option, } +use crate::trace::WithLayout; +impl WithLayout for Spine { + type Layout = B::Layout; +} + impl TraceReader for Spine { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = B::TimeGat<'a>; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; type Batch = B; type Storage = Vec; diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 82c3dd6c3..27598b59a 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -16,11 +16,11 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; use crate::logging::Logger; -use crate::difference::Semigroup; -use crate::lattice::Lattice; pub use self::cursor::Cursor; pub use self::description::Description; +use crate::trace::implementations::LayoutExt; + /// A type used to express how much effort a trace should exert even in the absence of updates. pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize)])->Option+Send+Sync>; @@ -44,34 +44,52 @@ pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize /// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods /// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's /// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen). -pub trait TraceReader { - - /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord; - /// Values associated with keys. - type Val<'a>: Copy + Clone; - /// Timestamps associated with updates - type Time: Timestamp + Lattice + Ord + Clone; - /// Borrowed form of timestamp. - type TimeGat<'a>: Copy; - /// Owned form of update difference. - type Diff: Semigroup + 'static; - /// Borrowed form of update difference. - type DiffGat<'a> : Copy; - - /// An owned copy of a reference to a time. - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) } - /// An owned copy of a reference to a diff. - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) } +pub trait TraceReader : LayoutExt { /// The type of an immutable collection of updates. - type Batch: for<'a> BatchReader = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>+Clone+'static; + type Batch: + 'static + + Clone + + BatchReader + + WithLayout + + for<'a> LayoutExt< + Key<'a> = Self::Key<'a>, + KeyOwn = Self::KeyOwn, + Val<'a> = Self::Val<'a>, + ValOwn = Self::ValOwn, + Time = Self::Time, + TimeGat<'a> = Self::TimeGat<'a>, + Diff = Self::Diff, + DiffGat<'a> = Self::DiffGat<'a>, + KeyContainer = Self::KeyContainer, + ValContainer = Self::ValContainer, + TimeContainer = Self::TimeContainer, + DiffContainer = Self::DiffContainer, + >; + /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`. type Storage; /// The type used to enumerate the collections contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>; + type Cursor: + Cursor + + WithLayout + + for<'a> LayoutExt< + Key<'a> = Self::Key<'a>, + KeyOwn = Self::KeyOwn, + Val<'a> = Self::Val<'a>, + ValOwn = Self::ValOwn, + Time = Self::Time, + TimeGat<'a> = Self::TimeGat<'a>, + Diff = Self::Diff, + DiffGat<'a> = Self::DiffGat<'a>, + KeyContainer = Self::KeyContainer, + ValContainer = Self::ValContainer, + TimeContainer = Self::TimeContainer, + DiffContainer = Self::DiffContainer, + >; + /// Provides a cursor over updates contained in the trace. fn cursor(&mut self) -> (Self::Cursor, Self::Storage) { @@ -219,34 +237,36 @@ pub trait Trace : TraceReader { fn close(&mut self); } +use crate::trace::implementations::WithLayout; + /// A batch of updates whose contents may be read. /// /// This is a restricted interface to batches of updates, which support the reading of the batch's contents, /// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is /// especially useful for views derived from other sources in ways that prevent the construction of batches /// from the type of data in the view (for example, filtered views, or views with extended time coordinates). -pub trait BatchReader : Sized { - /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord; - /// Values associated with keys. - type Val<'a>: Copy + Clone; - /// Timestamps associated with updates - type Time: Timestamp + Lattice + Ord + Clone; - /// Borrowed form of timestamp. - type TimeGat<'a>: Copy; - /// Owned form of update difference. - type Diff: Semigroup + 'static; - /// Borrowed form of update difference. - type DiffGat<'a> : Copy; - - /// An owned copy of a reference to a time. - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) } - /// An owned copy of a reference to a diff. - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) } +pub trait BatchReader : LayoutExt + Sized { /// The type used to enumerate the batch's contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>; - /// Acquires a cursor to the batch's contents. + type Cursor: + Cursor + + WithLayout + + for<'a> LayoutExt< + Key<'a> = Self::Key<'a>, + KeyOwn = Self::KeyOwn, + Val<'a> = Self::Val<'a>, + ValOwn = Self::ValOwn, + Time = Self::Time, + TimeGat<'a> = Self::TimeGat<'a>, + Diff = Self::Diff, + DiffGat<'a> = Self::DiffGat<'a>, + KeyContainer = Self::KeyContainer, + ValContainer = Self::ValContainer, + TimeContainer = Self::TimeContainer, + DiffContainer = Self::DiffContainer, + >; + + /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. fn len(&self) -> usize; @@ -357,13 +377,11 @@ pub mod rc_blanket_impls { use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Builder, Merger, Cursor, Description}; + impl WithLayout for Rc { + type Layout = B::Layout; + } + impl BatchReader for Rc { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = B::TimeGat<'a>; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; /// The type used to enumerate the batch's contents. type Cursor = RcBatchCursor; @@ -383,6 +401,11 @@ pub mod rc_blanket_impls { cursor: C, } + use crate::trace::implementations::WithLayout; + impl WithLayout for RcBatchCursor { + type Layout = C::Layout; + } + impl RcBatchCursor { fn new(cursor: C) -> Self { RcBatchCursor { @@ -393,17 +416,6 @@ pub mod rc_blanket_impls { impl Cursor for RcBatchCursor { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = Rc; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index 72c488fbd..6886631df 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -25,18 +25,25 @@ impl Clone for TraceEnter { } } -impl TraceReader for TraceEnter +impl WithLayout for TraceEnter where Tr: TraceReader, TInner: Refines+Lattice, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +impl TraceReader for TraceEnter +where + Tr: TraceReader, + TInner: Refines+Lattice, +{ type Batch = BatchEnter; type Storage = Tr::Storage; type Cursor = CursorEnter; @@ -109,18 +116,25 @@ pub struct BatchEnter { description: Description, } -impl BatchReader for BatchEnter +impl WithLayout for BatchEnter where B: BatchReader, TInner: Refines+Lattice, { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +impl BatchReader for BatchEnter +where + B: BatchReader, + TInner: Refines+Lattice, +{ type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { @@ -154,6 +168,21 @@ pub struct CursorEnter { cursor: C, } +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorEnter +where + C: Cursor, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorEnter { fn new(cursor: C) -> Self { CursorEnter { @@ -168,17 +197,6 @@ where C: Cursor, TInner: Refines+Lattice, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -224,21 +242,24 @@ impl BatchCursorEnter { } } -impl Cursor for BatchCursorEnter +impl WithLayout for BatchCursorEnter where + C: Cursor, TInner: Refines+Lattice, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +impl Cursor for BatchCursorEnter +where + TInner: Refines+Lattice, +{ type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 6f9d51e22..3a789f473 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -40,6 +40,22 @@ where } } +impl WithLayout for TraceEnter +where + Tr: TraceReader, + TInner: Refines+Lattice, + F: Clone, + G: Clone, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl TraceReader for TraceEnter where Tr: TraceReader, @@ -48,13 +64,6 @@ where F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone, G: FnMut(&TInner)->Tr::Time+Clone+'static, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; - type Batch = BatchEnter; type Storage = Tr::Storage; type Cursor = CursorEnter; @@ -131,19 +140,27 @@ pub struct BatchEnter { logic: F, } -impl BatchReader for BatchEnter +impl WithLayout for BatchEnter where B: BatchReader, TInner: Refines+Lattice, - F: FnMut(B::Key<'_>, ::Val<'_>, B::TimeGat<'_>)->TInner+Clone, { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +use crate::trace::implementations::LayoutExt; +impl BatchReader for BatchEnter +where + B: BatchReader, + TInner: Refines+Lattice, + F: FnMut(B::Key<'_>, ::Val<'_>, B::TimeGat<'_>)->TInner+Clone, +{ type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { @@ -179,6 +196,21 @@ pub struct CursorEnter { logic: F, } +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorEnter +where + C: Cursor, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorEnter { fn new(cursor: C, logic: F) -> Self { CursorEnter { @@ -195,17 +227,6 @@ where TInner: Refines+Lattice, F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -246,6 +267,20 @@ pub struct BatchCursorEnter { logic: F, } +impl WithLayout for BatchCursorEnter +where + C: Cursor, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchCursorEnter { fn new(cursor: C, logic: F) -> Self { BatchCursorEnter { @@ -261,17 +296,6 @@ where TInner: Refines+Lattice, F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index 617dafc7b..ad49543bd 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -24,18 +24,15 @@ where } } +impl WithLayout for TraceFilter { + type Layout = Tr::Layout; +} + impl TraceReader for TraceFilter where Tr: TraceReader, F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = Tr::TimeGat<'a>; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; - type Batch = BatchFilter; type Storage = Tr::Storage; type Cursor = CursorFilter; @@ -75,18 +72,15 @@ pub struct BatchFilter { logic: F, } +impl WithLayout for BatchFilter { + type Layout = B::Layout; +} + impl BatchReader for BatchFilter where B: BatchReader, F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = B::TimeGat<'a>; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; - type Cursor = BatchCursorFilter; fn cursor(&self) -> Self::Cursor { @@ -112,6 +106,11 @@ pub struct CursorFilter { logic: F, } +use crate::trace::implementations::WithLayout; +impl WithLayout for CursorFilter { + type Layout = C::Layout; +} + impl CursorFilter { fn new(cursor: C, logic: F) -> Self { CursorFilter { @@ -126,17 +125,6 @@ where C: Cursor, F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -175,6 +163,10 @@ pub struct BatchCursorFilter { logic: F, } +impl WithLayout for BatchCursorFilter { + type Layout = C::Layout; +} + impl BatchCursorFilter { fn new(cursor: C, logic: F) -> Self { BatchCursorFilter { @@ -188,17 +180,6 @@ impl Cursor for BatchCursorFilter where F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = BatchFilter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index aed9c781e..5af2cdbf3 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -69,18 +69,25 @@ where } } +impl WithLayout for TraceFreeze +where + Tr: TraceReader, + F: Fn(Tr::TimeGat<'_>)->Option, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl TraceReader for TraceFreeze where Tr: TraceReader, F: Fn(Tr::TimeGat<'_>)->Option+'static, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = &'a Tr::Time; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; - type Batch = BatchFreeze; type Storage = Tr::Storage; type Cursor = CursorFreeze; @@ -132,18 +139,25 @@ impl Clone for BatchFreeze { } } -impl BatchReader for BatchFreeze +impl WithLayout for BatchFreeze where B: BatchReader, F: Fn(B::TimeGat<'_>)->Option, { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = &'a B::Time; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +impl BatchReader for BatchFreeze +where + B: BatchReader, + F: Fn(B::TimeGat<'_>)->Option, +{ type Cursor = BatchCursorFreeze; fn cursor(&self) -> Self::Cursor { @@ -170,6 +184,17 @@ pub struct CursorFreeze { func: Rc, } +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorFreeze { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorFreeze { fn new(cursor: C, func: Rc) -> Self { Self { cursor, func } @@ -181,17 +206,6 @@ where C: Cursor, F: Fn(C::TimeGat<'_>)->Option, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -229,6 +243,16 @@ pub struct BatchCursorFreeze { func: Rc, } +impl WithLayout for BatchCursorFreeze { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchCursorFreeze { fn new(cursor: C, func: Rc) -> Self { Self { cursor, func } @@ -240,17 +264,6 @@ impl Cursor for BatchCursorFreeze where F: Fn(C::TimeGat<'_>)->Option, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = BatchFreeze; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index f69dc1f59..22cca80fe 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -31,13 +31,17 @@ impl Clone for TraceFrontier { } } +impl WithLayout for TraceFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl TraceReader for TraceFrontier { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = &'a Tr::Time; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = BatchFrontier; type Storage = Tr::Storage; @@ -82,13 +86,17 @@ pub struct BatchFrontier { until: Antichain, } +impl WithLayout for BatchFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchReader for BatchFrontier { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = &'a B::Time; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; type Cursor = BatchCursorFrontier; @@ -117,6 +125,17 @@ pub struct CursorFrontier { until: Antichain } +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorFrontier { fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { CursorFrontier { @@ -128,16 +147,6 @@ impl CursorFrontier { } impl Cursor for CursorFrontier { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } type Storage = C::Storage; @@ -183,6 +192,16 @@ pub struct BatchCursorFrontier { until: Antichain, } +impl WithLayout for BatchCursorFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchCursorFrontier { fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { BatchCursorFrontier { @@ -194,16 +213,6 @@ impl BatchCursorFrontier { } impl> Cursor for BatchCursorFrontier { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } type Storage = BatchFrontier; diff --git a/differential-dataflow/src/trace/wrappers/rc.rs b/differential-dataflow/src/trace/wrappers/rc.rs index 4a877031a..9051db631 100644 --- a/differential-dataflow/src/trace/wrappers/rc.rs +++ b/differential-dataflow/src/trace/wrappers/rc.rs @@ -78,13 +78,12 @@ pub struct TraceRc { pub wrapper: Rc>>, } +use crate::trace::WithLayout; +impl WithLayout for TraceRc { + type Layout = Tr::Layout; +} + impl TraceReader for TraceRc { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = Tr::TimeGat<'a>; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = Tr::Batch; type Storage = Tr::Storage; diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index d2bbbf1cd..2633d0b62 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,7 +4,6 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::IntoOwned; /// Reports a number of extensions to a stream of prefixes. /// @@ -20,11 +19,7 @@ pub fn count( ) -> Collection where G: Scope, - Tr: for<'a> TraceReader< - Key<'a>: IntoOwned<'a, Owned = K>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, - Diff=isize, - >+Clone+'static, + Tr: TraceReader+Clone+'static, for<'a> Tr::Diff : Semigroup>, K: Hashable + Ord + Default + 'static, R: Monoid+Multiply+ExchangeData, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 0ed560232..bf9b3b589 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -50,7 +50,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; -use differential_dataflow::IntoOwned; +use differential_dataflow::trace::implementations::BatchContainer; /// A binary equijoin that responds to updates on only its first input. /// @@ -86,7 +86,7 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>+Clone+'static, + Tr: TraceReader+Clone+'static, R: Mul, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, @@ -156,7 +156,7 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>+Clone+'static, + Tr: for<'a> TraceReader+Clone+'static, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, @@ -306,17 +306,17 @@ fn process_proposals( comparison: &CF, yield_function: &Y, output_func: &mut S, - mut output_buffer: &mut Vec<(::Time, ::Diff)>, + mut output_buffer: &mut Vec<(Tr::Time, Tr::Diff)>, timer: Instant, work: &mut usize, trace: &mut Tr, - proposals: &mut Vec<((K, V, ::Time), ::Time, R)>, + proposals: &mut Vec<((K, V, Tr::Time), Tr::Time, R)>, mut session: SessionFor, - frontier: AntichainRef<::Time> + frontier: AntichainRef ) -> bool where G: Scope, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>, + Tr: for<'a> TraceReader, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(Instant, usize) -> bool + 'static, S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, @@ -335,14 +335,13 @@ where for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, *work); - if !yielded && !frontier.iter().any(|t| comparison( as IntoOwned>::borrow_as(t), initial)) { - use differential_dataflow::IntoOwned; - cursor.seek_key(&storage, IntoOwned::borrow_as(key)); - if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { + if !yielded && !frontier.iter().any(|t| comparison(Tr::TimeContainer::borrow_as(t), initial)) { + cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(key)); + if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { - let mut t = t.into_owned(); + let mut t = Tr::owned_time(t); t.join_assign(time); output_buffer.push((t, Tr::owned_diff(d))) } diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 436f8da7c..0b0f6df5a 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{IsZero, Semigroup, Monoid}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; -use differential_dataflow::IntoOwned; +use differential_dataflow::trace::implementations::BatchContainer; /// Proposes extensions to a stream of prefixes. /// @@ -29,8 +29,7 @@ pub fn lookup_map( where G: Scope, Tr: for<'a> TraceReader< - Key<'a>: IntoOwned<'a, Owned = K>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = K, Diff : Semigroup>+Monoid+ExchangeData, >+Clone+'static, K: Hashable + Ord + 'static, @@ -94,13 +93,12 @@ where for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - use differential_dataflow::IntoOwned; - cursor.seek_key(&storage, IntoOwned::borrow_as(&key1)); - if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) { + cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(&key1)); + if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(&key1)) { while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { - if t.into_owned().less_equal(time) { count.plus_equals(&d); } + if Tr::owned_time(t).less_equal(time) { count.plus_equals(&d); } }); if !count.is_zero() { let (dout, rout) = output_func(prefix, diff, value, &count); diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 7b96f00fd..b519ecf47 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,6 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::IntoOwned; /// Proposes extensions to a prefix stream. /// @@ -22,9 +21,8 @@ pub fn propose( where G: Scope, Tr: for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = K>, - Val<'a> : IntoOwned<'a, Owned = V>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = K, + ValOwn = V, Diff: Monoid+Multiply+ExchangeData+Semigroup>, >+Clone+'static, K: Hashable + Default + Ord + 'static, @@ -36,7 +34,7 @@ where prefixes, arrangement, move |p: &P, k: &mut K | { *k = key_selector(p); }, - move |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)), + move |prefix, diff, value, sum| ((prefix.clone(), Tr::owned_val(value)), diff.clone().multiply(sum)), Default::default(), Default::default(), Default::default(), @@ -56,9 +54,8 @@ pub fn propose_distinct( where G: Scope, Tr: for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = K>, - Val<'a> : IntoOwned<'a, Owned = V>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = K, + ValOwn = V, Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, K: Hashable + Default + Ord + 'static, @@ -70,7 +67,7 @@ where prefixes, arrangement, move |p: &P, k: &mut K| { *k = key_selector(p); }, - move |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()), + move |prefix, diff, value, _sum| ((prefix.clone(), Tr::owned_val(value)), diff.clone()), Default::default(), Default::default(), Default::default(), diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 330661027..9c111d252 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,7 +6,6 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::IntoOwned; /// Proposes extensions to a stream of prefixes. /// @@ -21,8 +20,7 @@ pub fn validate( where G: Scope, Tr: for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = (K, V)>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = (K, V), Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, K: Ord+Hash+Clone+Default + 'static,