Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rayon-demo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ rand = "0.9"
rand_xorshift = "0.4"
regex = "1"
winit = "0.30"
ndarray = "0.15.6"

[dependencies.serde]
version = "1.0.85"
Expand Down
2 changes: 2 additions & 0 deletions rayon-demo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ mod map_collect;
#[cfg(test)]
mod pythagoras;
#[cfg(test)]
mod scan;
#[cfg(test)]
mod sort;
#[cfg(test)]
mod str_split;
Expand Down
64 changes: 64 additions & 0 deletions rayon-demo/src/scan/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::time::{Duration, Instant};

use super::scan::{scan_matmul, scan_parallel, scan_sequential, Procs};

/******* Addition with artificial delay *******/

const DELAY: Duration = Duration::from_nanos(10);
fn wait() -> i32 {
let time = Instant::now();

let mut sum = 0;
while time.elapsed() < DELAY {
sum += 1;
}
sum
}

fn scan_add(procs: Procs) -> Vec<i32> {
let init = || 2;
let id = 0;

match procs {
Procs::Sequential => {
let f = |state: &mut i32, x: &i32| {
test::black_box(wait());
*state += x;
Some(*state)
};
scan_sequential(init, id, f)
}
Procs::Parallel => {
let f = |x: &i32, y: &i32| {
test::black_box(wait());
*x + *y
};
scan_parallel(init, id, f)
}
}
}

#[test]
fn test_scan_add() {
assert_eq!(scan_add(Procs::Sequential), scan_add(Procs::Parallel));
}

#[bench]
fn scan_add_sequential(b: &mut test::Bencher) {
b.iter(|| scan_add(Procs::Sequential));
}

#[bench]
fn scan_add_parallel(b: &mut test::Bencher) {
b.iter(|| scan_add(Procs::Parallel));
}

#[bench]
fn scan_matmul_sequential(b: &mut test::Bencher) {
b.iter(|| scan_matmul(Procs::Sequential, 50));
}

#[bench]
fn scan_matmul_parallel(b: &mut test::Bencher) {
b.iter(|| scan_matmul(Procs::Parallel, 50));
}
32 changes: 32 additions & 0 deletions rayon-demo/src/scan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use rayon::iter::*;

mod scan;
use self::scan::{scan_matmul, Procs};

#[test]
fn test_scan_matmul() {
assert_eq!(
scan_matmul(Procs::Sequential, 10),
scan_matmul(Procs::Parallel, 10)
);
}

#[test]
fn test_scan_addition() {
let init = 0u64;
let op = |state: &mut u64, x: &u64| {
*state += x;
Some(*state)
};
let op_par = |state: &u64, x: &u64| *state + x;

for len in 0..100 {
let v = vec![1u64; len];
let scan_seq = v.iter().scan(init, op).collect::<Vec<u64>>();
let scan_par = v.into_par_iter().scan(op_par, init).collect::<Vec<u64>>();
assert_eq!(scan_seq, scan_par);
}
}

#[cfg(test)]
mod bench;
59 changes: 59 additions & 0 deletions rayon-demo/src/scan/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use ndarray::{Array, Dim};
use rayon::iter::*;
use std::num::Wrapping;

const SIZE: usize = 10000;

pub enum Procs {
Sequential,
Parallel,
}

pub fn scan_sequential<T, P, I>(init: I, id: T, scan_op: P) -> Vec<T>
where
T: Clone,
I: Fn() -> T,
P: FnMut(&mut T, &T) -> Option<T>,
{
let v = vec![init(); SIZE];
let scan = v.iter().scan(id, scan_op);
scan.collect()
}

pub fn scan_parallel<T, P, I>(init: I, id: T, scan_op: P) -> Vec<T>
where
T: Clone + Send + Sync,
I: Fn() -> T,
P: Fn(&T, &T) -> T + Sync,
{
let v = vec![init(); SIZE];
let scan = v
.into_par_iter()
.with_min_len(SIZE / 100)
.scan(&scan_op, id);
scan.collect()
}

/******** Matrix multiplication with wrapping arithmetic *******/

type Matrix = Array<Wrapping<i32>, Dim<[usize; 2]>>;
pub fn scan_matmul(procs: Procs, mat_size: usize) -> Vec<Matrix> {
let init = || {
Array::from_iter((0..((mat_size * mat_size) as i32)).map(|x| Wrapping(x)))
.into_shape((mat_size, mat_size))
.unwrap()
};
let id = Array::eye(mat_size);

match procs {
Procs::Sequential => {
let f = |state: &mut Matrix, x: &Matrix| {
*state = state.dot(x);
Some(state.clone())
};

scan_sequential(init, id, f)
}
Procs::Parallel => scan_parallel(init, id, |x, y| x.dot(y)),
}
}
45 changes: 45 additions & 0 deletions src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@

use self::plumbing::*;
use self::private::Try;
use self::scan::Scan;
pub use either::Either;
use std::cmp::Ordering;
use std::collections::LinkedList;
Expand Down Expand Up @@ -161,6 +162,8 @@ mod while_some;
mod zip;
mod zip_eq;

mod scan;

pub use self::{
blocks::{ExponentialBlocks, UniformBlocks},
chain::Chain,
Expand Down Expand Up @@ -1394,6 +1397,48 @@ pub trait ParallelIterator: Sized + Send {
sum::sum(self)
}

/// Folds the items in the iterator using `scan_op`, and produces a
/// new iterator with all of the intermediate results.
///
/// Specifically, the nth element of the scan iterator will be the
/// result of reducing the first n elements of the input with `scan_op`.
///
/// # Examples
///
/// ```
/// // Iterate over a sequence of numbers `x0, ..., xN`
/// // and use scan to compute the partial sums
/// use rayon::prelude::*;
/// let partial_sums = [1, 2, 3, 4, 5]
/// .into_par_iter() // iterating over i32
/// .scan(|a, b| *a + *b, // add (&i32, &i32) -> i32
/// 0) // identity
/// .collect::<Vec<i32>>();
/// assert_eq!(partial_sums, vec![1, 3, 6, 10, 15]);
/// ```
///
/// **Note:** Unlike a sequential `scan` operation, the order in
/// which `scan_op` will be applied to produce the result is not fully
/// specified. So `scan_op` should be [associative] or else the results
/// will be non-deterministic. Also unlike sequential `scan`, there is
/// no internal state for this operation, so the operation has a
/// different signature.
///
/// The argument `identity` should be an "identity" value for
/// `scan_op`, which may be inserted into the sequence as
/// needed to create opportunities for parallel execution. So, for
/// example, if you are doing a summation, then `identity` ought
/// to represent the zero for your type.
///
/// [associative]: https://en.wikipedia.org/wiki/Associative_property
fn scan<F>(self, scan_op: F, identity: Self::Item) -> Scan<Self::Item, F>
where
F: Fn(&Self::Item, &Self::Item) -> Self::Item + Sync + Send,
<Self as ParallelIterator>::Item: Send + Sync,
{
scan::scan(self, scan_op, identity)
}

/// Multiplies all the items in the iterator.
///
/// Note that the order in items will be reduced is not specified,
Expand Down
Loading