Skip to content

Implement PoC block allocation for count accumulator #15642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use arrow::array::{ArrayRef, BooleanArray};
use datafusion_common::{not_impl_err, Result};

pub const BLOCK_SIZE: usize = 4096;

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
Expand Down Expand Up @@ -54,6 +56,38 @@ impl EmitTo {
}
}
}

/// Removes the number of rows from `v` required to emit the right
/// number of rows, returning a `Vec` with elements taken, and the
/// remaining values in `v`.
///
/// This avoids copying if Self::All
pub fn take_needed_blocks<T>(&self, v: &mut Vec<Vec<T>>) -> Vec<Vec<T>> {
match self {
Self::All => {
// Take the entire vector, leave new (empty) vector
std::mem::take(v)
}
Self::First(n) => {
let num_blocks = n / BLOCK_SIZE;

let num_values = n % BLOCK_SIZE;
// get num_blocks - 1 from t (unchanged)
let mut t = vec![];
if num_blocks > 0 {
t = v.split_off(num_blocks - 1);
}

// leave n+1,.. in v[num_blocks]
let mut t_v = v[0].split_off(num_values);
std::mem::swap(&mut v[0], &mut t_v);

t.push(t_v);

t
}
}
}
}

/// `GroupsAccumulator` implements a single aggregate (e.g. AVG) and
Expand Down
59 changes: 48 additions & 11 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use ahash::RandomState;
use datafusion_common::stats::Precision;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::groups_accumulator::BLOCK_SIZE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be based on batch size as well.

use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
use datafusion_macros::user_doc;
use datafusion_physical_expr::expressions;
Expand Down Expand Up @@ -468,13 +469,13 @@ impl Accumulator for CountAccumulator {
/// accumulator has no additional null or seen filter tracking.
#[derive(Debug)]
struct CountGroupsAccumulator {
/// Count per group.
/// Count per group, stored in blocks of BLOCK_SIZE values.
///
/// Note this is an i64 and not a u64 (or usize) because the
/// output type of count is `DataType::Int64`. Thus by using `i64`
/// for the counts, the output [`Int64Array`] can be created
/// without copy.
counts: Vec<i64>,
counts: Vec<Vec<i64>>,
}

impl CountGroupsAccumulator {
Expand All @@ -496,13 +497,28 @@ impl GroupsAccumulator for CountGroupsAccumulator {

// Add one to each group's counter for each non null, non
// filtered value
self.counts.resize(total_num_groups, 0);
let num_blocks = total_num_groups / BLOCK_SIZE + 1;
let num_values = total_num_groups % BLOCK_SIZE;

let prev_blocks = self.counts.len();
let new_blocks = num_blocks - prev_blocks;
self.counts
.resize_with(num_blocks, || Vec::with_capacity(BLOCK_SIZE));

self.counts
[prev_blocks.saturating_sub(1)..prev_blocks.saturating_sub(1) + new_blocks]
.iter_mut()
.for_each(|block| block.resize(BLOCK_SIZE, 0));

self.counts[num_blocks - 1].resize_with(num_values, || 0);
accumulate_indices(
group_indices,
values.logical_nulls().as_ref(),
opt_filter,
|group_index| {
self.counts[group_index] += 1;
let block = group_index / BLOCK_SIZE;
let index = group_index % BLOCK_SIZE;
self.counts[block][index] += 1;
},
);

Expand All @@ -525,31 +541,52 @@ impl GroupsAccumulator for CountGroupsAccumulator {
assert_eq!(partial_counts.null_count(), 0);
let partial_counts = partial_counts.values();

let num_blocks = total_num_groups / BLOCK_SIZE + 1;
let num_values = total_num_groups % BLOCK_SIZE;

self.counts
.resize_with(num_blocks, || Vec::with_capacity(BLOCK_SIZE));

let prev_blocks = self.counts.len();
let new_blocks = num_blocks - prev_blocks;

self.counts
[prev_blocks.saturating_sub(1)..prev_blocks.saturating_sub(1) + new_blocks]
.iter_mut()
.for_each(|block| block.resize(BLOCK_SIZE, 0));

self.counts[num_blocks - 1].resize_with(num_values, || 0);

// Adds the counts with the partial counts
self.counts.resize(total_num_groups, 0);
group_indices.iter().zip(partial_counts.iter()).for_each(
|(&group_index, partial_count)| {
self.counts[group_index] += partial_count;
let block = group_index / BLOCK_SIZE;
let index = group_index % BLOCK_SIZE;

self.counts[block][index] += partial_count;
},
);

Ok(())
}

fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let counts = emit_to.take_needed(&mut self.counts);
let counts = emit_to.take_needed_blocks(&mut self.counts);

// Count is always non null (null inputs just don't contribute to the overall values)
let nulls = None;
let array = PrimitiveArray::<Int64Type>::new(counts.into(), nulls);
// TODO: support emitting batches
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

evaluate and state could be supported to return Result<Vec<ArrayRef>> and Result<Vec<Vec<ArrayRef>>> although this is making a quite large breaking change.

let array =
PrimitiveArray::<Int64Type>::from_iter_values(counts.into_iter().flatten());

Ok(Arc::new(array))
}

// return arrays for counts
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let counts = emit_to.take_needed(&mut self.counts);
let counts: PrimitiveArray<Int64Type> = Int64Array::from(counts); // zero copy, no nulls
// TODO: optimize this
let counts = emit_to.take_needed_blocks(&mut self.counts);
// todo: this could be optimized to minimize copies
let counts: Int64Array = counts.into_iter().flatten().collect();
Ok(vec![Arc::new(counts) as ArrayRef])
}

Expand Down