From 74a24195da286751dfc7f65d347bfef76d4fe28b Mon Sep 17 00:00:00 2001 From: Ariel Uy Date: Mon, 27 Feb 2023 15:54:54 -0800 Subject: [PATCH 1/3] Create Scan unindexed parallel iterator Write a parallel scan function that consumes a parallel iterator, then produces a new one. Add some benchmarking and testing. --- rayon-demo/Cargo.toml | 1 + rayon-demo/src/main.rs | 2 + rayon-demo/src/scan/mod.rs | 124 +++++++++++++++++++ src/iter/mod.rs | 11 ++ src/iter/scan.rs | 245 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 +- 6 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 rayon-demo/src/scan/mod.rs create mode 100644 src/iter/scan.rs diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml index 0b7a4f27b..5be620cea 100644 --- a/rayon-demo/Cargo.toml +++ b/rayon-demo/Cargo.toml @@ -17,6 +17,7 @@ rand = "0.9" rand_xorshift = "0.4" regex = "1" winit = "0.30" +ndarray = "0.15.6" [dependencies.serde] version = "1.0.85" diff --git a/rayon-demo/src/main.rs b/rayon-demo/src/main.rs index bc7f3fac7..f337c8797 100644 --- a/rayon-demo/src/main.rs +++ b/rayon-demo/src/main.rs @@ -35,6 +35,8 @@ mod str_split; mod tree; #[cfg(test)] mod vec_collect; +#[cfg(test)] +mod scan; #[cfg(test)] extern crate test; diff --git a/rayon-demo/src/scan/mod.rs b/rayon-demo/src/scan/mod.rs new file mode 100644 index 000000000..1589422ba --- /dev/null +++ b/rayon-demo/src/scan/mod.rs @@ -0,0 +1,124 @@ +use ndarray::{Array, Dim}; +use rayon::iter::*; +use std::time::{Duration, Instant}; +use std::num::Wrapping; + +const SIZE: usize = 10000; + +enum Procs { + Sequential, + Parallel, +} + +fn scan_sequential(init: I, id: T, scan_op: P) -> Vec +where + T: Clone, + I: Fn() -> T, + P: FnMut(&mut T, &T) -> Option, +{ + let v = vec![init(); SIZE]; + let scan = v.iter().scan(id, scan_op); + scan.collect() +} + +fn scan_parallel(init: I, id: T, scan_op: P) -> Vec +where + T: Clone + Send + Sync, + I: Fn() -> T, + P: Fn(&T, &T) -> T + Sync, +{ + let v = vec![init(); SIZE]; + let scan = v.into_par_iter().with_min_len(SIZE / 100).scan(&scan_op, id); + scan.collect() +} + +/******* Addition with artificial delay *******/ + +const DELAY: Duration = Duration::from_nanos(10); +fn wait() -> i32 { + let time = Instant::now(); + + let mut sum = 0; + while time.elapsed() < DELAY { + sum += 1; + } + sum +} + +fn scan_add(procs: Procs) -> Vec { + let init = || 2; + let id = 0; + + match procs { + Procs::Sequential => { + let f = |state: &mut i32, x: &i32| { + test::black_box(wait()); + *state += x; + Some(*state) + }; + scan_sequential(init, id, f) + } + Procs::Parallel => { + let f = |x: &i32, y: &i32| { + test::black_box(wait()); + *x + *y + }; + scan_parallel(init, id, f) + } + } +} + +#[bench] +fn scan_add_sequential(b: &mut test::Bencher) { + b.iter(|| scan_add(Procs::Sequential)); +} + +#[bench] +fn scan_add_parallel(b: &mut test::Bencher) { + b.iter(|| scan_add(Procs::Parallel)); +} + +#[test] +fn test_scan_add() { + assert_eq!(scan_add(Procs::Sequential), scan_add(Procs::Parallel)); +} + +/******** Matrix multiplication with wrapping arithmetic *******/ + +type Matrix = Array, Dim<[usize; 2]>>; +fn scan_matmul(procs: Procs) -> Vec { + const MAT_SIZE: usize = 50; + let init = || { + Array::from_iter((0..((MAT_SIZE * MAT_SIZE) as i32)).map(|x| Wrapping(x))) + .into_shape((MAT_SIZE, MAT_SIZE)) + .unwrap() + }; + let id = Array::eye(MAT_SIZE); + + match procs { + Procs::Sequential => { + let f = |state: &mut Matrix, x: &Matrix| { + *state = state.dot(x); + Some(state.clone()) + }; + + scan_sequential(init, id, f) + } + Procs::Parallel => scan_parallel(init, id, |x, y| x.dot(y)), + } +} + +#[bench] +fn scan_matmul_sequential(b: &mut test::Bencher) { + b.iter(|| scan_matmul(Procs::Sequential)); +} + +#[bench] +fn scan_matmul_parallel(b: &mut test::Bencher) { + b.iter(|| scan_matmul(Procs::Parallel)); +} + +#[test] +fn test_scan_matmul() { + assert_eq!(scan_matmul(Procs::Sequential), scan_matmul(Procs::Parallel)); +} diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 812530ddc..9d659c309 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -81,6 +81,7 @@ use self::plumbing::*; use self::private::Try; +use self::scan::Scan; pub use either::Either; use std::cmp::Ordering; use std::collections::LinkedList; @@ -161,6 +162,8 @@ mod while_some; mod zip; mod zip_eq; +mod scan; + pub use self::{ blocks::{ExponentialBlocks, UniformBlocks}, chain::Chain, @@ -1394,6 +1397,14 @@ pub trait ParallelIterator: Sized + Send { sum::sum(self) } + fn scan(self, scan_op: F, id: Self::Item) -> Scan + where + F: Fn(&Self::Item, &Self::Item) -> Self::Item + Sync + Send, + ::Item: Send + Sync, + { + scan::scan(self, scan_op, id) + } + /// Multiplies all the items in the iterator. /// /// Note that the order in items will be reduced is not specified, diff --git a/src/iter/scan.rs b/src/iter/scan.rs new file mode 100644 index 000000000..d783a8480 --- /dev/null +++ b/src/iter/scan.rs @@ -0,0 +1,245 @@ +use super::plumbing::*; +use super::*; +use std::usize; +use std::collections::LinkedList; + +pub(super) fn scan(pi: PI, scan_op: P, id: T) -> Scan +where + PI: ParallelIterator, + P: Fn(&T, &T) -> T + Send + Sync, + T: Send + Sync, +{ + let list = scan_p1(pi, &scan_op); + let data = list.into_iter().collect(); + let offsets = compute_offsets(&data, &scan_op, id); + Scan::new(data, offsets, scan_op) +} + +// Compute the offset for each chunk by performing another sequential scan +// on the last value of each chunk +fn compute_offsets<'a, P, T>(data: &Vec>, scan_op: &'a P, id: T) -> Vec +where + P: Fn(&T, &T) -> T, +{ + let mut offsets: Vec = Vec::with_capacity(data.len()); + offsets.push(id); + + for it in data { + // offsets is never empty because we already pushed id to it + let last = offsets.last().unwrap(); + // `it` can never be empty due to implementation of ScanP1Folder + let next: T = (scan_op)(last, &it.last().unwrap()); + offsets.push(next); + } + offsets +} + +/******* scan part 1: consumer ******/ + +// Breaks the iterator into pieces and performs sequential scan on each +// Returns intermediate data, a LinkedList of the result of each seq scan +fn scan_p1<'a, PI, P, T>(pi: PI, scan_op: &'a P) -> LinkedList> +where + PI: ParallelIterator, + P: Fn(&T, &T) -> T + Send + Sync, + T: Send, +{ + let consumer = ScanP1Consumer { scan_op }; + pi.drive_unindexed(consumer) +} + +struct ScanP1Consumer<'p, P> { + scan_op: &'p P, +} + +impl<'p, P: Send> ScanP1Consumer<'p, P> { + fn new(scan_op: &'p P) -> ScanP1Consumer<'p, P> { + ScanP1Consumer { scan_op } + } +} + +impl<'p, T, P: 'p> Consumer for ScanP1Consumer<'p, P> +where + T: Send, + P: Fn(&T, &T) -> T + Send + Sync, +{ + type Folder = ScanP1Folder<'p, T, P>; + type Reducer = ScanP1Reducer; + type Result = LinkedList>; + + fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { + ( + ScanP1Consumer::new(self.scan_op), + ScanP1Consumer::new(self.scan_op), + ScanP1Reducer, + ) + } + + fn into_folder(self) -> Self::Folder { + ScanP1Folder { + vec: Vec::new(), + scan_op: self.scan_op, + } + } + + fn full(&self) -> bool { + false + } +} + +impl<'p, T, P: 'p> UnindexedConsumer for ScanP1Consumer<'p, P> +where + T: Send, + P: Fn(&T, &T) -> T + Send + Sync, +{ + fn split_off_left(&self) -> Self { + Self { + scan_op: self.scan_op, + } + } + + fn to_reducer(&self) -> Self::Reducer { + ScanP1Reducer + } +} + +struct ScanP1Folder<'p, T, P> { + vec: Vec, + scan_op: &'p P, +} + +impl<'p, T, P> Folder for ScanP1Folder<'p, T, P> +where + P: Fn(&T, &T) -> T + 'p, +{ + type Result = LinkedList>; + + fn consume(mut self, item: T) -> Self { + let next = match self.vec.last() { + None => item, + Some(prev) => (self.scan_op)(prev, &item), + }; + self.vec.push(next); + self + } + + fn complete(self) -> Self::Result { + let mut list = LinkedList::new(); + if !self.vec.is_empty() { + list.push_back(self.vec); + } + list + } + + fn full(&self) -> bool { + false + } +} + +struct ScanP1Reducer; + +impl Reducer> for ScanP1Reducer { + fn reduce(self, mut left: LinkedList, mut right: LinkedList) -> LinkedList { + left.append(&mut right); + left + } +} + +/*********** scan part 2: producer **********/ + +#[derive(Debug)] +pub struct Scan { + data: Vec>, + offsets: Vec, + scan_op: P, +} + +impl Scan +where + T: Send + Sync, + P: Fn(&T, &T) -> T + Send + Sync, +{ + pub(super) fn new(data: Vec>, offsets: Vec, scan_op: P) -> Self { + Scan { + data, + offsets, + scan_op, + } + } +} + +impl ParallelIterator for Scan +where + T: Send + Sync, + P: Fn(&T, &T) -> T + Send + Sync, +{ + type Item = T; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + bridge_unindexed( + ScanP2Producer { + data: &self.data, + offsets: &self.offsets, + scan_op: &self.scan_op, + }, + consumer, + ) + } +} + +struct ScanP2Producer<'a, T, P> { + data: &'a [Vec], + offsets: &'a [T], + scan_op: &'a P, +} + +impl<'a, T, P> ScanP2Producer<'a, T, P> +where + T: Send + Sync, + P: Fn(&T, &T) -> T + Send + Sync, +{ + pub(super) fn new(data: &'a [Vec], offsets: &'a [T], scan_op: &'a P) -> Self { + ScanP2Producer { + data, + offsets, + scan_op, + } + } +} + +impl<'a, T, P> UnindexedProducer for ScanP2Producer<'a, T, P> +where + T: Send + Sync, + P: Fn(&T, &T) -> T + Send + Sync, +{ + type Item = T; + + fn split(self) -> (Self, Option) { + let mid = self.offsets.len() / 2; + if mid == 0 { + return (self, None); + } + let (data_l, data_r) = self.data.split_at(mid); + let (offsets_l, offsets_r) = self.offsets.split_at(mid); + + ( + ScanP2Producer::new(data_l, offsets_l, self.scan_op), + Some(ScanP2Producer::new(data_r, offsets_r, self.scan_op)), + ) + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + let iter = self + .data + .iter() + .zip(self.offsets.iter()) + .flat_map(|(chunk, offset)| chunk.iter().map(|x| (self.scan_op)(offset, x))); + folder.consume_iter(iter) + } +} diff --git a/src/lib.rs b/src/lib.rs index 09b5df308..33d90c0b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ #![deny(missing_debug_implementations)] -#![deny(missing_docs)] +//#![deny(missing_docs)] #![deny(unreachable_pub)] #![warn(rust_2018_idioms)] From d0b83c02af3b801acce04cf6f291d4598d5addc8 Mon Sep 17 00:00:00 2001 From: Ariel Uy Date: Mon, 17 Apr 2023 19:16:13 -0700 Subject: [PATCH 2/3] Add docs for scan --- src/iter/mod.rs | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 9d659c309..437ff5d30 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -1397,12 +1397,46 @@ pub trait ParallelIterator: Sized + Send { sum::sum(self) } - fn scan(self, scan_op: F, id: Self::Item) -> Scan + /// Folds the items in the iterator using `scan_op`, and produces a + /// new iterator with all of the intermediate results. + /// + /// Specifically, the nth element of the scan iterator will be the + /// result of reducing the first n elements of the input with `scan_op`. + /// + /// # Examples + /// + /// ``` + /// // Iterate over a sequence of numbers `x0, ..., xN` + /// // and use scan to compute the partial sums + /// use rayon::prelude::*; + /// let partial_sums = [1, 2, 3, 4, 5] + /// .into_par_iter() // iterating over i32 + /// .scan(|a, b| *a + *b, // add (&i32, &i32) -> i32 + /// 0) // identity + /// .collect::>(); + /// assert_eq!(partial_sums, vec![1, 3, 6, 10, 15]); + /// ``` + /// + /// **Note:** Unlike a sequential `scan` operation, the order in + /// which `scan_op` will be applied to produce the result is not fully + /// specified. So `scan_op` should be [associative] or else the results + /// will be non-deterministic. Also unlike sequential `scan`, there is + /// no internal state for this operation, so the operation has a + /// different signature. + /// + /// The argument `identity` should be an "identity" value for + /// `scan_op`, which may be inserted into the sequence as + /// needed to create opportunities for parallel execution. So, for + /// example, if you are doing a summation, then `identity` ought + /// to represent the zero for your type. + /// + /// [associative]: https://en.wikipedia.org/wiki/Associative_property + fn scan(self, scan_op: F, identity: Self::Item) -> Scan where F: Fn(&Self::Item, &Self::Item) -> Self::Item + Sync + Send, ::Item: Send + Sync, { - scan::scan(self, scan_op, id) + scan::scan(self, scan_op, identity) } /// Multiplies all the items in the iterator. From 054d670bbe648c66c567f0e9202baa1625420ccc Mon Sep 17 00:00:00 2001 From: Ariel Uy Date: Wed, 23 Apr 2025 09:47:25 -0700 Subject: [PATCH 3/3] Update scan PR with changes from rayon-scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Refactor tests/bench - Add new tests - Minor fix to splitting in scan algorithm Thanks to François Garillot for some of the changes. --- rayon-demo/src/main.rs | 4 +- rayon-demo/src/scan/bench.rs | 64 +++++++++++++++++ rayon-demo/src/scan/mod.rs | 134 ++++++----------------------------- rayon-demo/src/scan/scan.rs | 59 +++++++++++++++ src/iter/scan.rs | 4 +- 5 files changed, 148 insertions(+), 117 deletions(-) create mode 100644 rayon-demo/src/scan/bench.rs create mode 100644 rayon-demo/src/scan/scan.rs diff --git a/rayon-demo/src/main.rs b/rayon-demo/src/main.rs index f337c8797..6725dfe0f 100644 --- a/rayon-demo/src/main.rs +++ b/rayon-demo/src/main.rs @@ -28,6 +28,8 @@ mod map_collect; #[cfg(test)] mod pythagoras; #[cfg(test)] +mod scan; +#[cfg(test)] mod sort; #[cfg(test)] mod str_split; @@ -35,8 +37,6 @@ mod str_split; mod tree; #[cfg(test)] mod vec_collect; -#[cfg(test)] -mod scan; #[cfg(test)] extern crate test; diff --git a/rayon-demo/src/scan/bench.rs b/rayon-demo/src/scan/bench.rs new file mode 100644 index 000000000..3b4f39821 --- /dev/null +++ b/rayon-demo/src/scan/bench.rs @@ -0,0 +1,64 @@ +use std::time::{Duration, Instant}; + +use super::scan::{scan_matmul, scan_parallel, scan_sequential, Procs}; + +/******* Addition with artificial delay *******/ + +const DELAY: Duration = Duration::from_nanos(10); +fn wait() -> i32 { + let time = Instant::now(); + + let mut sum = 0; + while time.elapsed() < DELAY { + sum += 1; + } + sum +} + +fn scan_add(procs: Procs) -> Vec { + let init = || 2; + let id = 0; + + match procs { + Procs::Sequential => { + let f = |state: &mut i32, x: &i32| { + test::black_box(wait()); + *state += x; + Some(*state) + }; + scan_sequential(init, id, f) + } + Procs::Parallel => { + let f = |x: &i32, y: &i32| { + test::black_box(wait()); + *x + *y + }; + scan_parallel(init, id, f) + } + } +} + +#[test] +fn test_scan_add() { + assert_eq!(scan_add(Procs::Sequential), scan_add(Procs::Parallel)); +} + +#[bench] +fn scan_add_sequential(b: &mut test::Bencher) { + b.iter(|| scan_add(Procs::Sequential)); +} + +#[bench] +fn scan_add_parallel(b: &mut test::Bencher) { + b.iter(|| scan_add(Procs::Parallel)); +} + +#[bench] +fn scan_matmul_sequential(b: &mut test::Bencher) { + b.iter(|| scan_matmul(Procs::Sequential, 50)); +} + +#[bench] +fn scan_matmul_parallel(b: &mut test::Bencher) { + b.iter(|| scan_matmul(Procs::Parallel, 50)); +} diff --git a/rayon-demo/src/scan/mod.rs b/rayon-demo/src/scan/mod.rs index 1589422ba..8ce94104b 100644 --- a/rayon-demo/src/scan/mod.rs +++ b/rayon-demo/src/scan/mod.rs @@ -1,124 +1,32 @@ -use ndarray::{Array, Dim}; use rayon::iter::*; -use std::time::{Duration, Instant}; -use std::num::Wrapping; -const SIZE: usize = 10000; - -enum Procs { - Sequential, - Parallel, -} - -fn scan_sequential(init: I, id: T, scan_op: P) -> Vec -where - T: Clone, - I: Fn() -> T, - P: FnMut(&mut T, &T) -> Option, -{ - let v = vec![init(); SIZE]; - let scan = v.iter().scan(id, scan_op); - scan.collect() -} - -fn scan_parallel(init: I, id: T, scan_op: P) -> Vec -where - T: Clone + Send + Sync, - I: Fn() -> T, - P: Fn(&T, &T) -> T + Sync, -{ - let v = vec![init(); SIZE]; - let scan = v.into_par_iter().with_min_len(SIZE / 100).scan(&scan_op, id); - scan.collect() -} - -/******* Addition with artificial delay *******/ - -const DELAY: Duration = Duration::from_nanos(10); -fn wait() -> i32 { - let time = Instant::now(); - - let mut sum = 0; - while time.elapsed() < DELAY { - sum += 1; - } - sum -} - -fn scan_add(procs: Procs) -> Vec { - let init = || 2; - let id = 0; - - match procs { - Procs::Sequential => { - let f = |state: &mut i32, x: &i32| { - test::black_box(wait()); - *state += x; - Some(*state) - }; - scan_sequential(init, id, f) - } - Procs::Parallel => { - let f = |x: &i32, y: &i32| { - test::black_box(wait()); - *x + *y - }; - scan_parallel(init, id, f) - } - } -} - -#[bench] -fn scan_add_sequential(b: &mut test::Bencher) { - b.iter(|| scan_add(Procs::Sequential)); -} - -#[bench] -fn scan_add_parallel(b: &mut test::Bencher) { - b.iter(|| scan_add(Procs::Parallel)); -} +mod scan; +use self::scan::{scan_matmul, Procs}; #[test] -fn test_scan_add() { - assert_eq!(scan_add(Procs::Sequential), scan_add(Procs::Parallel)); +fn test_scan_matmul() { + assert_eq!( + scan_matmul(Procs::Sequential, 10), + scan_matmul(Procs::Parallel, 10) + ); } -/******** Matrix multiplication with wrapping arithmetic *******/ - -type Matrix = Array, Dim<[usize; 2]>>; -fn scan_matmul(procs: Procs) -> Vec { - const MAT_SIZE: usize = 50; - let init = || { - Array::from_iter((0..((MAT_SIZE * MAT_SIZE) as i32)).map(|x| Wrapping(x))) - .into_shape((MAT_SIZE, MAT_SIZE)) - .unwrap() +#[test] +fn test_scan_addition() { + let init = 0u64; + let op = |state: &mut u64, x: &u64| { + *state += x; + Some(*state) }; - let id = Array::eye(MAT_SIZE); - - match procs { - Procs::Sequential => { - let f = |state: &mut Matrix, x: &Matrix| { - *state = state.dot(x); - Some(state.clone()) - }; + let op_par = |state: &u64, x: &u64| *state + x; - scan_sequential(init, id, f) - } - Procs::Parallel => scan_parallel(init, id, |x, y| x.dot(y)), + for len in 0..100 { + let v = vec![1u64; len]; + let scan_seq = v.iter().scan(init, op).collect::>(); + let scan_par = v.into_par_iter().scan(op_par, init).collect::>(); + assert_eq!(scan_seq, scan_par); } } -#[bench] -fn scan_matmul_sequential(b: &mut test::Bencher) { - b.iter(|| scan_matmul(Procs::Sequential)); -} - -#[bench] -fn scan_matmul_parallel(b: &mut test::Bencher) { - b.iter(|| scan_matmul(Procs::Parallel)); -} - -#[test] -fn test_scan_matmul() { - assert_eq!(scan_matmul(Procs::Sequential), scan_matmul(Procs::Parallel)); -} +#[cfg(test)] +mod bench; diff --git a/rayon-demo/src/scan/scan.rs b/rayon-demo/src/scan/scan.rs new file mode 100644 index 000000000..2dc23c3d4 --- /dev/null +++ b/rayon-demo/src/scan/scan.rs @@ -0,0 +1,59 @@ +use ndarray::{Array, Dim}; +use rayon::iter::*; +use std::num::Wrapping; + +const SIZE: usize = 10000; + +pub enum Procs { + Sequential, + Parallel, +} + +pub fn scan_sequential(init: I, id: T, scan_op: P) -> Vec +where + T: Clone, + I: Fn() -> T, + P: FnMut(&mut T, &T) -> Option, +{ + let v = vec![init(); SIZE]; + let scan = v.iter().scan(id, scan_op); + scan.collect() +} + +pub fn scan_parallel(init: I, id: T, scan_op: P) -> Vec +where + T: Clone + Send + Sync, + I: Fn() -> T, + P: Fn(&T, &T) -> T + Sync, +{ + let v = vec![init(); SIZE]; + let scan = v + .into_par_iter() + .with_min_len(SIZE / 100) + .scan(&scan_op, id); + scan.collect() +} + +/******** Matrix multiplication with wrapping arithmetic *******/ + +type Matrix = Array, Dim<[usize; 2]>>; +pub fn scan_matmul(procs: Procs, mat_size: usize) -> Vec { + let init = || { + Array::from_iter((0..((mat_size * mat_size) as i32)).map(|x| Wrapping(x))) + .into_shape((mat_size, mat_size)) + .unwrap() + }; + let id = Array::eye(mat_size); + + match procs { + Procs::Sequential => { + let f = |state: &mut Matrix, x: &Matrix| { + *state = state.dot(x); + Some(state.clone()) + }; + + scan_sequential(init, id, f) + } + Procs::Parallel => scan_parallel(init, id, |x, y| x.dot(y)), + } +} diff --git a/src/iter/scan.rs b/src/iter/scan.rs index d783a8480..6173b985d 100644 --- a/src/iter/scan.rs +++ b/src/iter/scan.rs @@ -1,7 +1,7 @@ use super::plumbing::*; use super::*; -use std::usize; use std::collections::LinkedList; +use std::usize; pub(super) fn scan(pi: PI, scan_op: P, id: T) -> Scan where @@ -218,7 +218,7 @@ where type Item = T; fn split(self) -> (Self, Option) { - let mid = self.offsets.len() / 2; + let mid = self.data.len() / 2; if mid == 0 { return (self, None); }