From a81f89abb477c40d3e70faed439b35865bb2c9d7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 29 May 2025 11:40:21 +0200 Subject: [PATCH 1/2] Make batcher variant over builder Signed-off-by: Moritz Hoffmann --- differential-dataflow/examples/columnar.rs | 8 ++-- differential-dataflow/examples/spines.rs | 16 ++++---- .../src/operators/arrange/arrangement.rs | 18 ++++----- .../src/operators/consolidate.rs | 4 +- .../trace/implementations/merge_batcher.rs | 15 ++++---- .../src/trace/implementations/ord_neu.rs | 10 ++--- .../src/trace/implementations/rhh.rs | 4 +- differential-dataflow/src/trace/mod.rs | 7 +++- differential-dataflow/tests/trace.rs | 4 +- experiments/src/bin/deals.rs | 12 +++--- experiments/src/bin/graspan1.rs | 4 +- experiments/src/bin/graspan2.rs | 38 +++++++++---------- 12 files changed, 72 insertions(+), 68 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 3ab037666..7cd25b89c 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -46,8 +46,8 @@ fn main() { let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); - let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); + let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); + let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); @@ -351,8 +351,8 @@ use differential_dataflow::trace::implementations::merge_batcher::ColMerger; use differential_dataflow::containers::TimelyStack; /// A batcher for columnar storage. -pub type Col2ValBatcher = MergeBatcher, batcher::Chunker>, ColMerger<(K,V),T,R>>; -pub type Col2KeyBatcher = Col2ValBatcher; +pub type Col2ValBatcher = MergeBatcher, batcher::Chunker>, ColMerger<(K,V),T,R>, Bu>; +pub type Col2KeyBatcher = Col2ValBatcher; /// Types for consolidating, merging, and extracting columnar update collections. pub mod batcher { diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index 9d5a82019..eaa932b5a 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -29,22 +29,22 @@ fn main() { match mode.as_str() { "new" => { use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine}; - let data = data.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); - let keys = keys.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let data = data.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let keys = keys.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "old" => { use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; - let data = data.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); - let keys = keys.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let data = data.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let keys = keys.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "rhh" => { use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, @@ -54,11 +54,11 @@ fn main() { let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) - .arrange::, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>() + .arrange::, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>() .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) - .arrange::, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>() + .arrange::, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>() .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index d1387c742..90c09db0b 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -355,7 +355,7 @@ where /// Arranges updates into a shared trace. fn arrange(&self) -> Arranged> where - Ba: Batcher + 'static, + Ba: Batcher + 'static, Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, @@ -366,7 +366,7 @@ where /// Arranges updates into a shared trace, with a supplied name. fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher + 'static, + Ba: Batcher + 'static, Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, @@ -383,7 +383,7 @@ where { fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, + Ba: Batcher, Time=G::Timestamp> + 'static, Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, @@ -403,7 +403,7 @@ where G: Scope, G::Timestamp: Lattice, P: ParallelizationContract, - Ba: Batcher + 'static, + Ba: Batcher + 'static, Ba::Input: Container + Clone + 'static, Bu: Builder, Tr: Trace+'static, @@ -512,7 +512,7 @@ where } // Extract updates not in advance of `upper`. - let batch = batcher.seal::(upper.clone()); + let batch = batcher.seal(upper.clone()); writer.insert(batch.clone(), Some(capability.time().clone())); @@ -540,7 +540,7 @@ where } else { // Announce progress updates, even without data. - let _batch = batcher.seal::(input.frontier().frontier().to_owned()); + let _batch = batcher.seal(input.frontier().frontier().to_owned()); writer.seal(input.frontier().frontier().to_owned()); } @@ -561,7 +561,7 @@ where { fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, + Ba: Batcher, Time=G::Timestamp> + 'static, Bu: Builder, Tr: Trace + 'static, Tr::Batch: Batch, @@ -598,7 +598,7 @@ where } fn arrange_by_key_named(&self, name: &str) -> Arranged>> { - self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) + self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } } @@ -633,6 +633,6 @@ where fn arrange_by_self_named(&self, name: &str) -> Arranged>> { self.map(|k| (k, ())) - .arrange_named::,KeyBuilder<_,_,_>,_>(name) + .arrange_named::,KeyBuilder<_,_,_>,_>(name) } } diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs index 602f58f87..349bad59d 100644 --- a/differential-dataflow/src/operators/consolidate.rs +++ b/differential-dataflow/src/operators/consolidate.rs @@ -46,13 +46,13 @@ where /// ``` pub fn consolidate(&self) -> Self { use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; - self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate") + self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate") } /// As `consolidate` but with the ability to name the operator and specify the trace type. pub fn consolidate_named(&self, name: &str) -> Self where - Ba: Batcher, Time=G::Timestamp> + 'static, + Ba: Batcher, Time=G::Timestamp> + 'static, Tr: crate::trace::Trace+'static, for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>, Tr::Batch: crate::trace::Batch, diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 4de943594..f23916292 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -24,7 +24,7 @@ use crate::trace::{Batcher, Builder, Description}; /// /// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs, /// and must produce outputs of type `M::Chunk`. -pub struct MergeBatcher { +pub struct MergeBatcher { /// Transforms input streams to chunks of sorted, consolidated data. chunker: C, /// A sequence of power-of-two length lists of sorted, consolidated containers. @@ -44,14 +44,15 @@ pub struct MergeBatcher { /// Timely operator ID. operator_id: usize, /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. - _marker: PhantomData, + _marker: PhantomData<(Input, Bu)>, } -impl Batcher for MergeBatcher +impl Batcher for MergeBatcher where C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>, M: Merger, M::Time: Timestamp, + Bu: Builder, { type Input = Input; type Time = M::Time; @@ -85,7 +86,7 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal(&mut self, upper: Antichain) -> Bu::Output { // Finish while let Some(chunk) = self.chunker.finish() { let chunk = std::mem::take(chunk); @@ -115,7 +116,7 @@ where self.stash.clear(); let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum())); - let seal = B::seal(&mut readied, description); + let seal = Bu::seal(&mut readied, description); self.lower = upper; seal } @@ -127,7 +128,7 @@ where } } -impl MergeBatcher +impl MergeBatcher where M: Merger, { @@ -194,7 +195,7 @@ where } } -impl Drop for MergeBatcher +impl Drop for MergeBatcher where M: Merger, { diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 5035a229c..0dac563bf 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -24,7 +24,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>, Bu>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,14 +34,14 @@ pub type RcOrdValBuilder = RcBuilder = Spine>>>; /// A batcher for columnar storage. -pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>, Bu>; /// A builder for columnar storage. pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. -pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>; +pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>, Bu>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; @@ -51,14 +51,14 @@ pub type RcOrdKeyBuilder = RcBuilder /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; /// A batcher for columnar storage -pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; +pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>, Bu>; /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine>>>; /// A batcher for columnar storage. -pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColMerger<(::Owned,::Owned),T,R>>; +pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColMerger<(::Owned,::Owned),T,R>, Bu>; /// A builder for columnar storage. pub type PreferredBuilder = RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 48ce98899..d3806d127 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -24,7 +24,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; /// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>, Bu>; /// A builder for ordered lists. pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,7 +34,7 @@ pub type VecBuilder = RcBuilder, Vec< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine>>>; /// A batcher for columnar storage. -pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>, Bu>; /// A builder for columnar storage. pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 198f99bb6..fdb3f466c 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -275,7 +275,10 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { } /// Functionality for collecting and batching updates. -pub trait Batcher { +pub trait Batcher +where + B: Builder +{ /// Type pushed into the batcher. type Input; /// Type produced by the batcher. @@ -287,7 +290,7 @@ pub trait Batcher { /// Adds an unordered container of elements to the batcher. fn push_container(&mut self, batch: &mut Self::Input); /// Returns all updates not greater or equal to an element of `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output; + fn seal(&mut self, upper: Antichain) -> B::Output; /// Returns the lower envelope of contained update times. fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; } diff --git a/differential-dataflow/tests/trace.rs b/differential-dataflow/tests/trace.rs index 033da8bf7..60acafde5 100644 --- a/differential-dataflow/tests/trace.rs +++ b/differential-dataflow/tests/trace.rs @@ -12,7 +12,7 @@ fn get_trace() -> ValSpine { let op_info = OperatorInfo::new(0, 0, [].into()); let mut trace = IntegerTrace::new(op_info, None, None); { - let mut batcher = ValBatcher::::new(None, 0); + let mut batcher = ValBatcher::::new(None, 0); batcher.push_container(&mut vec![ ((1, 2), 0, 1), @@ -21,7 +21,7 @@ fn get_trace() -> ValSpine { ]); let batch_ts = &[1, 2, 3]; - let batches = batch_ts.iter().map(move |i| batcher.seal::(Antichain::from_elem(*i))); + let batches = batch_ts.iter().map(move |i| batcher.seal(Antichain::from_elem(*i))); for b in batches { trace.insert(b); } diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index f9776d17a..13481675f 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -41,7 +41,7 @@ fn main() { let (input, graph) = scope.new_collection(); // each edge should exist in both directions. - let graph = graph.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let graph = graph.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); match program.as_str() { "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(), @@ -94,10 +94,10 @@ fn tc>(edges: &EdgeArranged) -> C let result = inner .map(|(x,y)| (y,x)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_y,&x,&z| Some((x, z))) .concat(&edges.as_collection(|&k,&v| (k,v))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -121,12 +121,12 @@ fn sg>(edges: &EdgeArranged) -> C let result = inner - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) .concat(&peers) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index e93bb5381..413eb4554 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -31,7 +31,7 @@ fn main() { let (n_handle, nodes) = scope.new_collection(); let (e_handle, edges) = scope.new_collection(); - let edges = edges.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let edges = edges.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // a N c <- a N b && b E c // N(a,c) <- N(a,b), E(b, c) @@ -46,7 +46,7 @@ fn main() { let next = labels.join_core(&edges, |_b, a, c| Some((*c, *a))) .concat(&nodes) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() // .distinct_total_core::(); .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }); diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index 042e9f486..a871202f5 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -47,7 +47,7 @@ fn unoptimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias, value_alias) = scope @@ -60,14 +60,14 @@ fn unoptimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VA(a,b) <- VF(x,a),VF(x,b) // VA(a,b) <- VF(x,a),MA(x,y),VF(y,b) let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))); let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&value_alias_next); @@ -77,16 +77,16 @@ fn unoptimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))); let value_flow_next = value_flow_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -95,12 +95,12 @@ fn unoptimized() { let memory_alias_next: Collection<_,_,Present> = value_alias_next .join_core(&dereference, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_y,&a,&b| Some((a,b))); let memory_alias_next: Collection<_,_,Present> = memory_alias_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -172,7 +172,7 @@ fn optimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias) = scope @@ -185,8 +185,8 @@ fn optimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VF(a,a) <- // VF(a,b) <- A(a,x),VF(x,b) @@ -194,13 +194,13 @@ fn optimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -209,9 +209,9 @@ fn optimized() { let value_flow_deref = value_flow .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_x,&a,&b| Some((a,b))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // MA(a,b) <- VFD(x,a),VFD(y,b) // MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b) @@ -222,10 +222,10 @@ fn optimized() { let memory_alias_next = memory_alias_arranged .join_core(&value_flow_deref, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_deref, |_y,&a,&b| Some((a,b))) .concat(&memory_alias_next) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; From f137beac32b9f4fad0a88194c2056276c422780d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 29 May 2025 12:23:04 +0200 Subject: [PATCH 2/2] Absorb the builder into the batcher Signed-off-by: Moritz Hoffmann --- differential-dataflow/examples/columnar.rs | 4 +- differential-dataflow/examples/spines.rs | 16 +++---- .../src/operators/arrange/arrangement.rs | 45 ++++++++----------- .../src/operators/consolidate.rs | 11 +++-- .../trace/implementations/merge_batcher.rs | 4 +- differential-dataflow/src/trace/mod.rs | 18 +++----- experiments/src/bin/deals.rs | 12 ++--- experiments/src/bin/graspan1.rs | 4 +- experiments/src/bin/graspan2.rs | 38 ++++++++-------- 9 files changed, 69 insertions(+), 83 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 7cd25b89c..c603c350e 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -46,8 +46,8 @@ fn main() { let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); - let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); + let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_, ColKeyBuilder<_,_,_>>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); + let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_, ColKeyBuilder<_,_,_>>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index eaa932b5a..5f759a44b 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -29,22 +29,22 @@ fn main() { match mode.as_str() { "new" => { use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine}; - let data = data.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); - let keys = keys.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let data = data.arrange::>, ColKeySpine<_,_,_>>(); + let keys = keys.arrange::>, ColKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "old" => { use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; - let data = data.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); - let keys = keys.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let data = data.arrange::>, OrdKeySpine<_,_,_>>(); + let keys = keys.arrange::>, OrdKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "rhh" => { use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let data = data.map(|x| HashWrapper { inner: x }).arrange::>, VecSpine<_,(),_,_>>(); + let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>, VecSpine<_,(),_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, @@ -54,11 +54,11 @@ fn main() { let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) - .arrange::, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>() + .arrange::>, PreferredSpine<[u8],[u8],_,_>>() .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) - .arrange::, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>() + .arrange::>, PreferredSpine<[u8],u8,_,_>>() .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 90c09db0b..ff8f731ec 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -45,8 +45,7 @@ use super::TraceAgent; /// computation, memory) required to produce and maintain an indexed representation of a collection. pub struct Arranged where - G::Timestamp: Lattice+Ord, - Tr: TraceReader+Clone, + Tr: TraceReader, { /// A stream containing arranged updates. /// @@ -62,7 +61,7 @@ where impl Clone for Arranged where - G: Scope, + G: Scope, Tr: TraceReader + Clone, { fn clone(&self) -> Self { @@ -347,27 +346,26 @@ where } /// A type that can be arranged as if a collection of updates. +/// +/// The type `C` is a container that a batcher can accept, and `G` is the scope. pub trait Arrange where G: Scope, - G::Timestamp: Lattice, { /// Arranges updates into a shared trace. - fn arrange(&self) -> Arranged> + fn arrange(&self) -> Arranged> where - Ba: Batcher + 'static, - Bu: Builder, + Ba: Batcher + 'static, Tr: Trace + 'static, Tr::Batch: Batch, { - self.arrange_named::("Arrange") + self.arrange_named::("Arrange") } /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher + 'static, - Bu: Builder, + Ba: Batcher + 'static, Tr: Trace + 'static, Tr::Batch: Batch, ; @@ -376,20 +374,18 @@ where impl Arrange> for Collection where G: Scope, - G::Timestamp: Lattice, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, + Ba: Batcher, Time=G::Timestamp, Output=Tr::Batch> + 'static, Tr: Trace + 'static, Tr::Batch: Batch, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) + arrange_core::<_, _, Ba, _>(&self.inner, exchange, name) } } @@ -398,14 +394,12 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> +pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> where G: Scope, - G::Timestamp: Lattice, P: ParallelizationContract, - Ba: Batcher + 'static, + Ba: Batcher + 'static, Ba::Input: Container + Clone + 'static, - Bu: Builder, Tr: Trace+'static, Tr::Batch: Batch, { @@ -559,15 +553,14 @@ impl Arrange(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, + Ba: Batcher, Time=G::Timestamp, Output=Tr::Batch> + 'static, Tr: Trace + 'static, Tr::Batch: Batch, { let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) + arrange_core::<_,_,Ba,_>(&self.map(|k| (k, ())).inner, exchange, name) } } @@ -598,7 +591,7 @@ where } fn arrange_by_key_named(&self, name: &str) -> Arranged>> { - self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) + self.arrange_named::>,_>(name) } } @@ -633,6 +626,6 @@ where fn arrange_by_self_named(&self, name: &str) -> Arranged>> { self.map(|k| (k, ())) - .arrange_named::,KeyBuilder<_,_,_>,_>(name) + .arrange_named::>,_>(name) } } diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs index 349bad59d..d52222fb7 100644 --- a/differential-dataflow/src/operators/consolidate.rs +++ b/differential-dataflow/src/operators/consolidate.rs @@ -14,7 +14,7 @@ use crate::difference::Semigroup; use crate::Data; use crate::lattice::Lattice; -use crate::trace::{Batcher, Builder}; +use crate::trace::Batcher; /// Methods which require data be arrangeable. impl Collection @@ -46,21 +46,20 @@ where /// ``` pub fn consolidate(&self) -> Self { use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; - self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate") + self.consolidate_named::>, KeySpine<_,_,_>>("Consolidate") } /// As `consolidate` but with the ability to name the operator and specify the trace type. - pub fn consolidate_named(&self, name: &str) -> Self + pub fn consolidate_named(&self, name: &str) -> Self where - Ba: Batcher, Time=G::Timestamp> + 'static, + Ba: Batcher, Time=G::Timestamp, Output=Tr::Batch> + 'static, Tr: crate::trace::Trace+'static, for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>, Tr::Batch: crate::trace::Batch, - Bu: Builder, { use crate::operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) - .arrange_named::(name) + .arrange_named::(name) .as_collection(|d, _| d.into_owned()) } diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index f23916292..aa8d5161c 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -47,7 +47,7 @@ pub struct MergeBatcher { _marker: PhantomData<(Input, Bu)>, } -impl Batcher for MergeBatcher +impl Batcher for MergeBatcher where C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>, M: Merger, @@ -56,7 +56,7 @@ where { type Input = Input; type Time = M::Time; - type Output = M::Chunk; + type Output = Bu::Output; fn new(logger: Option, operator_id: usize) -> Self { Self { diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index fdb3f466c..1559ebe31 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -175,12 +175,12 @@ pub trait TraceReader { /// An append-only collection of `(key, val, time, diff)` tuples. /// /// The trace must pretend to look like a collection of `(Key, Val, Time, isize)` tuples, but is permitted -/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereference to the types above. +/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereferenced to the types above. /// /// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need /// to return them. pub trait Trace : TraceReader -where ::Batch: Batch { +{ /// Allocates a new empty trace. fn new( @@ -222,10 +222,7 @@ where ::Batch: Batch { /// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is /// especially useful for views derived from other sources in ways that prevent the construction of batches /// from the type of data in the view (for example, filtered views, or views with extended time coordinates). -pub trait BatchReader -where - Self: ::std::marker::Sized, -{ +pub trait BatchReader { /// Key by which updates are indexed. type Key<'a>: Copy + Clone + Ord; /// Values associated with keys. @@ -274,11 +271,8 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { fn empty(lower: Antichain, upper: Antichain) -> Self; } -/// Functionality for collecting and batching updates. -pub trait Batcher -where - B: Builder -{ +/// Functionality for collecting and batching updates and turning them into batches. +pub trait Batcher { /// Type pushed into the batcher. type Input; /// Type produced by the batcher. @@ -290,7 +284,7 @@ where /// Adds an unordered container of elements to the batcher. fn push_container(&mut self, batch: &mut Self::Input); /// Returns all updates not greater or equal to an element of `upper`. - fn seal(&mut self, upper: Antichain) -> B::Output; + fn seal(&mut self, upper: Antichain) -> Self::Output; /// Returns the lower envelope of contained update times. fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; } diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index 13481675f..342fd1819 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -41,7 +41,7 @@ fn main() { let (input, graph) = scope.new_collection(); // each edge should exist in both directions. - let graph = graph.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let graph = graph.arrange::>, ValSpine<_,_,_,_>>(); match program.as_str() { "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(), @@ -94,10 +94,10 @@ fn tc>(edges: &EdgeArranged) -> C let result = inner .map(|(x,y)| (y,x)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&edges, |_y,&x,&z| Some((x, z))) .concat(&edges.as_collection(|&k,&v| (k,v))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -121,12 +121,12 @@ fn sg>(edges: &EdgeArranged) -> C let result = inner - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) .concat(&peers) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index 413eb4554..4b6c101bd 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -31,7 +31,7 @@ fn main() { let (n_handle, nodes) = scope.new_collection(); let (e_handle, edges) = scope.new_collection(); - let edges = edges.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let edges = edges.arrange::>, ValSpine<_,_,_,_>>(); // a N c <- a N b && b E c // N(a,c) <- N(a,b), E(b, c) @@ -46,7 +46,7 @@ fn main() { let next = labels.join_core(&edges, |_b, a, c| Some((*c, *a))) .concat(&nodes) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() // .distinct_total_core::(); .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }); diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index a871202f5..50dc65fef 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -47,7 +47,7 @@ fn unoptimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias, value_alias) = scope @@ -60,14 +60,14 @@ fn unoptimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::>, ValSpine<_,_,_,_>>(); // VA(a,b) <- VF(x,a),VF(x,b) // VA(a,b) <- VF(x,a),MA(x,y),VF(y,b) let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))); let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&value_alias_next); @@ -77,16 +77,16 @@ fn unoptimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))); let value_flow_next = value_flow_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -95,12 +95,12 @@ fn unoptimized() { let memory_alias_next: Collection<_,_,Present> = value_alias_next .join_core(&dereference, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_y,&a,&b| Some((a,b))); let memory_alias_next: Collection<_,_,Present> = memory_alias_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -172,7 +172,7 @@ fn optimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias) = scope @@ -185,8 +185,8 @@ fn optimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::>, ValSpine<_,_,_,_>>(); // VF(a,a) <- // VF(a,b) <- A(a,x),VF(x,b) @@ -194,13 +194,13 @@ fn optimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -209,9 +209,9 @@ fn optimized() { let value_flow_deref = value_flow .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_x,&a,&b| Some((a,b))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + .arrange::>, ValSpine<_,_,_,_>>(); // MA(a,b) <- VFD(x,a),VFD(y,b) // MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b) @@ -222,10 +222,10 @@ fn optimized() { let memory_alias_next = memory_alias_arranged .join_core(&value_flow_deref, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::>, ValSpine<_,_,_,_>>() .join_core(&value_flow_deref, |_y,&a,&b| Some((a,b))) .concat(&memory_alias_next) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ;