From 99d2beec78967b2beb2c5523596b9f6d1e928f85 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 15 Jul 2025 10:34:48 -0400 Subject: [PATCH 1/6] Cursor : LayoutExt --- .../src/trace/cursor/cursor_list.rs | 5 ++ differential-dataflow/src/trace/cursor/mod.rs | 4 +- .../src/trace/implementations/mod.rs | 56 ++++++++++++------- .../src/trace/implementations/ord_neu.rs | 10 ++++ .../src/trace/implementations/rhh.rs | 9 +++ differential-dataflow/src/trace/mod.rs | 5 ++ .../src/trace/wrappers/enter.rs | 29 ++++++++++ .../src/trace/wrappers/enter_at.rs | 29 ++++++++++ .../src/trace/wrappers/filter.rs | 9 +++ .../src/trace/wrappers/freeze.rs | 21 +++++++ .../src/trace/wrappers/frontier.rs | 21 +++++++ 11 files changed, 176 insertions(+), 22 deletions(-) diff --git a/differential-dataflow/src/trace/cursor/cursor_list.rs b/differential-dataflow/src/trace/cursor/cursor_list.rs index 6c57bce4c..334781911 100644 --- a/differential-dataflow/src/trace/cursor/cursor_list.rs +++ b/differential-dataflow/src/trace/cursor/cursor_list.rs @@ -94,6 +94,11 @@ impl CursorList { } } +use crate::trace::implementations::LaidOut; +impl LaidOut for CursorList { + type Layout = C::Layout; +} + impl Cursor for CursorList { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index f31cfb75f..629ba78ae 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -14,8 +14,10 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; +use crate::trace::implementations::LayoutExt; + /// A cursor for navigating ordered `(key, val, time, diff)` updates. -pub trait Cursor { +pub trait Cursor : LayoutExt { /// Key by which updates are indexed. type Key<'a>: Copy + Clone + Ord; diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 0bc5c2651..e7c1ab6df 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -112,48 +112,62 @@ pub trait LaidOut { } /// Automatically implemented trait for types with layouts. -pub trait LayoutExt : LaidOut { +pub trait LayoutExt : LaidOut> { /// Alias for an owned key of a layout. - type Key; + type KeyOwn; /// Alias for an borrowed key of a layout. - type KeyRef<'a>; + type KeyRef<'a>: Copy + Ord; /// Alias for an owned val of a layout. - type Val; + type ValOwn; /// Alias for an borrowed val of a layout. - type ValRef<'a>; + type ValRef<'a>: Copy + Ord; /// Alias for an owned time of a layout. - type Time; + type TimeOwn: Lattice + timely::progress::Timestamp; /// Alias for an borrowed time of a layout. - type TimeRef<'a>; + type TimeRef<'a>: Copy + Ord; /// Alias for an owned diff of a layout. - type Diff; + type DiffOwn: Semigroup; /// Alias for an borrowed diff of a layout. - type DiffRef<'a>; + type DiffRef<'a>: Copy + Ord; + + /// Container for update keys. + type KeyContainer: for<'a> BatchContainer = Self::KeyRef<'a>, Owned = Self::KeyOwn>; + /// Container for update vals. + type ValContainer: for<'a> BatchContainer = Self::ValRef<'a>, Owned = Self::ValOwn>; + /// Container for times. + type TimeContainer: for<'a> BatchContainer = Self::TimeRef<'a>, Owned = Self::TimeOwn>; + /// Container for diffs. + type DiffContainer: for<'a> BatchContainer = Self::DiffRef<'a>, Owned = Self::DiffOwn>; /// Construct an owned key from a reference. - fn owned_key(key: Self::KeyRef<'_>) -> Self::Key; + fn owned_key(key: Self::KeyRef<'_>) -> Self::KeyOwn; /// Construct an owned val from a reference. - fn owned_val(val: Self::ValRef<'_>) -> Self::Val; + fn owned_val(val: Self::ValRef<'_>) -> Self::ValOwn; /// Construct an owned time from a reference. - fn owned_time(time: Self::TimeRef<'_>) -> Self::Time; + fn owned_time2(time: Self::TimeRef<'_>) -> Self::TimeOwn; /// Construct an owned diff from a reference. - fn owned_diff(diff: Self::DiffRef<'_>) -> Self::Diff; + fn owned_diff2(diff: Self::DiffRef<'_>) -> Self::DiffOwn; } impl LayoutExt for L { - type Key = <::KeyContainer as BatchContainer>::Owned; + type KeyOwn = <::KeyContainer as BatchContainer>::Owned; type KeyRef<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; - type Val = <::ValContainer as BatchContainer>::Owned; + type ValOwn = <::ValContainer as BatchContainer>::Owned; type ValRef<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; - type Time = <::TimeContainer as BatchContainer>::Owned; + type TimeOwn = <::TimeContainer as BatchContainer>::Owned; type TimeRef<'a> = <::TimeContainer as BatchContainer>::ReadItem<'a>; - type Diff = <::DiffContainer as BatchContainer>::Owned; + type DiffOwn = <::DiffContainer as BatchContainer>::Owned; type DiffRef<'a> = <::DiffContainer as BatchContainer>::ReadItem<'a>; - #[inline(always)] fn owned_key(key: Self::KeyRef<'_>) -> Self::Key { ::KeyContainer::into_owned(key) } - #[inline(always)] fn owned_val(val: Self::ValRef<'_>) -> Self::Val { ::ValContainer::into_owned(val) } - #[inline(always)] fn owned_time(time: Self::TimeRef<'_>) -> Self::Time { ::TimeContainer::into_owned(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffRef<'_>) -> Self::Diff { ::DiffContainer::into_owned(diff) } + type KeyContainer = ::KeyContainer; + type ValContainer = ::ValContainer; + type TimeContainer = ::TimeContainer; + type DiffContainer = ::DiffContainer; + + #[inline(always)] fn owned_key(key: Self::KeyRef<'_>) -> Self::KeyOwn { ::KeyContainer::into_owned(key) } + #[inline(always)] fn owned_val(val: Self::ValRef<'_>) -> Self::ValOwn { ::ValContainer::into_owned(val) } + #[inline(always)] fn owned_time2(time: Self::TimeRef<'_>) -> Self::TimeOwn { ::TimeContainer::into_owned(time) } + #[inline(always)] fn owned_diff2(diff: Self::DiffRef<'_>) -> Self::DiffOwn { ::DiffContainer::into_owned(diff) } } // An easy way to provide an explicit layout: as a 5-tuple. diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 40a5b0c28..4c83fc43e 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -594,6 +594,11 @@ pub mod val_batch { phantom: PhantomData, } + use crate::trace::implementations::LaidOut; + impl LaidOut for OrdValCursor { + type Layout = L; + } + impl Cursor for OrdValCursor { type Key<'a> = layout::KeyRef<'a, L>; @@ -994,6 +999,11 @@ pub mod key_batch { phantom: PhantomData, } + use crate::trace::implementations::LaidOut; + impl LaidOut for OrdKeyCursor { + type Layout = L; + } + impl Cursor for OrdKeyCursor { type Key<'a> = layout::KeyRef<'a, L>; type Val<'a> = &'a (); diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index e844ebadf..03456e648 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -650,6 +650,15 @@ mod val_batch { phantom: PhantomData, } + use crate::trace::implementations::LaidOut; + impl LaidOut for RhhValCursor + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, + { + type Layout = L; + } + impl Cursor for RhhValCursor where layout::Key: Default + HashOrdered, diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 82c3dd6c3..b6fd65d41 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -383,6 +383,11 @@ pub mod rc_blanket_impls { cursor: C, } + use crate::trace::implementations::LaidOut; + impl LaidOut for RcBatchCursor { + type Layout = C::Layout; + } + impl RcBatchCursor { fn new(cursor: C) -> Self { RcBatchCursor { diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index 72c488fbd..b591bafd3 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -154,6 +154,21 @@ pub struct CursorEnter { cursor: C, } +use crate::trace::implementations::{Layout, LaidOut}; +impl LaidOut for CursorEnter +where + C: Cursor, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorEnter { fn new(cursor: C) -> Self { CursorEnter { @@ -224,6 +239,20 @@ impl BatchCursorEnter { } } +impl LaidOut for BatchCursorEnter +where + C: Cursor, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl Cursor for BatchCursorEnter where TInner: Refines+Lattice, diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 6f9d51e22..e5ce2f0a4 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -179,6 +179,21 @@ pub struct CursorEnter { logic: F, } +use crate::trace::implementations::{Layout, LaidOut}; +impl LaidOut for CursorEnter +where + C: Cursor, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorEnter { fn new(cursor: C, logic: F) -> Self { CursorEnter { @@ -246,6 +261,20 @@ pub struct BatchCursorEnter { logic: F, } +impl LaidOut for BatchCursorEnter +where + C: Cursor, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchCursorEnter { fn new(cursor: C, logic: F) -> Self { BatchCursorEnter { diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index 617dafc7b..2181b1fb5 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -112,6 +112,11 @@ pub struct CursorFilter { logic: F, } +use crate::trace::implementations::LaidOut; +impl LaidOut for CursorFilter { + type Layout = C::Layout; +} + impl CursorFilter { fn new(cursor: C, logic: F) -> Self { CursorFilter { @@ -175,6 +180,10 @@ pub struct BatchCursorFilter { logic: F, } +impl LaidOut for BatchCursorFilter { + type Layout = C::Layout; +} + impl BatchCursorFilter { fn new(cursor: C, logic: F) -> Self { BatchCursorFilter { diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index aed9c781e..e680d5c24 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -170,6 +170,17 @@ pub struct CursorFreeze { func: Rc, } +use crate::trace::implementations::{Layout, LaidOut}; +impl LaidOut for CursorFreeze { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorFreeze { fn new(cursor: C, func: Rc) -> Self { Self { cursor, func } @@ -229,6 +240,16 @@ pub struct BatchCursorFreeze { func: Rc, } +impl LaidOut for BatchCursorFreeze { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchCursorFreeze { fn new(cursor: C, func: Rc) -> Self { Self { cursor, func } diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index f69dc1f59..97c6ca795 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -117,6 +117,17 @@ pub struct CursorFrontier { until: Antichain } +use crate::trace::implementations::{Layout, LaidOut}; +impl LaidOut for CursorFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl CursorFrontier { fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { CursorFrontier { @@ -183,6 +194,16 @@ pub struct BatchCursorFrontier { until: Antichain, } +impl LaidOut for BatchCursorFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchCursorFrontier { fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { BatchCursorFrontier { From 36176eb18cdec7eaea3072829ec437e743dfe985 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 15 Jul 2025 10:47:25 -0400 Subject: [PATCH 2/6] Remove Cursor associated types --- .../src/trace/cursor/cursor_list.rs | 10 ---- differential-dataflow/src/trace/cursor/mod.rs | 25 --------- .../src/trace/implementations/mod.rs | 53 ++++++++++--------- .../src/trace/implementations/ord_neu.rs | 29 ++-------- .../src/trace/implementations/rhh.rs | 11 ---- differential-dataflow/src/trace/mod.rs | 13 +---- .../src/trace/wrappers/enter.rs | 22 -------- .../src/trace/wrappers/enter_at.rs | 25 +-------- .../src/trace/wrappers/filter.rs | 22 -------- .../src/trace/wrappers/freeze.rs | 22 -------- .../src/trace/wrappers/frontier.rs | 20 ------- 11 files changed, 37 insertions(+), 215 deletions(-) diff --git a/differential-dataflow/src/trace/cursor/cursor_list.rs b/differential-dataflow/src/trace/cursor/cursor_list.rs index 334781911..94fbda7cb 100644 --- a/differential-dataflow/src/trace/cursor/cursor_list.rs +++ b/differential-dataflow/src/trace/cursor/cursor_list.rs @@ -100,16 +100,6 @@ impl LaidOut for CursorList { } impl Cursor for CursorList { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } type Storage = Vec; diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index 629ba78ae..d0d973249 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -5,11 +5,6 @@ //! both because it allows navigation on multiple levels (key and val), but also because it //! supports efficient seeking (via the `seek_key` and `seek_val` methods). -use timely::progress::Timestamp; - -use crate::difference::Semigroup; -use crate::lattice::Lattice; - pub mod cursor_list; pub use self::cursor_list::CursorList; @@ -19,26 +14,6 @@ use crate::trace::implementations::LayoutExt; /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor : LayoutExt { - /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord; - /// Values associated with keys. - type Val<'a>: Copy + Clone + Ord; - /// Timestamps associated with updates - type Time: Timestamp + Lattice + Ord + Clone; - /// Borrowed form of timestamp. - type TimeGat<'a>: Copy; - /// Owned form of update difference. - type Diff: Semigroup + 'static; - /// Borrowed form of update difference. - type DiffGat<'a> : Copy; - - /// An owned copy of a reference to a time. - fn owned_time(time: Self::TimeGat<'_>) -> Self::Time; - /// Clones a reference time onto an owned time. - fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time); - /// An owned copy of a reference to a diff. - fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff; - /// Storage required by the cursor. type Storage; diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index e7c1ab6df..e6bc2864e 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -116,58 +116,63 @@ pub trait LayoutExt : LaidOut: Copy + Ord; + type Key<'a>: Copy + Ord; /// Alias for an owned val of a layout. type ValOwn; /// Alias for an borrowed val of a layout. - type ValRef<'a>: Copy + Ord; + type Val<'a>: Copy + Ord; /// Alias for an owned time of a layout. - type TimeOwn: Lattice + timely::progress::Timestamp; + type Time: Lattice + timely::progress::Timestamp; /// Alias for an borrowed time of a layout. - type TimeRef<'a>: Copy + Ord; + type TimeGat<'a>: Copy + Ord; /// Alias for an owned diff of a layout. - type DiffOwn: Semigroup; + type Diff: Semigroup; /// Alias for an borrowed diff of a layout. - type DiffRef<'a>: Copy + Ord; + type DiffGat<'a>: Copy + Ord; /// Container for update keys. - type KeyContainer: for<'a> BatchContainer = Self::KeyRef<'a>, Owned = Self::KeyOwn>; + type KeyContainer: for<'a> BatchContainer = Self::Key<'a>, Owned = Self::KeyOwn>; /// Container for update vals. - type ValContainer: for<'a> BatchContainer = Self::ValRef<'a>, Owned = Self::ValOwn>; + type ValContainer: for<'a> BatchContainer = Self::Val<'a>, Owned = Self::ValOwn>; /// Container for times. - type TimeContainer: for<'a> BatchContainer = Self::TimeRef<'a>, Owned = Self::TimeOwn>; + type TimeContainer: for<'a> BatchContainer = Self::TimeGat<'a>, Owned = Self::Time>; /// Container for diffs. - type DiffContainer: for<'a> BatchContainer = Self::DiffRef<'a>, Owned = Self::DiffOwn>; + type DiffContainer: for<'a> BatchContainer = Self::DiffGat<'a>, Owned = Self::Diff>; /// Construct an owned key from a reference. - fn owned_key(key: Self::KeyRef<'_>) -> Self::KeyOwn; + fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn; /// Construct an owned val from a reference. - fn owned_val(val: Self::ValRef<'_>) -> Self::ValOwn; + fn owned_val(val: Self::Val<'_>) -> Self::ValOwn; /// Construct an owned time from a reference. - fn owned_time2(time: Self::TimeRef<'_>) -> Self::TimeOwn; + fn owned_time(time: Self::TimeGat<'_>) -> Self::Time; /// Construct an owned diff from a reference. - fn owned_diff2(diff: Self::DiffRef<'_>) -> Self::DiffOwn; + fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff; + + /// Clones a reference time onto an owned time. + fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time); } impl LayoutExt for L { type KeyOwn = <::KeyContainer as BatchContainer>::Owned; - type KeyRef<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; + type Key<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; type ValOwn = <::ValContainer as BatchContainer>::Owned; - type ValRef<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; - type TimeOwn = <::TimeContainer as BatchContainer>::Owned; - type TimeRef<'a> = <::TimeContainer as BatchContainer>::ReadItem<'a>; - type DiffOwn = <::DiffContainer as BatchContainer>::Owned; - type DiffRef<'a> = <::DiffContainer as BatchContainer>::ReadItem<'a>; + type Val<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; + type Time = <::TimeContainer as BatchContainer>::Owned; + type TimeGat<'a> = <::TimeContainer as BatchContainer>::ReadItem<'a>; + type Diff = <::DiffContainer as BatchContainer>::Owned; + type DiffGat<'a> = <::DiffContainer as BatchContainer>::ReadItem<'a>; type KeyContainer = ::KeyContainer; type ValContainer = ::ValContainer; type TimeContainer = ::TimeContainer; type DiffContainer = ::DiffContainer; - #[inline(always)] fn owned_key(key: Self::KeyRef<'_>) -> Self::KeyOwn { ::KeyContainer::into_owned(key) } - #[inline(always)] fn owned_val(val: Self::ValRef<'_>) -> Self::ValOwn { ::ValContainer::into_owned(val) } - #[inline(always)] fn owned_time2(time: Self::TimeRef<'_>) -> Self::TimeOwn { ::TimeContainer::into_owned(time) } - #[inline(always)] fn owned_diff2(diff: Self::DiffRef<'_>) -> Self::DiffOwn { ::DiffContainer::into_owned(diff) } + #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { ::KeyContainer::into_owned(key) } + #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { ::ValContainer::into_owned(val) } + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { ::TimeContainer::into_owned(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { ::DiffContainer::into_owned(diff) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { ::TimeContainer::clone_onto(time, onto) } + } // An easy way to provide an explicit layout: as a 5-tuple. diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 4c83fc43e..6df3b8e05 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -601,17 +601,6 @@ pub mod val_batch { impl Cursor for OrdValCursor { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } - type Storage = OrdValBatch; fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { storage.storage.keys.get(self.key_cursor) } @@ -807,7 +796,7 @@ pub mod key_batch { pub updates: usize, } - impl BatchReader for OrdKeyBatch { + impl Layout = &'a ()>>> BatchReader for OrdKeyBatch { type Key<'a> = layout::KeyRef<'a, L>; type Val<'a> = &'a (); @@ -832,7 +821,7 @@ pub mod key_batch { fn description(&self) -> &Description> { &self.description } } - impl Batch for OrdKeyBatch { + impl Layout = &'a ()>>> Batch for OrdKeyBatch { type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef>) -> Self::Merger { @@ -1000,21 +989,11 @@ pub mod key_batch { } use crate::trace::implementations::LaidOut; - impl LaidOut for OrdKeyCursor { + impl Layout = &'a ()>>> LaidOut for OrdKeyCursor { type Layout = L; } - impl Cursor for OrdKeyCursor { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = &'a (); - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } + impl Layout = &'a ()>>> Cursor for OrdKeyCursor { type Storage = OrdKeyBatch; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 03456e648..d722d0aa5 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -664,17 +664,6 @@ mod val_batch { layout::Key: Default + HashOrdered, for<'a> layout::KeyRef<'a, L>: HashOrdered, { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } - type Storage = RhhValBatch; fn get_key<'a>(&self, storage: &'a RhhValBatch) -> Option> { storage.storage.keys.get(self.key_cursor) } diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index b6fd65d41..0bdb89cb5 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -21,6 +21,8 @@ use crate::lattice::Lattice; pub use self::cursor::Cursor; pub use self::description::Description; +use crate::trace::implementations::LayoutExt; + /// A type used to express how much effort a trace should exert even in the absence of updates. pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize)])->Option+Send+Sync>; @@ -398,17 +400,6 @@ pub mod rc_blanket_impls { impl Cursor for RcBatchCursor { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = Rc; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index b591bafd3..d0cc74b03 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -183,17 +183,6 @@ where C: Cursor, TInner: Refines+Lattice, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -257,17 +246,6 @@ impl Cursor for BatchCursorEnter where TInner: Refines+Lattice, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index e5ce2f0a4..65752beab 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -131,11 +131,12 @@ pub struct BatchEnter { logic: F, } +use crate::trace::implementations::LayoutExt; impl BatchReader for BatchEnter where B: BatchReader, TInner: Refines+Lattice, - F: FnMut(B::Key<'_>, ::Val<'_>, B::TimeGat<'_>)->TInner+Clone, + F: FnMut(B::Key<'_>, ::Val<'_>, B::TimeGat<'_>)->TInner+Clone, { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; @@ -210,17 +211,6 @@ where TInner: Refines+Lattice, F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -290,17 +280,6 @@ where TInner: Refines+Lattice, F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index 2181b1fb5..f6b6dc381 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -131,17 +131,6 @@ where C: Cursor, F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -197,17 +186,6 @@ impl Cursor for BatchCursorFilter where F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = C::TimeGat<'a>; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = BatchFilter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index e680d5c24..72f6f0048 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -192,17 +192,6 @@ where C: Cursor, F: Fn(C::TimeGat<'_>)->Option, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -261,17 +250,6 @@ impl Cursor for BatchCursorFreeze where F: Fn(C::TimeGat<'_>)->Option, { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } - type Storage = BatchFreeze; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index 97c6ca795..3dd4b4c48 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -139,16 +139,6 @@ impl CursorFrontier { } impl Cursor for CursorFrontier { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } type Storage = C::Storage; @@ -215,16 +205,6 @@ impl BatchCursorFrontier { } impl> Cursor for BatchCursorFrontier { - type Key<'a> = C::Key<'a>; - type Val<'a> = C::Val<'a>; - type Time = C::Time; - type TimeGat<'a> = &'a C::Time; - type Diff = C::Diff; - type DiffGat<'a> = C::DiffGat<'a>; - - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } - #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } type Storage = BatchFrontier; From af596dce93c5d6e3190fcae95a7a8eb65b6c92ef Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 15 Jul 2025 14:59:59 -0400 Subject: [PATCH 3/6] Non-working moment --- .../src/operators/arrange/agent.rs | 11 +- .../src/trace/implementations/mod.rs | 4 +- .../src/trace/implementations/ord_neu.rs | 21 ++-- .../src/trace/implementations/rhh.rs | 15 +-- .../src/trace/implementations/spine_fueled.rs | 17 ++- differential-dataflow/src/trace/mod.rs | 108 ++++++++++-------- .../src/trace/wrappers/enter.rs | 42 ++++--- .../src/trace/wrappers/enter_at.rs | 44 ++++--- .../src/trace/wrappers/filter.rs | 22 ++-- .../src/trace/wrappers/freeze.rs | 42 ++++--- .../src/trace/wrappers/frontier.rs | 32 ++++-- .../src/trace/wrappers/rc.rs | 11 +- 12 files changed, 212 insertions(+), 157 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 336fe6c81..a35815860 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -36,13 +36,12 @@ pub struct TraceAgent { logging: Option, } +use crate::trace::implementations::LaidOut; +impl LaidOut for TraceAgent { + type Layout = Tr::Layout; +} + impl TraceReader for TraceAgent { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = Tr::TimeGat<'a>; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = Tr::Batch; type Storage = Tr::Storage; diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index e6bc2864e..f6ea686ac 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -100,7 +100,7 @@ pub trait Layout { /// Container for times. type TimeContainer: BatchContainer; /// Container for diffs. - type DiffContainer: BatchContainer; + type DiffContainer: BatchContainer; /// Container for offsets. type OffsetContainer: for<'a> BatchContainer = usize>; } @@ -126,7 +126,7 @@ pub trait LayoutExt : LaidOut: Copy + Ord; /// Alias for an owned diff of a layout. - type Diff: Semigroup; + type Diff: Semigroup + 'static; /// Alias for an borrowed diff of a layout. type DiffGat<'a>: Copy + Ord; diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 6df3b8e05..69d08bc67 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -322,13 +322,11 @@ pub mod val_batch { pub updates: usize, } + impl LaidOut for OrdValBatch { + type Layout = L; + } + impl BatchReader for OrdValBatch { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { @@ -796,14 +794,11 @@ pub mod key_batch { pub updates: usize, } - impl Layout = &'a ()>>> BatchReader for OrdKeyBatch { + impl Layout = &'a ()>>> LaidOut for OrdKeyBatch { + type Layout = L; + } - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = &'a (); - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; + impl Layout = &'a ()>>> BatchReader for OrdKeyBatch { type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index d722d0aa5..75de4486e 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -273,18 +273,19 @@ mod val_batch { pub updates: usize, } - impl BatchReader for RhhValBatch + impl LaidOut for RhhValBatch where layout::Key: Default + HashOrdered, for<'a> layout::KeyRef<'a, L>: HashOrdered, { - type Key<'a> = layout::KeyRef<'a, L>; - type Val<'a> = layout::ValRef<'a, L>; - type Time = layout::Time; - type TimeGat<'a> = layout::TimeRef<'a, L>; - type Diff = layout::Diff; - type DiffGat<'a> = layout::DiffRef<'a, L>; + type Layout = L; + } + impl BatchReader for RhhValBatch + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, + { type Cursor = RhhValCursor; fn cursor(&self) -> Self::Cursor { let mut cursor = RhhValCursor { diff --git a/differential-dataflow/src/trace/implementations/spine_fueled.rs b/differential-dataflow/src/trace/implementations/spine_fueled.rs index ea83f1f7c..ca78386b3 100644 --- a/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -99,13 +99,18 @@ pub struct Spine { exert_logic: Option, } +use crate::trace::LaidOut; +impl LaidOut for Spine { + type Layout = B::Layout; +} + impl TraceReader for Spine { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = B::TimeGat<'a>; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; + // type Key<'a> = B::Key<'a>; + // type Val<'a> = B::Val<'a>; + // type Time = B::Time; + // type TimeGat<'a> = B::TimeGat<'a>; + // type Diff = B::Diff; + // type DiffGat<'a> = B::DiffGat<'a>; type Batch = B; type Storage = Vec; diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 0bdb89cb5..769500e8b 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -16,8 +16,6 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; use crate::logging::Logger; -use crate::difference::Semigroup; -use crate::lattice::Lattice; pub use self::cursor::Cursor; pub use self::description::Description; @@ -46,34 +44,48 @@ pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize /// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods /// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's /// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen). -pub trait TraceReader { - - /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord; - /// Values associated with keys. - type Val<'a>: Copy + Clone; - /// Timestamps associated with updates - type Time: Timestamp + Lattice + Ord + Clone; - /// Borrowed form of timestamp. - type TimeGat<'a>: Copy; - /// Owned form of update difference. - type Diff: Semigroup + 'static; - /// Borrowed form of update difference. - type DiffGat<'a> : Copy; - - /// An owned copy of a reference to a time. - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) } - /// An owned copy of a reference to a diff. - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) } +pub trait TraceReader : LayoutExt { /// The type of an immutable collection of updates. - type Batch: for<'a> BatchReader = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>+Clone+'static; + type Batch: + 'static + + Clone + + BatchReader + + LaidOut + + for<'a> LayoutExt< + Key<'a> = Self::Key<'a>, + Val<'a> = Self::Val<'a>, + Time = Self::Time, + TimeGat<'a> = Self::TimeGat<'a>, + Diff = Self::Diff, + DiffGat<'a> = Self::DiffGat<'a>, + KeyContainer = Self::KeyContainer, + ValContainer = Self::ValContainer, + TimeContainer = Self::TimeContainer, + DiffContainer = Self::DiffContainer, + >; + /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`. type Storage; /// The type used to enumerate the collections contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>; + type Cursor: + Cursor + + LaidOut + + for<'a> LayoutExt< + Key<'a> = Self::Key<'a>, + Val<'a> = Self::Val<'a>, + Time = Self::Time, + TimeGat<'a> = Self::TimeGat<'a>, + Diff = Self::Diff, + DiffGat<'a> = Self::DiffGat<'a>, + KeyContainer = Self::KeyContainer, + ValContainer = Self::ValContainer, + TimeContainer = Self::TimeContainer, + DiffContainer = Self::DiffContainer, + >; + /// Provides a cursor over updates contained in the trace. fn cursor(&mut self) -> (Self::Cursor, Self::Storage) { @@ -221,34 +233,34 @@ pub trait Trace : TraceReader { fn close(&mut self); } +use crate::trace::implementations::LaidOut; + /// A batch of updates whose contents may be read. /// /// This is a restricted interface to batches of updates, which support the reading of the batch's contents, /// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is /// especially useful for views derived from other sources in ways that prevent the construction of batches /// from the type of data in the view (for example, filtered views, or views with extended time coordinates). -pub trait BatchReader : Sized { - /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + Ord; - /// Values associated with keys. - type Val<'a>: Copy + Clone; - /// Timestamps associated with updates - type Time: Timestamp + Lattice + Ord + Clone; - /// Borrowed form of timestamp. - type TimeGat<'a>: Copy; - /// Owned form of update difference. - type Diff: Semigroup + 'static; - /// Borrowed form of update difference. - type DiffGat<'a> : Copy; - - /// An owned copy of a reference to a time. - #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) } - /// An owned copy of a reference to a diff. - #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) } +pub trait BatchReader : LayoutExt + Sized { /// The type used to enumerate the batch's contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>; - /// Acquires a cursor to the batch's contents. + type Cursor: + Cursor + + LaidOut + + for<'a> LayoutExt< + Key<'a> = Self::Key<'a>, + Val<'a> = Self::Val<'a>, + Time = Self::Time, + TimeGat<'a> = Self::TimeGat<'a>, + Diff = Self::Diff, + DiffGat<'a> = Self::DiffGat<'a>, + KeyContainer = Self::KeyContainer, + ValContainer = Self::ValContainer, + TimeContainer = Self::TimeContainer, + DiffContainer = Self::DiffContainer, + >; + + /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. fn len(&self) -> usize; @@ -359,13 +371,11 @@ pub mod rc_blanket_impls { use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Builder, Merger, Cursor, Description}; + impl LaidOut for Rc { + type Layout = B::Layout; + } + impl BatchReader for Rc { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = B::TimeGat<'a>; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; /// The type used to enumerate the batch's contents. type Cursor = RcBatchCursor; diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index d0cc74b03..dedcd9400 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -25,18 +25,25 @@ impl Clone for TraceEnter { } } -impl TraceReader for TraceEnter +impl LaidOut for TraceEnter where Tr: TraceReader, TInner: Refines+Lattice, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +impl TraceReader for TraceEnter +where + Tr: TraceReader, + TInner: Refines+Lattice, +{ type Batch = BatchEnter; type Storage = Tr::Storage; type Cursor = CursorEnter; @@ -109,18 +116,25 @@ pub struct BatchEnter { description: Description, } -impl BatchReader for BatchEnter +impl LaidOut for BatchEnter where B: BatchReader, TInner: Refines+Lattice, { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +impl BatchReader for BatchEnter +where + B: BatchReader, + TInner: Refines+Lattice, +{ type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 65752beab..844a8858c 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -40,6 +40,22 @@ where } } +impl LaidOut for TraceEnter +where + Tr: TraceReader, + TInner: Refines+Lattice, + F: Clone, + G: Clone, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl TraceReader for TraceEnter where Tr: TraceReader, @@ -48,13 +64,6 @@ where F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone, G: FnMut(&TInner)->Tr::Time+Clone+'static, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; - type Batch = BatchEnter; type Storage = Tr::Storage; type Cursor = CursorEnter; @@ -131,6 +140,20 @@ pub struct BatchEnter { logic: F, } +impl LaidOut for BatchEnter +where + B: BatchReader, + TInner: Refines+Lattice, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + use crate::trace::implementations::LayoutExt; impl BatchReader for BatchEnter where @@ -138,13 +161,6 @@ where TInner: Refines+Lattice, F: FnMut(B::Key<'_>, ::Val<'_>, B::TimeGat<'_>)->TInner+Clone, { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = TInner; - type TimeGat<'a> = &'a TInner; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; - type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index f6b6dc381..ef1e7c034 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -24,18 +24,15 @@ where } } +impl LaidOut for TraceFilter { + type Layout = Tr::Layout; +} + impl TraceReader for TraceFilter where Tr: TraceReader, F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = Tr::TimeGat<'a>; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; - type Batch = BatchFilter; type Storage = Tr::Storage; type Cursor = CursorFilter; @@ -75,18 +72,15 @@ pub struct BatchFilter { logic: F, } +impl LaidOut for BatchFilter { + type Layout = B::Layout; +} + impl BatchReader for BatchFilter where B: BatchReader, F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = B::TimeGat<'a>; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; - type Cursor = BatchCursorFilter; fn cursor(&self) -> Self::Cursor { diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index 72f6f0048..1ce3f0889 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -69,18 +69,25 @@ where } } +impl LaidOut for TraceFreeze +where + Tr: TraceReader, + F: Fn(Tr::TimeGat<'_>)->Option, +{ + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl TraceReader for TraceFreeze where Tr: TraceReader, F: Fn(Tr::TimeGat<'_>)->Option+'static, { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = &'a Tr::Time; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; - type Batch = BatchFreeze; type Storage = Tr::Storage; type Cursor = CursorFreeze; @@ -132,18 +139,25 @@ impl Clone for BatchFreeze { } } -impl BatchReader for BatchFreeze +impl LaidOut for BatchFreeze where B: BatchReader, F: Fn(B::TimeGat<'_>)->Option, { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = &'a B::Time; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} +impl BatchReader for BatchFreeze +where + B: BatchReader, + F: Fn(B::TimeGat<'_>)->Option, +{ type Cursor = BatchCursorFreeze; fn cursor(&self) -> Self::Cursor { diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index 3dd4b4c48..b22fb5b90 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -31,13 +31,17 @@ impl Clone for TraceFrontier { } } +impl LaidOut for TraceFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl TraceReader for TraceFrontier { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = &'a Tr::Time; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = BatchFrontier; type Storage = Tr::Storage; @@ -82,13 +86,17 @@ pub struct BatchFrontier { until: Antichain, } +impl LaidOut for BatchFrontier { + type Layout = ( + ::KeyContainer, + ::ValContainer, + Vec, + ::DiffContainer, + ::OffsetContainer, + ); +} + impl BatchReader for BatchFrontier { - type Key<'a> = B::Key<'a>; - type Val<'a> = B::Val<'a>; - type Time = B::Time; - type TimeGat<'a> = &'a B::Time; - type Diff = B::Diff; - type DiffGat<'a> = B::DiffGat<'a>; type Cursor = BatchCursorFrontier; diff --git a/differential-dataflow/src/trace/wrappers/rc.rs b/differential-dataflow/src/trace/wrappers/rc.rs index 4a877031a..bc2fa2064 100644 --- a/differential-dataflow/src/trace/wrappers/rc.rs +++ b/differential-dataflow/src/trace/wrappers/rc.rs @@ -78,13 +78,12 @@ pub struct TraceRc { pub wrapper: Rc>>, } +use crate::trace::LaidOut; +impl LaidOut for TraceRc { + type Layout = Tr::Layout; +} + impl TraceReader for TraceRc { - type Key<'a> = Tr::Key<'a>; - type Val<'a> = Tr::Val<'a>; - type Time = Tr::Time; - type TimeGat<'a> = Tr::TimeGat<'a>; - type Diff = Tr::Diff; - type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = Tr::Batch; type Storage = Tr::Storage; From b73cc96353ee7eabee0f0499603dabb4d384820f Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 15 Jul 2025 17:44:01 -0400 Subject: [PATCH 4/6] Remove into_owned.rs --- differential-dataflow/src/into_owned.rs | 204 ------------------ differential-dataflow/src/lib.rs | 2 - .../src/operators/arrange/arrangement.rs | 12 +- .../src/operators/arrange/upsert.rs | 18 +- differential-dataflow/src/operators/reduce.rs | 37 ++-- .../src/trace/implementations/mod.rs | 4 +- differential-dataflow/src/trace/mod.rs | 6 + dogsdogsdogs/src/operators/count.rs | 7 +- dogsdogsdogs/src/operators/half_join.rs | 23 +- dogsdogsdogs/src/operators/lookup_map.rs | 12 +- dogsdogsdogs/src/operators/propose.rs | 15 +- dogsdogsdogs/src/operators/validate.rs | 4 +- 12 files changed, 69 insertions(+), 275 deletions(-) delete mode 100644 differential-dataflow/src/into_owned.rs diff --git a/differential-dataflow/src/into_owned.rs b/differential-dataflow/src/into_owned.rs deleted file mode 100644 index a4a91f49c..000000000 --- a/differential-dataflow/src/into_owned.rs +++ /dev/null @@ -1,204 +0,0 @@ -//! Traits for converting between owned and borrowed types. - -/// A reference type corresponding to an owned type, supporting conversion in each direction. -/// -/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT. -/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the -/// requirement that the other trait implement `Borrow`, for which a borrow must result in a -/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type. -pub trait IntoOwned<'a> { - /// Owned type into which this type can be converted. - type Owned; - /// Conversion from an instance of this type to the owned type. - #[must_use] - fn into_owned(self) -> Self::Owned; - /// Clones `self` onto an existing instance of the owned type. - fn clone_onto(self, other: &mut Self::Owned); - /// Borrows an owned instance as oneself. - #[must_use] - fn borrow_as(owned: &'a Self::Owned) -> Self; -} - -impl<'a, T: ToOwned + ?Sized> IntoOwned<'a> for &'a T { - type Owned = T::Owned; - #[inline] - fn into_owned(self) -> Self::Owned { - self.to_owned() - } - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - ::clone_into(self, other) - } - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - std::borrow::Borrow::borrow(owned) - } -} - -impl<'a, T, E> IntoOwned<'a> for Result -where - T: IntoOwned<'a>, - E: IntoOwned<'a>, -{ - type Owned = Result; - - #[inline] - fn into_owned(self) -> Self::Owned { - self.map(T::into_owned).map_err(E::into_owned) - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - match (self, other) { - (Ok(item), Ok(target)) => T::clone_onto(item, target), - (Err(item), Err(target)) => E::clone_onto(item, target), - (Ok(item), target) => *target = Ok(T::into_owned(item)), - (Err(item), target) => *target = Err(E::into_owned(item)), - } - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - owned.as_ref().map(T::borrow_as).map_err(E::borrow_as) - } -} - -impl<'a, T> IntoOwned<'a> for Option -where - T: IntoOwned<'a>, -{ - type Owned = Option; - - #[inline] - fn into_owned(self) -> Self::Owned { - self.map(IntoOwned::into_owned) - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - match (self, other) { - (Some(item), Some(target)) => T::clone_onto(item, target), - (Some(item), target) => *target = Some(T::into_owned(item)), - (None, target) => *target = None, - } - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - owned.as_ref().map(T::borrow_as) - } -} - -mod tuple { - use paste::paste; - - macro_rules! tuple { - ($($name:ident)+) => (paste! { - - #[allow(non_camel_case_types)] - #[allow(non_snake_case)] - impl<'a, $($name),*> crate::IntoOwned<'a> for ($($name,)*) - where - $($name: crate::IntoOwned<'a>),* - { - type Owned = ($($name::Owned,)*); - - #[inline] - fn into_owned(self) -> Self::Owned { - let ($($name,)*) = self; - ( - $($name.into_owned(),)* - ) - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - let ($($name,)*) = self; - let ($([<$name _other>],)*) = other; - $($name.clone_onto([<$name _other>]);)* - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - let ($($name,)*) = owned; - ( - $($name::borrow_as($name),)* - ) - } - } - }) - } - - tuple!(A); - tuple!(A B); - tuple!(A B C); - tuple!(A B C D); - tuple!(A B C D E); - tuple!(A B C D E F); - tuple!(A B C D E F G); - tuple!(A B C D E F G H); - tuple!(A B C D E F G H I); - tuple!(A B C D E F G H I J); - tuple!(A B C D E F G H I J K); - tuple!(A B C D E F G H I J K L); - tuple!(A B C D E F G H I J K L M); - tuple!(A B C D E F G H I J K L M N); - tuple!(A B C D E F G H I J K L M N O); - tuple!(A B C D E F G H I J K L M N O P); -} - -mod primitive { - macro_rules! implement_for { - ($index_type:ty) => { - impl<'a> crate::IntoOwned<'a> for $index_type { - type Owned = $index_type; - - #[inline] - fn into_owned(self) -> Self::Owned { - self - } - - #[inline] - fn clone_onto(self, other: &mut Self::Owned) { - *other = self; - } - - #[inline] - fn borrow_as(owned: &'a Self::Owned) -> Self { - *owned - } - } - }; -} - - implement_for!(()); - implement_for!(bool); - implement_for!(char); - - implement_for!(u8); - implement_for!(u16); - implement_for!(u32); - implement_for!(u64); - implement_for!(u128); - implement_for!(usize); - - implement_for!(i8); - implement_for!(i16); - implement_for!(i32); - implement_for!(i64); - implement_for!(i128); - implement_for!(isize); - - implement_for!(f32); - implement_for!(f64); - - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - implement_for!(std::num::Wrapping); - - implement_for!(std::time::Duration); - -} \ No newline at end of file diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index c67b356cf..1431634e4 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -79,7 +79,6 @@ use std::fmt::Debug; pub use collection::{Collection, AsCollection}; pub use hashable::Hashable; pub use difference::Abelian as Diff; -pub use into_owned::IntoOwned; /// Data type usable in differential dataflow. /// @@ -106,7 +105,6 @@ pub mod logging; pub mod consolidation; pub mod capture; pub mod containers; -mod into_owned; /// Configuration options for differential dataflow. #[derive(Default)] diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 7b461f816..868f69a93 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -26,7 +26,7 @@ use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; -use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned}; +use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; @@ -285,10 +285,11 @@ where /// A direct implementation of `ReduceCore::reduce_abelian`. pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, + T1: TraceReader, T2: for<'a> Trace< Key<'a>= T1::Key<'a>, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=T1::Time, Diff: Abelian, >+'static, @@ -309,10 +310,11 @@ where /// A direct implementation of `ReduceCore::reduce_core`. pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, + T1: TraceReader, T2: for<'a> Trace< Key<'a>=T1::Key<'a>, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=T1::Time, >+'static, K: Ord + 'static, diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 423f45261..4a8eae843 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -111,7 +111,9 @@ use timely::dataflow::operators::Capability; use crate::operators::arrange::arrangement::Arranged; use crate::trace::{Builder, Description}; use crate::trace::{self, Trace, TraceReader, Cursor}; -use crate::{ExchangeData, Hashable, IntoOwned}; +use crate::{ExchangeData, Hashable}; + +use crate::trace::implementations::containers::BatchContainer; use super::TraceAgent; @@ -131,9 +133,9 @@ pub fn arrange_from_upsert( ) -> Arranged> where G: Scope, - Tr: Trace + for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = K>, - Val<'a> : IntoOwned<'a, Owned = V>, + Tr: for<'a> Trace< + KeyOwn = K, + ValOwn = V, Time: TotalOrder+ExchangeData, Diff=isize, >+'static, @@ -148,7 +150,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::<(K, Option, G::Timestamp), _>::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -239,8 +241,8 @@ where let mut prev_value: Option = None; // Attempt to find the key in the trace. - trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key)); - if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) { + trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key)); + if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&Tr::KeyContainer::borrow_as(&key))).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; @@ -248,7 +250,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(val.into_owned()); + prev_value = Some(Tr::owned_val(val)); } trace_cursor.step_val(&trace_storage); } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index d60c013db..fb0042766 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -8,7 +8,7 @@ use timely::Container; use timely::container::PushInto; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection, IntoOwned}; +use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; use timely::order::PartialOrder; @@ -24,6 +24,7 @@ use crate::lattice::Lattice; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder}; +use crate::trace::implementations::containers::BatchContainer; use crate::trace::TraceReader; @@ -84,7 +85,7 @@ impl Reduce for Collection impl Reduce for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, Val<'a>=&'a V, Diff=R>+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a V, Diff=R>+Clone+'static, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { @@ -160,7 +161,7 @@ impl, K: ExchangeData+Hashable, R1: ExchangeDat impl Threshold for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R1>+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { self.reduce_abelian::<_,K,(),KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) @@ -207,7 +208,7 @@ impl, K: ExchangeData+Hashable, R: ExchangeData impl Count for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R>+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static, { fn count_core + 'static>(&self) -> Collection { self.reduce_abelian::<_,K,R,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -246,7 +247,8 @@ pub trait ReduceCore, K: ToOwned + ?Sized, V: D where T2: for<'a> Trace< Key<'a>= &'a K, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=G::Timestamp, Diff: Abelian, >+'static, @@ -271,7 +273,8 @@ pub trait ReduceCore, K: ToOwned + ?Sized, V: D where T2: for<'a> Trace< Key<'a>=&'a K, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=G::Timestamp, >+'static, Bu: Builder, Output = T2::Batch>, @@ -292,7 +295,8 @@ where V: Data, T2: for<'a> Trace< Key<'a>=&'a K, - Val<'a> : IntoOwned<'a, Owned = V>, + KeyOwn = K, + ValOwn = V, Time=G::Timestamp, >+'static, Bu: Builder, Output = T2::Batch>, @@ -309,8 +313,8 @@ where pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, - T1: for<'a> TraceReader : IntoOwned<'a, Owned = K>> + Clone + 'static, - T2: for<'a> Trace=T1::Key<'a>, Val<'a> : IntoOwned<'a, Owned = V>, Time=T1::Time> + 'static, + T1: for<'a> TraceReader + Clone + 'static, + T2: for<'a> Trace=T1::Key<'a>, ValOwn = V, Time=T1::Time> + 'static, K: Ord + 'static, V: Data, Bu: Builder>, @@ -472,7 +476,7 @@ where use std::borrow::Borrow; // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0)); + let key1 = exposed.get(exposed_position).map(|x| T1::KeyContainer::borrow_as(&x.0)); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -488,7 +492,7 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(& as IntoOwned>::borrow_as(&k))).unwrap_or(false) { + while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&T1::KeyContainer::borrow_as(&k))).unwrap_or(false) { interesting_times.push(exposed[exposed_position].1.clone()); exposed_position += 1; } @@ -516,7 +520,7 @@ where // Record future warnings about interesting times (and assert they should be "future"). for time in new_interesting_times.drain(..) { debug_assert!(upper_limit.less_equal(&time)); - interesting.push((key.into_owned(), time)); + interesting.push((T1::owned_key(key), time)); } // Sort each buffer by value and move into the corresponding builder. @@ -526,7 +530,7 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - buffer.push_into(((key.into_owned(), val), time, diff)); + buffer.push_into(((T1::owned_key(key), val), time, diff)); builders[index].push(&mut buffer); buffer.clear(); } @@ -626,7 +630,7 @@ fn sort_dedup(list: &mut Vec) { trait PerKeyCompute<'a, C1, C2, C3, V> where C1: Cursor, - C2: for<'b> Cursor = C1::Key<'a>, Val<'b> : IntoOwned<'b, Owned = V>, Time = C1::Time>, + C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, { @@ -661,7 +665,6 @@ mod history_replay { use crate::lattice::Lattice; use crate::trace::Cursor; use crate::operators::ValueHistory; - use crate::IntoOwned; use super::{PerKeyCompute, sort_dedup}; @@ -690,7 +693,7 @@ mod history_replay { impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V> where C1: Cursor, - C2: for<'b> Cursor = C1::Key<'a>, Val<'b> : IntoOwned<'b, Owned = V>, Time = C1::Time>, + C2: for<'b> Cursor = C1::Key<'a>, ValOwn = V, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, { @@ -897,7 +900,7 @@ mod history_replay { meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet)); for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { - self.output_buffer.push((value.into_owned(), diff.clone())); + self.output_buffer.push((C2::owned_val(value), diff.clone())); } else { self.temporary.push(next_time.join(time)); diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index f6ea686ac..8b270ef84 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -118,7 +118,7 @@ pub trait LayoutExt : LaidOut: Copy + Ord; /// Alias for an owned val of a layout. - type ValOwn; + type ValOwn: Clone + Ord; /// Alias for an borrowed val of a layout. type Val<'a>: Copy + Ord; /// Alias for an owned time of a layout. @@ -522,7 +522,7 @@ pub mod containers { /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { /// An owned instance of `Self::ReadItem<'_>`. - type Owned; + type Owned: Clone + Ord; /// The type that can be read back out of the container. type ReadItem<'a>: Copy + Ord; diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 769500e8b..e9c2ddc0c 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -54,7 +54,9 @@ pub trait TraceReader : LayoutExt { LaidOut + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, + KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, + ValOwn = Self::ValOwn, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, @@ -75,7 +77,9 @@ pub trait TraceReader : LayoutExt { LaidOut + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, + KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, + ValOwn = Self::ValOwn, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, @@ -249,7 +253,9 @@ pub trait BatchReader : LayoutExt + Sized { LaidOut + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, + KeyOwn = Self::KeyOwn, Val<'a> = Self::Val<'a>, + ValOwn = Self::ValOwn, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index d2bbbf1cd..2633d0b62 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,7 +4,6 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::IntoOwned; /// Reports a number of extensions to a stream of prefixes. /// @@ -20,11 +19,7 @@ pub fn count( ) -> Collection where G: Scope, - Tr: for<'a> TraceReader< - Key<'a>: IntoOwned<'a, Owned = K>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, - Diff=isize, - >+Clone+'static, + Tr: TraceReader+Clone+'static, for<'a> Tr::Diff : Semigroup>, K: Hashable + Ord + Default + 'static, R: Monoid+Multiply+ExchangeData, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 0ed560232..bf9b3b589 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -50,7 +50,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; -use differential_dataflow::IntoOwned; +use differential_dataflow::trace::implementations::BatchContainer; /// A binary equijoin that responds to updates on only its first input. /// @@ -86,7 +86,7 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>+Clone+'static, + Tr: TraceReader+Clone+'static, R: Mul, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, @@ -156,7 +156,7 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>+Clone+'static, + Tr: for<'a> TraceReader+Clone+'static, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, @@ -306,17 +306,17 @@ fn process_proposals( comparison: &CF, yield_function: &Y, output_func: &mut S, - mut output_buffer: &mut Vec<(::Time, ::Diff)>, + mut output_buffer: &mut Vec<(Tr::Time, Tr::Diff)>, timer: Instant, work: &mut usize, trace: &mut Tr, - proposals: &mut Vec<((K, V, ::Time), ::Time, R)>, + proposals: &mut Vec<((K, V, Tr::Time), Tr::Time, R)>, mut session: SessionFor, - frontier: AntichainRef<::Time> + frontier: AntichainRef ) -> bool where G: Scope, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>, + Tr: for<'a> TraceReader, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(Instant, usize) -> bool + 'static, S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, @@ -335,14 +335,13 @@ where for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, *work); - if !yielded && !frontier.iter().any(|t| comparison( as IntoOwned>::borrow_as(t), initial)) { - use differential_dataflow::IntoOwned; - cursor.seek_key(&storage, IntoOwned::borrow_as(key)); - if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { + if !yielded && !frontier.iter().any(|t| comparison(Tr::TimeContainer::borrow_as(t), initial)) { + cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(key)); + if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { - let mut t = t.into_owned(); + let mut t = Tr::owned_time(t); t.join_assign(time); output_buffer.push((t, Tr::owned_diff(d))) } diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 436f8da7c..0b0f6df5a 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{IsZero, Semigroup, Monoid}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; -use differential_dataflow::IntoOwned; +use differential_dataflow::trace::implementations::BatchContainer; /// Proposes extensions to a stream of prefixes. /// @@ -29,8 +29,7 @@ pub fn lookup_map( where G: Scope, Tr: for<'a> TraceReader< - Key<'a>: IntoOwned<'a, Owned = K>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = K, Diff : Semigroup>+Monoid+ExchangeData, >+Clone+'static, K: Hashable + Ord + 'static, @@ -94,13 +93,12 @@ where for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - use differential_dataflow::IntoOwned; - cursor.seek_key(&storage, IntoOwned::borrow_as(&key1)); - if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) { + cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(&key1)); + if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(&key1)) { while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { - if t.into_owned().less_equal(time) { count.plus_equals(&d); } + if Tr::owned_time(t).less_equal(time) { count.plus_equals(&d); } }); if !count.is_zero() { let (dout, rout) = output_func(prefix, diff, value, &count); diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 7b96f00fd..b519ecf47 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,6 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::IntoOwned; /// Proposes extensions to a prefix stream. /// @@ -22,9 +21,8 @@ pub fn propose( where G: Scope, Tr: for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = K>, - Val<'a> : IntoOwned<'a, Owned = V>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = K, + ValOwn = V, Diff: Monoid+Multiply+ExchangeData+Semigroup>, >+Clone+'static, K: Hashable + Default + Ord + 'static, @@ -36,7 +34,7 @@ where prefixes, arrangement, move |p: &P, k: &mut K | { *k = key_selector(p); }, - move |prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)), + move |prefix, diff, value, sum| ((prefix.clone(), Tr::owned_val(value)), diff.clone().multiply(sum)), Default::default(), Default::default(), Default::default(), @@ -56,9 +54,8 @@ pub fn propose_distinct( where G: Scope, Tr: for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = K>, - Val<'a> : IntoOwned<'a, Owned = V>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = K, + ValOwn = V, Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, K: Hashable + Default + Ord + 'static, @@ -70,7 +67,7 @@ where prefixes, arrangement, move |p: &P, k: &mut K| { *k = key_selector(p); }, - move |prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()), + move |prefix, diff, value, _sum| ((prefix.clone(), Tr::owned_val(value)), diff.clone()), Default::default(), Default::default(), Default::default(), diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 330661027..9c111d252 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,7 +6,6 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::IntoOwned; /// Proposes extensions to a stream of prefixes. /// @@ -21,8 +20,7 @@ pub fn validate( where G: Scope, Tr: for<'a> TraceReader< - Key<'a> : IntoOwned<'a, Owned = (K, V)>, - TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, + KeyOwn = (K, V), Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, K: Ord+Hash+Clone+Default + 'static, From f19aca65e30eda1f7ab559c3b3ee269b296c15e2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 15 Jul 2025 22:16:56 -0400 Subject: [PATCH 5/6] Clean-up --- .../src/operators/arrange/agent.rs | 4 ++-- .../src/operators/arrange/upsert.rs | 2 +- .../src/trace/cursor/cursor_list.rs | 4 ++-- .../src/trace/implementations/mod.rs | 6 +++--- .../src/trace/implementations/ord_neu.rs | 12 ++++++------ .../src/trace/implementations/rhh.rs | 6 +++--- .../src/trace/implementations/spine_fueled.rs | 10 ++-------- differential-dataflow/src/trace/mod.rs | 14 +++++++------- differential-dataflow/src/trace/wrappers/enter.rs | 10 +++++----- .../src/trace/wrappers/enter_at.rs | 10 +++++----- differential-dataflow/src/trace/wrappers/filter.rs | 10 +++++----- differential-dataflow/src/trace/wrappers/freeze.rs | 10 +++++----- .../src/trace/wrappers/frontier.rs | 10 +++++----- differential-dataflow/src/trace/wrappers/rc.rs | 4 ++-- 14 files changed, 53 insertions(+), 59 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index a35815860..71951f856 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -36,8 +36,8 @@ pub struct TraceAgent { logging: Option, } -use crate::trace::implementations::LaidOut; -impl LaidOut for TraceAgent { +use crate::trace::implementations::WithLayout; +impl WithLayout for TraceAgent { type Layout = Tr::Layout; } diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 4a8eae843..c3a354ff3 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -150,7 +150,7 @@ where let reader = &mut reader; - let exchange = Exchange::<(K, Option, G::Timestamp), _>::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { diff --git a/differential-dataflow/src/trace/cursor/cursor_list.rs b/differential-dataflow/src/trace/cursor/cursor_list.rs index 94fbda7cb..6d2d5b57b 100644 --- a/differential-dataflow/src/trace/cursor/cursor_list.rs +++ b/differential-dataflow/src/trace/cursor/cursor_list.rs @@ -94,8 +94,8 @@ impl CursorList { } } -use crate::trace::implementations::LaidOut; -impl LaidOut for CursorList { +use crate::trace::implementations::WithLayout; +impl WithLayout for CursorList { type Layout = C::Layout; } diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 8b270ef84..69ca3a50f 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -106,13 +106,13 @@ pub trait Layout { } /// A type bearing a layout. -pub trait LaidOut { +pub trait WithLayout { /// The layout. type Layout: Layout; } /// Automatically implemented trait for types with layouts. -pub trait LayoutExt : LaidOut> { +pub trait LayoutExt : WithLayout> { /// Alias for an owned key of a layout. type KeyOwn; /// Alias for an borrowed key of a layout. @@ -152,7 +152,7 @@ pub trait LayoutExt : LaidOut, onto: &mut Self::Time); } -impl LayoutExt for L { +impl LayoutExt for L { type KeyOwn = <::KeyContainer as BatchContainer>::Owned; type Key<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; type ValOwn = <::ValContainer as BatchContainer>::Owned; diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 69d08bc67..8cde8383f 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -322,7 +322,7 @@ pub mod val_batch { pub updates: usize, } - impl LaidOut for OrdValBatch { + impl WithLayout for OrdValBatch { type Layout = L; } @@ -592,8 +592,8 @@ pub mod val_batch { phantom: PhantomData, } - use crate::trace::implementations::LaidOut; - impl LaidOut for OrdValCursor { + use crate::trace::implementations::WithLayout; + impl WithLayout for OrdValCursor { type Layout = L; } @@ -794,7 +794,7 @@ pub mod key_batch { pub updates: usize, } - impl Layout = &'a ()>>> LaidOut for OrdKeyBatch { + impl Layout = &'a ()>>> WithLayout for OrdKeyBatch { type Layout = L; } @@ -983,8 +983,8 @@ pub mod key_batch { phantom: PhantomData, } - use crate::trace::implementations::LaidOut; - impl Layout = &'a ()>>> LaidOut for OrdKeyCursor { + use crate::trace::implementations::WithLayout; + impl Layout = &'a ()>>> WithLayout for OrdKeyCursor { type Layout = L; } diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 75de4486e..9dcf68ede 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -273,7 +273,7 @@ mod val_batch { pub updates: usize, } - impl LaidOut for RhhValBatch + impl WithLayout for RhhValBatch where layout::Key: Default + HashOrdered, for<'a> layout::KeyRef<'a, L>: HashOrdered, @@ -651,8 +651,8 @@ mod val_batch { phantom: PhantomData, } - use crate::trace::implementations::LaidOut; - impl LaidOut for RhhValCursor + use crate::trace::implementations::WithLayout; + impl WithLayout for RhhValCursor where layout::Key: Default + HashOrdered, for<'a> layout::KeyRef<'a, L>: HashOrdered, diff --git a/differential-dataflow/src/trace/implementations/spine_fueled.rs b/differential-dataflow/src/trace/implementations/spine_fueled.rs index ca78386b3..9cdd27443 100644 --- a/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -99,18 +99,12 @@ pub struct Spine { exert_logic: Option, } -use crate::trace::LaidOut; -impl LaidOut for Spine { +use crate::trace::WithLayout; +impl WithLayout for Spine { type Layout = B::Layout; } impl TraceReader for Spine { - // type Key<'a> = B::Key<'a>; - // type Val<'a> = B::Val<'a>; - // type Time = B::Time; - // type TimeGat<'a> = B::TimeGat<'a>; - // type Diff = B::Diff; - // type DiffGat<'a> = B::DiffGat<'a>; type Batch = B; type Storage = Vec; diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index e9c2ddc0c..27598b59a 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -51,7 +51,7 @@ pub trait TraceReader : LayoutExt { 'static + Clone + BatchReader + - LaidOut + + WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, KeyOwn = Self::KeyOwn, @@ -74,7 +74,7 @@ pub trait TraceReader : LayoutExt { /// The type used to enumerate the collections contents. type Cursor: Cursor + - LaidOut + + WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, KeyOwn = Self::KeyOwn, @@ -237,7 +237,7 @@ pub trait Trace : TraceReader { fn close(&mut self); } -use crate::trace::implementations::LaidOut; +use crate::trace::implementations::WithLayout; /// A batch of updates whose contents may be read. /// @@ -250,7 +250,7 @@ pub trait BatchReader : LayoutExt + Sized { /// The type used to enumerate the batch's contents. type Cursor: Cursor + - LaidOut + + WithLayout + for<'a> LayoutExt< Key<'a> = Self::Key<'a>, KeyOwn = Self::KeyOwn, @@ -377,7 +377,7 @@ pub mod rc_blanket_impls { use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Builder, Merger, Cursor, Description}; - impl LaidOut for Rc { + impl WithLayout for Rc { type Layout = B::Layout; } @@ -401,8 +401,8 @@ pub mod rc_blanket_impls { cursor: C, } - use crate::trace::implementations::LaidOut; - impl LaidOut for RcBatchCursor { + use crate::trace::implementations::WithLayout; + impl WithLayout for RcBatchCursor { type Layout = C::Layout; } diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index dedcd9400..6886631df 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -25,7 +25,7 @@ impl Clone for TraceEnter { } } -impl LaidOut for TraceEnter +impl WithLayout for TraceEnter where Tr: TraceReader, TInner: Refines+Lattice, @@ -116,7 +116,7 @@ pub struct BatchEnter { description: Description, } -impl LaidOut for BatchEnter +impl WithLayout for BatchEnter where B: BatchReader, TInner: Refines+Lattice, @@ -168,8 +168,8 @@ pub struct CursorEnter { cursor: C, } -use crate::trace::implementations::{Layout, LaidOut}; -impl LaidOut for CursorEnter +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorEnter where C: Cursor, TInner: Refines+Lattice, @@ -242,7 +242,7 @@ impl BatchCursorEnter { } } -impl LaidOut for BatchCursorEnter +impl WithLayout for BatchCursorEnter where C: Cursor, TInner: Refines+Lattice, diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 844a8858c..3a789f473 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -40,7 +40,7 @@ where } } -impl LaidOut for TraceEnter +impl WithLayout for TraceEnter where Tr: TraceReader, TInner: Refines+Lattice, @@ -140,7 +140,7 @@ pub struct BatchEnter { logic: F, } -impl LaidOut for BatchEnter +impl WithLayout for BatchEnter where B: BatchReader, TInner: Refines+Lattice, @@ -196,8 +196,8 @@ pub struct CursorEnter { logic: F, } -use crate::trace::implementations::{Layout, LaidOut}; -impl LaidOut for CursorEnter +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorEnter where C: Cursor, TInner: Refines+Lattice, @@ -267,7 +267,7 @@ pub struct BatchCursorEnter { logic: F, } -impl LaidOut for BatchCursorEnter +impl WithLayout for BatchCursorEnter where C: Cursor, TInner: Refines+Lattice, diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index ef1e7c034..ad49543bd 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -24,7 +24,7 @@ where } } -impl LaidOut for TraceFilter { +impl WithLayout for TraceFilter { type Layout = Tr::Layout; } @@ -72,7 +72,7 @@ pub struct BatchFilter { logic: F, } -impl LaidOut for BatchFilter { +impl WithLayout for BatchFilter { type Layout = B::Layout; } @@ -106,8 +106,8 @@ pub struct CursorFilter { logic: F, } -use crate::trace::implementations::LaidOut; -impl LaidOut for CursorFilter { +use crate::trace::implementations::WithLayout; +impl WithLayout for CursorFilter { type Layout = C::Layout; } @@ -163,7 +163,7 @@ pub struct BatchCursorFilter { logic: F, } -impl LaidOut for BatchCursorFilter { +impl WithLayout for BatchCursorFilter { type Layout = C::Layout; } diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index 1ce3f0889..5af2cdbf3 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -69,7 +69,7 @@ where } } -impl LaidOut for TraceFreeze +impl WithLayout for TraceFreeze where Tr: TraceReader, F: Fn(Tr::TimeGat<'_>)->Option, @@ -139,7 +139,7 @@ impl Clone for BatchFreeze { } } -impl LaidOut for BatchFreeze +impl WithLayout for BatchFreeze where B: BatchReader, F: Fn(B::TimeGat<'_>)->Option, @@ -184,8 +184,8 @@ pub struct CursorFreeze { func: Rc, } -use crate::trace::implementations::{Layout, LaidOut}; -impl LaidOut for CursorFreeze { +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorFreeze { type Layout = ( ::KeyContainer, ::ValContainer, @@ -243,7 +243,7 @@ pub struct BatchCursorFreeze { func: Rc, } -impl LaidOut for BatchCursorFreeze { +impl WithLayout for BatchCursorFreeze { type Layout = ( ::KeyContainer, ::ValContainer, diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index b22fb5b90..22cca80fe 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -31,7 +31,7 @@ impl Clone for TraceFrontier { } } -impl LaidOut for TraceFrontier { +impl WithLayout for TraceFrontier { type Layout = ( ::KeyContainer, ::ValContainer, @@ -86,7 +86,7 @@ pub struct BatchFrontier { until: Antichain, } -impl LaidOut for BatchFrontier { +impl WithLayout for BatchFrontier { type Layout = ( ::KeyContainer, ::ValContainer, @@ -125,8 +125,8 @@ pub struct CursorFrontier { until: Antichain } -use crate::trace::implementations::{Layout, LaidOut}; -impl LaidOut for CursorFrontier { +use crate::trace::implementations::{Layout, WithLayout}; +impl WithLayout for CursorFrontier { type Layout = ( ::KeyContainer, ::ValContainer, @@ -192,7 +192,7 @@ pub struct BatchCursorFrontier { until: Antichain, } -impl LaidOut for BatchCursorFrontier { +impl WithLayout for BatchCursorFrontier { type Layout = ( ::KeyContainer, ::ValContainer, diff --git a/differential-dataflow/src/trace/wrappers/rc.rs b/differential-dataflow/src/trace/wrappers/rc.rs index bc2fa2064..9051db631 100644 --- a/differential-dataflow/src/trace/wrappers/rc.rs +++ b/differential-dataflow/src/trace/wrappers/rc.rs @@ -78,8 +78,8 @@ pub struct TraceRc { pub wrapper: Rc>>, } -use crate::trace::LaidOut; -impl LaidOut for TraceRc { +use crate::trace::WithLayout; +impl WithLayout for TraceRc { type Layout = Tr::Layout; } From faceb47150bd68c6264b36882a80e0d9f327127d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 15 Jul 2025 22:24:08 -0400 Subject: [PATCH 6/6] Remove generics from upsert --- .../src/operators/arrange/upsert.rs | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index c3a354ff3..fcf9dfe46 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -59,7 +59,7 @@ //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder, ValSpine>(&stream, &"test"); +//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder, ValSpine>(&stream, &"test"); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) @@ -127,21 +127,19 @@ use super::TraceAgent; /// This method is only implemented for totally ordered times, as we do not yet /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. -pub fn arrange_from_upsert( - stream: &Stream, G::Timestamp)>, +pub fn arrange_from_upsert( + stream: &Stream, G::Timestamp)>, name: &str, ) -> Arranged> where G: Scope, Tr: for<'a> Trace< - KeyOwn = K, - ValOwn = V, + KeyOwn: ExchangeData+Hashable+std::hash::Hash, + ValOwn: ExchangeData, Time: TotalOrder+ExchangeData, Diff=isize, >+'static, - K: ExchangeData+Hashable+std::hash::Hash, - V: ExchangeData, - Bu: Builder, Output = Tr::Batch>, + Bu: Builder, Output = Tr::Batch>, { let mut reader: Option> = None; @@ -150,7 +148,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(K,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -175,7 +173,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |input, output| { @@ -238,7 +236,7 @@ where for (key, mut list) in to_process { // The prior value associated with the key. - let mut prev_value: Option = None; + let mut prev_value: Option = None; // Attempt to find the key in the trace. trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key));