From 2aa27dd6ce4f7690ad712bafc38d6684fe936045 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Tue, 4 May 2021 16:06:10 -0500
Subject: [PATCH 1/9] Fixed deprecated_semver

---
 src/trace/mod.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/trace/mod.rs b/src/trace/mod.rs
index 433a75154..c7e76fd6c 100644
--- a/src/trace/mod.rs
+++ b/src/trace/mod.rs
@@ -94,7 +94,7 @@ pub trait TraceReader {
     fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
 
     /// Deprecated form of `set_logical_compaction`.
-    #[deprecated(since = "0.11", note = "please use `set_logical_compaction`")]
+    #[deprecated(since = "0.11.0", note = "please use `set_logical_compaction`")]
     fn advance_by(&mut self, frontier: AntichainRef<Self::Time>) {
         self.set_logical_compaction(frontier);
     }
@@ -108,7 +108,7 @@ pub trait TraceReader {
     fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time>;
 
     /// Deprecated form of `get_logical_compaction`.
-    #[deprecated(since = "0.11", note = "please use `get_logical_compaction`")]
+    #[deprecated(since = "0.11.0", note = "please use `get_logical_compaction`")]
     fn advance_frontier(&mut self) -> AntichainRef<Self::Time> {
         self.get_logical_compaction()
     }
@@ -130,7 +130,7 @@ pub trait TraceReader {
     fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
 
     /// Deprecated form of `set_physical_compaction`.
-    #[deprecated(since = "0.11", note = "please use `set_physical_compaction`")]
+    #[deprecated(since = "0.11.0", note = "please use `set_physical_compaction`")]
     fn distinguish_since(&mut self, frontier: AntichainRef<Self::Time>) {
         self.set_physical_compaction(frontier);
     }
@@ -144,7 +144,7 @@ pub trait TraceReader {
     fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time>;
 
     /// Deprecated form of `get_physical_compaction`.
-    #[deprecated(since = "0.11", note = "please use `get_physical_compaction`")]
+    #[deprecated(since = "0.11.0", note = "please use `get_physical_compaction`")]
     fn distinguish_frontier(&mut self) -> AntichainRef<Self::Time> {
         self.get_physical_compaction()
     }

From e4610005e14ece0570b821718ff1bc32ad115034 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Tue, 4 May 2021 16:34:33 -0500
Subject: [PATCH 2/9] Added arranged methods for threshold, distinct and count

---
 src/operators/reduce.rs | 142 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 125 insertions(+), 17 deletions(-)

diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index 135128efe..f96d8d099 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -131,7 +131,28 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
     }
 
     /// A `threshold` with the ability to name the operator.
-    fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2>;
+    fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
+        self.threshold_core::<R2, _, DefaultKeyTrace<_, _, _>>(name, thresh)
+            .as_collection(|key: &K, &()| key.clone())
+    }
+
+    /// A `threshold` that returns the manifested arrangement.
+    fn threshold_arranged<R2, F>(&self, threshold: F) -> Arranged<G, TraceAgent<DefaultKeyTrace<K, G::Timestamp, R2>>>
+    where
+        R2: Abelian,
+        F: FnMut(&K, &R1) -> R2 + 'static,
+    {
+        self.threshold_core("Threshold", threshold)
+    }
+
+    /// The inner logic behind `threshold`; allows naming the operator and returns the output arrangement.
+    fn threshold_core<R2, F, Tr>(&self, name: &str, threshold: F) -> Arranged<G, TraceAgent<Tr>>
+    where
+        R2: Abelian,
+        F: FnMut(&K, &R1) -> R2 + 'static,
+        Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
+        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>;
 
     /// Reduces the collection to one occurrence of each distinct element.
     ///
@@ -163,28 +184,74 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
     /// type is something other than an `isize` integer, for example perhaps an
     /// `i32`.
     fn distinct_core<R2: Abelian+From<i8>>(&self) -> Collection<G, K, R2> {
-        self.threshold_named("Distinct", |_,_| R2::from(1i8))
+        self.distinct_arranged()
+            .as_collection(|key, &()| key.clone())
+    }
+
+    /// Distinct for general integer differences that returns an [arrangement](Arranged).
+    ///
+    /// This method allows `distinct` to produce collections whose difference
+    /// type is something other than an `isize` integer, for example perhaps an
+    /// `i32`.
+    fn distinct_arranged<R2>(&self) -> Arranged<G, TraceAgent<DefaultKeyTrace<K, G::Timestamp, R2>>>
+    where
+        R2: Abelian + From<i8>,
+    {
+        self.distinct_arranged_core("Distinct")
+    }
+
+    /// Distinct for general integer differences that returns an [arrangement](Arranged),
+    /// with the ability to name the operator.
+    ///
+    /// This method allows `distinct` to produce collections whose difference
+    /// type is something other than an `isize` integer, for example perhaps an
+    /// `i32`.
+    fn distinct_arranged_core<R2, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
+    where
+        R2: Abelian + From<i8>,
+        Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
+        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+    {
+        self.threshold_core(name, |_, _| R2::from(1i8))
     }
 }
 
-impl<G: Scope, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1> for Collection<G, K, R1>
-where G::Timestamp: Lattice+Ord {
-    fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
+impl<G: Scope, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1>
+    for Collection<G, K, R1>
+where
+    G::Timestamp: Lattice + Ord,
+{
+    fn threshold_core<R2, F, Tr>(&self, name: &str, thresh: F) -> Arranged<G, TraceAgent<Tr>>
+    where
+        R2: Abelian,
+        F: FnMut(&K, &R1) -> R2 + 'static,
+        Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
+        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+    {
         self.arrange_by_self_named(&format!("Arrange: {}", name))
-            .threshold_named(name, thresh)
+            .threshold_core(name, thresh)
     }
 }
 
 impl<G: Scope, K: Data, R1: Semigroup, T1> Threshold<G, K, R1> for Arranged<G, T1>
 where
-    G::Timestamp: Lattice+Ord,
-    T1: TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R1>+Clone+'static,
+    G::Timestamp: Lattice + Ord,
+    T1: TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R1> + Clone + 'static,
     T1::Batch: BatchReader<K, (), G::Timestamp, R1>,
     T1::Cursor: Cursor<K, (), G::Timestamp, R1>,
 {
-    fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
-        self.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
-            .as_collection(|k,_| k.clone())
+    fn threshold_core<R2, F, Tr>(&self, name: &str, mut thresh: F) -> Arranged<G, TraceAgent<Tr>>
+    where
+        R2: Abelian,
+        F: FnMut(&K, &R1) -> R2 + 'static,
+        Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
+        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+    {
+        self.reduce_abelian::<_, Tr>(name, move |k, s, t| {
+            t.push(((), thresh(k, &s[0].1)));
+        })
     }
 }
 
@@ -210,16 +277,52 @@ pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord
     ///     });
     /// }
     /// ```
-    fn count(&self) -> Collection<G, (K, R), isize>;
+    fn count(&self) -> Collection<G, (K, R), isize> {
+        self.count_core::<_, DefaultValTrace<_, _, _, _>>()
+            .as_collection(|key, count| (key.clone(), count.clone()))
+    }
+
+    /// Counts the number of occurrences of each element, returning an [arrangement](Arranged).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// extern crate timely;
+    /// extern crate differential_dataflow;
+    ///
+    /// use differential_dataflow::input::Input;
+    /// use differential_dataflow::operators::Count;
+    ///
+    /// fn main() {
+    ///     ::timely::example(|scope| {
+    ///         // report the number of occurrences of each key
+    ///         scope.new_collection_from(1 .. 10).1
+    ///              .map(|x| x / 3)
+    ///              .count();
+    ///     });
+    /// }
+    /// ```
+    fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
+    where
+        R2: Abelian + From<i8>,
+        Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
+        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>;
 }
 
 impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K, R> for Collection<G, K, R>
 where
     G::Timestamp: Lattice+Ord,
 {
-    fn count(&self) -> Collection<G, (K, R), isize> {
+    fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
+    where
+        R2: Abelian + From<i8>,
+        Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
+        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+    {
         self.arrange_by_self_named("Arrange: Count")
-            .count()
+            .count_core()
     }
 }
 
@@ -230,9 +333,14 @@ impl<G: Scope, K: Data, R: Semigroup, T1> Count<G, K, R> for Arranged<G, T1>
     T1::Batch: BatchReader<K, (), G::Timestamp, R>,
     T1::Cursor: Cursor<K, (), G::Timestamp, R>,
 {
-    fn count(&self) -> Collection<G, (K, R), isize> {
-        self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), 1)))
-            .as_collection(|k,c| (k.clone(), c.clone()))
+    fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
+    where
+        R2: Abelian + From<i8>,
+        Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
+        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+    {
+        self.reduce_abelian::<_, Tr>("Count", |_k, s, t| t.push((s[0].1.clone(), R2::from(1))))
     }
 }

From aed365f423ff7caf95b5a395b6d6ca4238894800 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Wed, 5 May 2021 14:52:41 -0500
Subject: [PATCH 3/9] Updated propagate_core documentation

---
 src/algorithms/graphs/propagate.rs |  5 ++---
 src/operators/consolidate.rs       | 81 ++++++++++++++++++------------
 src/operators/reduce.rs            | 31 ++++++-------
 3 files changed, 67 insertions(+), 50 deletions(-)

diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs
index c94e49814..550db3e26 100644
--- a/src/algorithms/graphs/propagate.rs
+++ b/src/algorithms/graphs/propagate.rs
@@ -53,8 +53,7 @@ use operators::arrange::arrangement::Arranged;
 /// Propagates labels forward, retaining the minimum label.
 ///
 /// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
-/// a method `logic` to specify the rounds in which we introduce various labels. The output
-/// of `logic should be a number in the interval [0,64],
+/// a method `logic` to specify the rounds in which we introduce various labels.
 pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G, Tr>, nodes: &Collection<G, (N, L), R>, logic: F) -> Collection<G, (N, L), R>
 where
     G: Scope,
@@ -100,7 +99,7 @@ where
 
         let labels =
             proposals
                 .concat(&nodes)
-                .reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));
+                .reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1))));
 
         let propagate: Collection<_, (N, L), R> =
             labels
diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index d36f04905..027fd7343 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -6,18 +6,27 @@
 //! underlying system can more clearly see that no work must be done in the later case, and we can
 //! drop out of, e.g. iterative computations.
 
-use timely::dataflow::Scope;
-
-use ::{Collection, ExchangeData, Hashable};
-use ::difference::Semigroup;
-use operators::arrange::arrangement::Arrange;
+use crate::{
+    difference::Semigroup,
+    lattice::Lattice,
+    operators::arrange::{Arrange, Arranged, TraceAgent},
+    trace::{implementations::ord::OrdKeySpine, Batch, Cursor, Trace, TraceReader},
+    AsCollection, Collection, ExchangeData, Hashable,
+};
+use timely::dataflow::{channels::pact::Pipeline, operators::Operator, Scope};
 
 /// An extension method for consolidating weighted streams.
-pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
+pub trait Consolidate<S, D, R>: Sized
+where
+    S: Scope,
+    S::Timestamp: Lattice,
+    D: ExchangeData + Hashable,
+    R: Semigroup,
+{
     /// Aggregates the weights of equal records into at most one record.
     ///
-    /// This method uses the type `D`'s `hashed()` method to partition the data. The data are
-    /// accumulated in place, each held back until their timestamp has completed.
+    /// This method uses the type `D`'s [`hashed()`](Hashable) method to partition the data.
+    /// The data is accumulated in place and held back until its timestamp has completed.
     ///
     /// # Examples
     ///
@@ -40,30 +49,44 @@ pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
     ///     });
     /// }
     /// ```
-    fn consolidate(&self) -> Self {
+    fn consolidate(&self) -> Collection<S, D, R> {
         self.consolidate_named("Consolidate")
     }
 
-    /// As `consolidate` but with the ability to name the operator.
-    fn consolidate_named(&self, name: &str) -> Self;
+    /// As `consolidate`, but with the ability to name the operator.
+    fn consolidate_named(&self, name: &str) -> Collection<S, D, R> {
+        self.consolidate_core::<OrdKeySpine<D, S::Timestamp, R>>(name)
+            .as_collection(|data, &()| data.clone())
+    }
+
+    /// Aggregates the weights of equal records into at most one record,
+    /// returning the intermediate [arrangement](Arranged).
+    fn consolidate_core<Tr>(&self, name: &str) -> Arranged<S, TraceAgent<Tr>>
+    where
+        Tr: Trace + TraceReader<Key = D, Val = (), Time = S::Timestamp, R = R> + 'static,
+        Tr::Batch: Batch<D, (), S::Timestamp, R>,
+        Tr::Cursor: Cursor<D, (), S::Timestamp, R>;
 }
 
-impl<G: Scope, D, R> Consolidate<D> for Collection<G, D, R>
+impl<S, D, R> Consolidate<S, D, R> for Collection<S, D, R>
 where
-    D: ExchangeData+Hashable,
-    R: ExchangeData+Semigroup,
-    G::Timestamp: ::lattice::Lattice+Ord,
- {
-    fn consolidate_named(&self, name: &str) -> Self {
-        use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
-        self.map(|k| (k, ()))
-            .arrange_named::<DefaultKeyTrace<_,_,_>>(name)
-            .as_collection(|d: &D, _| d.clone())
+    S: Scope,
+    S::Timestamp: Lattice + Ord,
+    D: ExchangeData + Hashable,
+    R: ExchangeData + Semigroup,
+{
+    fn consolidate_core<Tr>(&self, name: &str) -> Arranged<S, TraceAgent<Tr>>
+    where
+        Tr: Trace + TraceReader<Key = D, Val = (), Time = S::Timestamp, R = R> + 'static,
+        Tr::Batch: Batch<D, (), S::Timestamp, R>,
+        Tr::Cursor: Cursor<D, (), S::Timestamp, R>,
+    {
+        self.map(|key| (key, ())).arrange_named(name)
     }
 }
 
 /// An extension method for consolidating weighted streams.
-pub trait ConsolidateStream<D: ExchangeData+Hashable> {
+pub trait ConsolidateStream<D: ExchangeData + Hashable> {
     /// Aggregates the weights of equal records.
     ///
     /// Unlike `consolidate`, this method does not exchange data and does not
@@ -98,19 +121,13 @@ pub trait ConsolidateStream<D: ExchangeData + Hashable> {
 
 impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
 where
-    D: ExchangeData+Hashable,
-    R: ExchangeData+Semigroup,
-    G::Timestamp: ::lattice::Lattice+Ord,
- {
+    D: ExchangeData + Hashable,
+    R: ExchangeData + Semigroup,
+    G::Timestamp: ::lattice::Lattice + Ord,
+{
     fn consolidate_stream(&self) -> Self {
-
-        use timely::dataflow::channels::pact::Pipeline;
-        use timely::dataflow::operators::Operator;
-        use collection::AsCollection;
-
         self.inner
             .unary(Pipeline, "ConsolidateStream", |_cap, _info| {
-
                 let mut vector = Vec::new();
                 move |input, output| {
                     input.for_each(|time, data| {
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index f96d8d099..7686ea7b7 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -151,8 +151,8 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
         R2: Abelian,
         F: FnMut(&K, &R1) -> R2 + 'static,
         Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
-        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
-        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>;
+        Tr::Batch: Batch<K, (), G::Timestamp, R2>,
+        Tr::Cursor: Cursor<K, (), G::Timestamp, R2>;
 
     /// Reduces the collection to one occurrence of each distinct element.
     ///
@@ -209,8 +209,8 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
     where
         R2: Abelian + From<i8>,
         Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
-        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
-        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Batch: Batch<K, (), G::Timestamp, R2>,
+        Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
     {
         self.threshold_core(name, |_, _| R2::from(1i8))
     }
@@ -226,8 +226,8 @@ impl<G: Scope, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<
         R2: Abelian,
         F: FnMut(&K, &R1) -> R2 + 'static,
         Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
-        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
-        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Batch: Batch<K, (), G::Timestamp, R2>,
+        Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
     {
         self.arrange_by_self_named(&format!("Arrange: {}", name))
             .threshold_core(name, thresh)
@@ -246,8 +246,8 @@ impl<G: Scope, K: Data, R1: Semigroup, T1> Threshold<G, K, R1> for Arranged<G,
         R2: Abelian,
         F: FnMut(&K, &R1) -> R2 + 'static,
         Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R2> + 'static,
-        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
-        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Batch: Batch<K, (), G::Timestamp, R2>,
+        Tr::Cursor: Cursor<K, (), G::Timestamp, R2>,
     {
         self.reduce_abelian::<_, Tr>(name, move |k, s, t| {
             t.push(((), thresh(k, &s[0].1)));
@@ -292,20 +292,21 @@ pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord
     ///
     /// use differential_dataflow::input::Input;
     /// use differential_dataflow::operators::Count;
+    /// use differential_dataflow::operators::arrange::OrdValSpine;
     ///
     /// fn main() {
     ///     ::timely::example(|scope| {
     ///         // report the number of occurrences of each key
     ///         scope.new_collection_from(1 .. 10).1
     ///              .map(|x| x / 3)
-    ///              .count();
+    ///              .count_core::<OrdValSpine>();
     ///     });
     /// }
     /// ```
     fn count_core<R2, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
     where
         R2: Abelian + From<i8>,
         Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
-        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
-        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>;
+        Tr::Batch: Batch<K, R, G::Timestamp, R2>,
+        Tr::Cursor: Cursor<K, R, G::Timestamp, R2>;
 }
@@ -318,8 +319,8 @@ impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K,
     where
         R2: Abelian + From<i8>,
         Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
-        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
-        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Batch: Batch<K, R, G::Timestamp, R2>,
+        Tr::Cursor: Cursor<K, R, G::Timestamp, R2>,
     {
         self.arrange_by_self_named("Arrange: Count")
             .count_core()
@@ -337,8 +338,8 @@ impl<G: Scope, K: Data, R: Semigroup, T1> Count<G, K, R> for Arranged<G, T1>
     where
         R2: Abelian + From<i8>,
         Tr: Trace + TraceReader<Key = K, Val = R, Time = G::Timestamp, R = R2> + 'static,
-        Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
-        Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
+        Tr::Batch: Batch<K, R, G::Timestamp, R2>,
+        Tr::Cursor: Cursor<K, R, G::Timestamp, R2>,
     {
         self.reduce_abelian::<_, Tr>("Count", |_k, s, t| t.push((s[0].1.clone(), R2::from(1))))
     }

From 3450dd71e3658ed88393142e204a1938a54dcf99 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Wed, 5 May 2021 15:20:01 -0500
Subject: [PATCH 4/9] Arranged consolidation

---
 .vscode/settings.json        |  5 +++++
 src/operators/consolidate.rs | 30 +++++++++++++++++++++++++++++-
 2 files changed, 34 insertions(+), 1 deletion(-)
 create mode 100644 .vscode/settings.json

diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 000000000..097627654
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,5 @@
+{
+    "cSpell.words": [
+        "Hashable"
+    ]
+}
\ No newline at end of file
diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index 027fd7343..8b0d84875 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -59,6 +59,34 @@ where
             .as_collection(|data, &()| data.clone())
     }
 
+    /// A `consolidate` that returns the intermediate [arrangement](Arranged).
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use differential_dataflow::{
+    ///     input::Input,
+    ///     operators::{Consolidate, JoinCore},
+    /// };
+    ///
+    /// timely::example(|scope| {
+    ///     let (_, collection) = scope.new_collection_from(0..10u32);
+    ///
+    ///     let keys = collection
+    ///         .flat_map(|x| (0..x))
+    ///         .concat(&collection.negate())
+    ///         .consolidate_arranged();
+    ///
+    ///     collection
+    ///         .map(|x| (x, x * 2))
+    ///         .join_core(&keys, |&key, &(), &value| (key, value))
+    ///         .inspect(|x| println!("{:?}", x));
+    /// });
+    /// ```
+    fn consolidate_arranged(&self) -> Arranged<S, TraceAgent<OrdKeySpine<D, S::Timestamp, R>>> {
+        self.consolidate_core::<OrdKeySpine<D, S::Timestamp, R>>("Consolidate")
+    }
+
     /// Aggregates the weights of equal records into at most one record,
     /// returning the intermediate [arrangement](Arranged).
     fn consolidate_core<Tr>(&self, name: &str) -> Arranged<S, TraceAgent<Tr>>
@@ -123,7 +151,7 @@ impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
 where
     D: ExchangeData + Hashable,
     R: ExchangeData + Semigroup,
-    G::Timestamp: ::lattice::Lattice + Ord,
+    G::Timestamp: Lattice + Ord,
 {
     fn consolidate_stream(&self) -> Self {
         self.inner

From fc9c9ff8f42074ef8fa46c282bff1b3e0fbee996 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Thu, 6 May 2021 19:05:54 -0500
Subject: [PATCH 5/9] Added debug assertion & used ptr::add

---
 src/consolidation.rs | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/consolidation.rs b/src/consolidation.rs
index 379d25256..5e87ccb54 100644
--- a/src/consolidation.rs
+++ b/src/consolidation.rs
@@ -55,8 +55,8 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
             assert!(offset < index);
 
             // LOOP INVARIANT: offset < index
-            let ptr1 = slice.as_mut_ptr().offset(offset as isize);
-            let ptr2 = slice.as_mut_ptr().offset(index as isize);
+            let ptr1 = slice.as_mut_ptr().add(offset);
+            let ptr2 = slice.as_mut_ptr().add(index);
 
             if (*ptr1).0 == (*ptr2).0 {
                 (*ptr1).1.plus_equals(&(*ptr2).1);
@@ -65,7 +65,7 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
                 if !(*ptr1).1.is_zero() {
                     offset += 1;
                 }
-                let ptr1 = slice.as_mut_ptr().offset(offset as isize);
+                let ptr1 = slice.as_mut_ptr().add(offset);
                 std::mem::swap(&mut *ptr1, &mut *ptr2);
             }
         }
@@ -118,8 +118,9 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,
         unsafe {
 
             // LOOP INVARIANT: offset < index
-            let ptr1 = slice.as_mut_ptr().offset(offset as isize);
-            let ptr2 = slice.as_mut_ptr().offset(index as isize);
+            debug_assert!(offset < index);
+            let ptr1 = slice.as_mut_ptr().add(offset);
+            let ptr2 = slice.as_mut_ptr().add(index);
 
             if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 {
                 (*ptr1).2.plus_equals(&(*ptr2).2);
@@ -128,7 +129,7 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,
                 if !(*ptr1).2.is_zero() {
                     offset += 1;
                 }
-                let ptr1 = slice.as_mut_ptr().offset(offset as isize);
+                let ptr1 = slice.as_mut_ptr().add(offset);
                 std::mem::swap(&mut *ptr1, &mut *ptr2);
             }
 

From 199809b5ae07bcd187d292261752d4237d479850 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Thu, 6 May 2021 19:09:24 -0500
Subject: [PATCH 6/9] Fixed doc tests

---
 src/operators/consolidate.rs | 2 +-
 src/operators/reduce.rs      | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index 8b0d84875..52142141c 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -79,7 +79,7 @@ where
     ///
     ///     collection
     ///         .map(|x| (x, x * 2))
-    ///         .join_core(&keys, |&key, &(), &value| (key, value))
+    ///         .join_core(&keys, |&key, &value, &()| (key, value))
     ///         .inspect(|x| println!("{:?}", x));
     /// });
     /// ```
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index 7686ea7b7..9c890722b 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -292,14 +292,14 @@ pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord
     ///
     /// use differential_dataflow::input::Input;
     /// use differential_dataflow::operators::Count;
-    /// use differential_dataflow::operators::arrange::OrdValSpine;
+    /// use differential_dataflow::trace::implementations::ord::OrdValSpine;
     ///
     /// fn main() {
     ///     ::timely::example(|scope| {
     ///         // report the number of occurrences of each key
-    ///         scope.new_collection_from(1 .. 10).1
+    ///         scope.new_collection_from(1..10).1
     ///              .map(|x| x / 3)
-    ///              .count_core::<OrdValSpine>();
+    ///              .count_core::<isize, OrdValSpine<_, _, _, _>>();
     ///     });
     /// }
     /// ```

From ea5fd2d662f75cf87fe1ed2dde7a0cb430723003 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Thu, 6 May 2021 19:10:53 -0500
Subject: [PATCH 7/9] Lints on trace layers

---
 src/trace/layers/mod.rs | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/trace/layers/mod.rs b/src/trace/layers/mod.rs
index 29ebd46e5..468a21777 100644
--- a/src/trace/layers/mod.rs
+++ b/src/trace/layers/mod.rs
@@ -106,7 +106,7 @@ pub trait Cursor<Storage> {
     fn reposition(&mut self, storage: &Storage, lower: usize, upper: usize);
 }
 
-/// Reports the number of elements satisfing the predicate.
+/// Reports the number of elements satisfying the predicate.
 ///
 /// This methods *relies strongly* on the assumption that the predicate
 /// stays false once it becomes false, a joint property of the predicate
@@ -116,7 +116,7 @@ pub fn advance<T, F: Fn(&T)->bool>(slice: &[T], function: F) -> usize {
 
     let small_limit = 8;
 
-    // Exponential seach if the answer isn't within `small_limit`.
+    // Exponential search if the answer isn't within `small_limit`.
     if slice.len() > small_limit && function(&slice[small_limit]) {
 
         // start with no advance
@@ -127,16 +127,16 @@ pub fn advance<T, F: Fn(&T)->bool>(slice: &[T], function: F) -> usize {
         let mut step = 1;
         while index + step < slice.len() && function(&slice[index + step]) {
             index += step;
-            step = step << 1;
+            step <<= 1;
         }
 
         // advance in exponentially shrinking steps.
-        step = step >> 1;
+        step >>= 1;
         while step > 0 {
             if index + step < slice.len() && function(&slice[index + step]) {
                 index += step;
             }
-            step = step >> 1;
+            step >>= 1;
         }
 
         index += 1;
@@ -145,7 +145,7 @@ pub fn advance<T, F: Fn(&T)->bool>(slice: &[T], function: F) -> usize {
         index
     }
     else {
-        let limit = std::cmp::min(slice.len(), small_limit);
-        slice[..limit].iter().filter(|x| function(*x)).count()
+        let limit = slice.len().min(small_limit);
+        slice[..limit].iter().filter(|&x| function(x)).count()
     }
 }

From 1ee97cd0ee5fb27e9f7adc114a19e2c6f2494919 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Thu, 6 May 2021 19:31:20 -0500
Subject: [PATCH 8/9] Fixed some lints in reduce

---
 src/input.rs            | 15 ++++++++++++---
 src/operators/reduce.rs | 63 +++++++++++++++++++++++------------------
 2 files changed, 50 insertions(+), 28 deletions(-)

diff --git a/src/input.rs b/src/input.rs
index d3cdff34f..13ce15c9d 100644
--- a/src/input.rs
+++ b/src/input.rs
@@ -247,7 +247,7 @@ impl<T: Timestamp+Clone, D: Data, R: Semigroup> InputSession<T, D, R> {
     /// Adds to the weight of an element in the collection.
     pub fn update(&mut self, element: D, change: R) {
         if self.buffer.len() == self.buffer.capacity() {
-            if self.buffer.len() > 0 {
+            if !self.buffer.is_empty() {
                 self.handle.send_batch(&mut self.buffer);
             }
             // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
@@ -260,7 +260,7 @@ impl<T: Timestamp+Clone, D: Data, R: Semigroup> InputSession<T, D, R> {
     pub fn update_at(&mut self, element: D, time: T, change: R) {
         assert!(self.time.less_equal(&time));
         if self.buffer.len() == self.buffer.capacity() {
-            if self.buffer.len() > 0 {
+            if !self.buffer.is_empty() {
                 self.handle.send_batch(&mut self.buffer);
             }
             // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
@@ -301,6 +301,17 @@ impl<T: Timestamp+Clone, D: Data, R: Semigroup> InputSession<T, D, R> {
     pub fn close(self) { }
 }
 
+impl<T, D, R> Default for InputSession<T, D, R>
+where
+    T: Timestamp + Clone,
+    D: Data,
+    R: Semigroup,
+{
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl<T: Timestamp+Clone, D: Data, R: Semigroup> Drop for InputSession<T, D, R> {
     fn drop(&mut self) {
         self.flush();
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index 9c890722b..8daee1bb9 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -586,8 +586,8 @@ where
         // this as long as it requires that there is only one capability for each message.
         let mut buffers = Vec::<(G::Timestamp, Vec<(T2::Val, G::Timestamp, T2::R)>)>::new();
         let mut builders = Vec::new();
-        for i in 0 .. capabilities.len() {
-            buffers.push((capabilities[i].time().clone(), Vec::new()));
+        for capability in capabilities.iter() {
+            buffers.push((capability.time().clone(), Vec::new()));
             builders.push(<T2::Batch as Batch<K,T2::Val,G::Timestamp,T2::R>>::Builder::new());
         }
 
@@ -611,7 +611,7 @@ where
 
                 // Determine the next key we will work on; could be synthetic, could be from a batch.
                 let key1 = exposed.get(exposed_position).map(|x| x.0.clone());
-                let key2 = batch_cursor.get_key(&batch_storage).map(|k| k.clone());
+                let key2 = batch_cursor.get_key(&batch_storage).cloned();
                 let key = match (key1, key2) {
                     (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                     (Some(key1), None) => key1,
@@ -741,7 +741,7 @@ where
                     }
 
                     // Exert trace maintenance if we have been so requested.
-                    if let Some(mut fuel) = effort.clone() {
+                    if let Some(mut fuel) = effort {
                         output_writer.exert(&mut fuel);
                     }
                 }
@@ -749,7 +749,7 @@ where
             )
         };
 
-        Arranged { stream: stream, trace: result_trace.unwrap() }
+        Arranged { stream, trace: result_trace.unwrap() }
     }
 }
 
@@ -792,6 +792,7 @@ where
 /// Implementation based on replaying historical and new updates together.
 mod history_replay {
 
+    use std::convert::identity;
     use ::difference::Semigroup;
     use lattice::Lattice;
     use trace::Cursor;
@@ -940,12 +941,19 @@ mod history_replay {
             // `input` or `output`. Finally, we may have synthetic times produced as the join of times
             // we consider in the course of evaluation. As long as any of these times exist, we need to
             // keep examining times.
-            while let Some(next_time) = [ batch_replay.time(),
-                                            times_slice.first(),
-                                            input_replay.time(),
-                                            output_replay.time(),
-                                            self.synth_times.last(),
-                                        ].iter().cloned().filter_map(|t| t).min().map(|t| t.clone()) {
+            while let Some(next_time) = [
+                batch_replay.time(),
+                times_slice.first(),
+                input_replay.time(),
+                output_replay.time(),
+                self.synth_times.last(),
+            ]
+            .iter()
+            .cloned()
+            .filter_map(identity)
+            .min()
+            .cloned()
+            {
 
                 // Advance input and output history replayers. This marks applicable updates as active.
                 input_replay.step_while_time_is(&next_time);
@@ -1011,7 +1019,10 @@ mod history_replay {
 
                 // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
                debug_assert!(self.input_buffer.is_empty());
-                meet.as_ref().map(|meet| input_replay.advance_buffer_by(&meet));
+                if let Some(meet) = meet.as_ref() {
+                    input_replay.advance_buffer_by(meet);
+                }
+
                 for &((value, ref time), ref diff) in input_replay.buffer().iter() {
                     if time.less_equal(&next_time) {
                         self.input_buffer.push((value, diff.clone()));
@@ -1031,7 +1042,10 @@ mod history_replay {
                 }
                 crate::consolidation::consolidate(&mut self.input_buffer);
 
-                meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet));
+                if let Some(meet) = meet.as_ref() {
+                    output_replay.advance_buffer_by(meet);
+                }
+
                 for &((ref value, ref time), ref diff) in output_replay.buffer().iter() {
                     if time.less_equal(&next_time) {
                         self.output_buffer.push(((*value).clone(), diff.clone()));
@@ -1052,7 +1066,7 @@ mod history_replay {
                 crate::consolidation::consolidate(&mut self.output_buffer);
 
                 // Apply user logic if non-empty input and see what happens!
-                if self.input_buffer.len() > 0 || self.output_buffer.len() > 0 {
+                if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
                     logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
                     self.input_buffer.clear();
                     self.output_buffer.clear();
@@ -1085,7 +1099,7 @@ mod history_replay {
                 // The two locations are important, in that we will compact `output_produced` as we move
                 // through times, but we cannot compact the output buffers because we need their actual
                 // times.
-                if self.update_buffer.len() > 0 {
+                if !self.update_buffer.is_empty() {
 
                     output_counter += 1;
 
@@ -1152,17 +1166,14 @@ mod history_replay {
                         self.synth_times.dedup();
                     }
                 }
-                else {
-
-                    if interesting {
-                        // We cannot process `next_time` now, and must delay it.
-                        //
-                        // I think we are probably only here because of an uninteresting time declared interesting,
-                        // as initial interesting times are filtered to be in interval, and synthetic times are also
-                        // filtered before introducing them to `self.synth_times`.
-                        new_interesting.push(next_time.clone());
-                        debug_assert!(outputs.iter().any(|&(ref t,_)| t.less_equal(&next_time)))
-                    }
+                else if interesting {
+                    // We cannot process `next_time` now, and must delay it.
+                    //
+                    // I think we are probably only here because of an uninteresting time declared interesting,
+                    // as initial interesting times are filtered to be in interval, and synthetic times are also
+                    // filtered before introducing them to `self.synth_times`.
+                    new_interesting.push(next_time.clone());
+                    debug_assert!(outputs.iter().any(|&(ref t,_)| t.less_equal(&next_time)));
                 }
 
                 // Update `meet` to track the meet of each source of times.

From 43c29f657cfd4a1a98ade8fcc3bc03b9830a0299 Mon Sep 17 00:00:00 2001
From: Chase Wilson
Date: Sun, 9 May 2021 14:32:48 -0500
Subject: [PATCH 9/9] Fixed doctests

---
 src/operators/consolidate.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index 52142141c..cd51da2da 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -79,7 +79,7 @@ where
     ///
     ///     collection
     ///         .map(|x| (x, x * 2))
-    ///         .join_core(&keys, |&key, &value, &()| (key, value))
+    ///         .join_core(&keys, |&key, &value, &()| Some((key, value)))
     ///         .inspect(|x| println!("{:?}", x));
     /// });
     /// ```
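
Not part of the patches themselves, but worth sketching: the point of the new `*_core`/`*_arranged` methods is that the arrangement behind `distinct`, `count`, `threshold` and `consolidate` can be kept and shared, instead of being rebuilt by every consumer. The sketch below assumes the signatures introduced above (in particular, that `distinct_arranged` takes only the difference type as a type parameter) and mirrors the `consolidate_arranged` doctest from patch 4; it is an illustration, not code from the series.

    extern crate timely;
    extern crate differential_dataflow;

    use differential_dataflow::input::Input;
    use differential_dataflow::operators::{JoinCore, Threshold};

    fn main() {
        timely::example(|scope| {
            let (_, numbers) = scope.new_collection_from(0..10u32);

            // Arrange the distinct key set once, keeping the arrangement alive...
            let distinct_keys = numbers.map(|x| x % 3).distinct_arranged::<isize>();

            // ...and join against it directly, rather than re-arranging the keys
            // for every downstream operator.
            numbers
                .map(|x| (x % 3, x))
                .join_core(&distinct_keys, |&key, &value, &()| Some((key, value)))
                .inspect(|x| println!("{:?}", x));
        });
    }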
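
The pointer arithmetic touched by patch 5 lives in `src/consolidation.rs`, whose public helpers have an easy-to-state contract: equal records add their weights, and records whose accumulated weight is zero disappear. A minimal sketch of that contract, using only the public `consolidation::consolidate` helper that the slice functions back:

    use differential_dataflow::consolidation::consolidate;

    fn main() {
        // Equal records merge their weights; "b" cancels to zero and drops out.
        let mut updates = vec![("a", 1isize), ("b", 2), ("a", 2), ("b", -2)];
        consolidate(&mut updates);
        assert_eq!(updates, vec![("a", 3)]);
    }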
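
Likewise, the `advance` function polished in patch 7 computes the length of the prefix of a slice satisfying a monotone predicate, scanning the first few elements linearly and then switching to exponential (galloping) search. A small sketch of the contract, assuming the helper's public path is `trace::layers::advance` as in the diff above:

    use differential_dataflow::trace::layers::advance;

    fn main() {
        let sorted = [1, 3, 4, 7, 9, 12];
        // Three leading elements satisfy `x < 7`; the predicate must never
        // return true again once it has returned false.
        assert_eq!(advance(&sorted, |x| *x < 7), 3);
    }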