From b037eee5f5c79846518dd01cda8b520f4bd641ce Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 21 Mar 2025 16:30:54 +0100 Subject: [PATCH 1/2] Introduce EnterTime, fix Negate Introduces the EnterTime trait, which allows streams to explain to differential how they can modify their timestamp component to match the inner time. Fixes the Negate trait so it can be implemented by downstream crates. Due to Rust's rules, the local type has to come first, so the `G` parameter has to come after the container type. Signed-off-by: Moritz Hoffmann --- differential-dataflow/src/collection.rs | 64 ++++++++++--------- differential-dataflow/src/operators/enter.rs | 54 ++++++++++++++++ .../src/operators/iterate.rs | 2 +- differential-dataflow/src/operators/mod.rs | 7 +- differential-dataflow/src/operators/negate.rs | 6 +- 5 files changed, 95 insertions(+), 38 deletions(-) create mode 100644 differential-dataflow/src/operators/enter.rs diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index dd173bcf8..d841967fd 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -22,6 +22,7 @@ use timely::dataflow::StreamCore; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::hashable::Hashable; +use crate::operators::enter::EnterTime; /// A mutable collection of values of type `D` /// @@ -173,6 +174,37 @@ impl Collection { self.inner.scope() } + /// Brings a Collection into a nested scope. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let result = scope.region(|child| { + /// data.enter(child) + /// .leave() + /// }); + /// + /// data.assert_eq(&result); + /// }); + /// ``` + pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, D, R, as EnterTime>::Container> + where + T: Refines, + StreamCore: EnterTime, + { + self.inner + .enter_time(child) + .as_collection() + } + + /// Creates a new collection whose counts are the negation of those in the input. /// /// This method is most commonly used with `concat` to get those element in one collection but not another. @@ -198,7 +230,7 @@ impl Collection { /// ``` // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect // an inherent method on `Collection`. - pub fn negate(&self) -> Collection where StreamCore: crate::operators::Negate { + pub fn negate(&self) -> Collection where StreamCore: crate::operators::Negate { crate::operators::Negate::negate(&self.inner).as_collection() } } @@ -362,36 +394,6 @@ impl Collection { .as_collection() } - /// Brings a Collection into a nested scope. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::Scope; - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let result = scope.region(|child| { - /// data.enter(child) - /// .leave() - /// }); - /// - /// data.assert_eq(&result); - /// }); - /// ``` - pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, D, R> - where - T: Refines<::Timestamp>, - { - self.inner - .enter(child) - .map(|(data, time, diff)| (data, T::to_inner(time), diff)) - .as_collection() - } - /// Brings a Collection into a nested scope, at varying times. /// /// The `initial` function indicates the time at which each element of the Collection should appear. diff --git a/differential-dataflow/src/operators/enter.rs b/differential-dataflow/src/operators/enter.rs new file mode 100644 index 000000000..361b0fdc9 --- /dev/null +++ b/differential-dataflow/src/operators/enter.rs @@ -0,0 +1,54 @@ +//! Enter a collection into a scope. + +use timely::Data; +use timely::dataflow::{Scope, ScopeParent, Stream, StreamCore}; +use timely::dataflow::operators::core::{Enter, Map}; +use timely::dataflow::scopes::Child; +use timely::progress::timestamp::Refines; + +/// Extension trait for streams. +pub trait EnterTime +where + G: ScopeParent, + TInner: Refines, +{ + /// The containers in the output stream. + type Container: Clone; + + /// Brings a stream into a nested scope. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let result = scope.region(|child| { + /// data.enter(child) + /// .leave() + /// }); + /// + /// data.assert_eq(&result); + /// }); + /// ``` + fn enter_time<'a>(&self, child: &Child<'a, G, TInner>) -> StreamCore, Self::Container>; +} + +impl EnterTime, G, TInner> for Stream +where + G: Scope, + D: Data, + R: Data, + TInner: Refines, +{ + type Container = Vec<(D, TInner, R)>; + + fn enter_time<'a>(&self, child: &Child<'a, G, TInner>) -> Stream, (D, TInner, R)> { + self.enter(child) + .map(|(data, time, diff)| (data, TInner::to_inner(time), diff)) + } +} diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 6056ad8cd..36caed7c7 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -169,7 +169,7 @@ where impl Variable where G::Timestamp: Lattice, - StreamCore: crate::operators::Negate + ResultsIn, + StreamCore: crate::operators::Negate + ResultsIn, { /// Creates a new initially empty `Variable`. /// diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 615cfa399..1d625b2ee 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -12,12 +12,13 @@ pub use self::count::CountTotal; pub use self::threshold::ThresholdTotal; pub mod arrange; -pub mod negate; -pub mod reduce; pub mod consolidate; +pub mod count; +pub mod enter; pub mod iterate; pub mod join; -pub mod count; +pub mod negate; +pub mod reduce; pub mod threshold; use crate::lattice::Lattice; diff --git a/differential-dataflow/src/operators/negate.rs b/differential-dataflow/src/operators/negate.rs index b354c95ca..2388f17c4 100644 --- a/differential-dataflow/src/operators/negate.rs +++ b/differential-dataflow/src/operators/negate.rs @@ -35,18 +35,18 @@ pub trait Negate { fn negate(&self) -> Self; } -impl Negate for Collection +impl Negate for Collection where G: Scope, C: Clone, - StreamCore: Negate, + StreamCore: Negate, { fn negate(&self) -> Self { self.inner.negate().as_collection() } } -impl Negate> for Stream { +impl Negate, G> for Stream { fn negate(&self) -> Self { self.map_in_place(|x| x.2.negate()) } From 1d4e4063cf8311aace8b526cecbf9c0ef18f52a8 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 24 Mar 2025 09:28:11 +0100 Subject: [PATCH 2/2] Add Negate and EnterTime implementations Signed-off-by: Moritz Hoffmann --- differential-dataflow/examples/columnar.rs | 88 ++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index af864bffb..ec5c97a67 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -460,3 +460,91 @@ pub mod batcher { } } } + +mod differential { + use columnar::Columnar; + use differential_dataflow::difference::Abelian; + use differential_dataflow::operators::enter::EnterTime; + use differential_dataflow::operators::Negate; + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::operators::{Enter, Operator}; + use timely::dataflow::scopes::Child; + use timely::dataflow::{Scope, StreamCore}; + use timely::progress::timestamp::Refines; + use timely::{Container, Data}; + + use crate::builder::ColumnBuilder; + use crate::Column; + + impl EnterTime, G, TInner> + for StreamCore> + where + G: Scope, + G::Timestamp: Columnar, + ::Container: Data, + D: Columnar, + D::Container: Data, + R: Columnar, + R::Container: Data, + TInner: Columnar + Refines, + TInner::Container: Data, + { + type Container = Column<(D, TInner, R)>; + + fn enter_time<'a>( + &self, + child: &Child<'a, G, TInner>, + ) -> StreamCore, Column<(D, TInner, R)>> { + self.enter(child).unary::, _, _, _>( + Pipeline, + "EnterTimeColumn", + |_capability, _info| { + move |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session_with_builder(&time); + for (data, time, diff) in data.iter() { + // TODO: This isn't optimal if `into_owned` needs to allocate. We could + // specialize over concrete timestamp types to avoid this, and work + // through columns at a time. + let time_owned = Columnar::into_owned(time); + session.give((data, &TInner::to_inner(time_owned), diff)); + } + } + } + }, + ) + } + } + + impl Negate, G> + for StreamCore> + where + G: Scope, + G::Timestamp: Columnar, + ::Container: Data, + D: Columnar, + D::Container: Data, + R: Columnar + Abelian, + R::Container: Data, + { + fn negate(&self) -> Self { + self.unary::, _, _, _>( + Pipeline, + "NegateColumn", + |_capability, _info| { + move |input, output| { + let mut diff_owned = R::zero(); + while let Some((time, data)) = input.next() { + let mut session = output.session_with_builder(&time); + for (data, time, diff) in data.iter() { + R::copy_from(&mut diff_owned, diff); + diff_owned.negate(); + session.give((data, time, &diff_owned)); + } + } + } + }, + ) + } + } +}