diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs index 5ff1c1d07216..fd8c4a522d32 100644 --- a/datafusion/expr-common/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -20,6 +20,8 @@ use arrow::array::{ArrayRef, BooleanArray}; use datafusion_common::{not_impl_err, Result}; +pub const BLOCK_SIZE: usize = 4096; + /// Describes how many rows should be emitted during grouping. #[derive(Debug, Clone, Copy)] pub enum EmitTo { @@ -54,6 +56,38 @@ impl EmitTo { } } } + + /// Removes the number of rows from `v` required to emit the right + /// number of rows, returning a `Vec` with elements taken, and the + /// remaining values in `v`. + /// + /// This avoids copying if Self::All + pub fn take_needed_blocks(&self, v: &mut Vec>) -> Vec> { + match self { + Self::All => { + // Take the entire vector, leave new (empty) vector + std::mem::take(v) + } + Self::First(n) => { + let num_blocks = n / BLOCK_SIZE; + + let num_values = n % BLOCK_SIZE; + // get num_blocks - 1 from t (unchanged) + let mut t = vec![]; + if num_blocks > 0 { + t = v.split_off(num_blocks - 1); + } + + // leave n+1,.. in v[num_blocks] + let mut t_v = v[0].split_off(num_values); + std::mem::swap(&mut v[0], &mut t_v); + + t.push(t_v); + + t + } + } + } } /// `GroupsAccumulator` implements a single aggregate (e.g. AVG) and diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 2d995b4a4179..6a4786ff5643 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -18,6 +18,7 @@ use ahash::RandomState; use datafusion_common::stats::Precision; use datafusion_expr::expr::WindowFunction; +use datafusion_expr::groups_accumulator::BLOCK_SIZE; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use datafusion_macros::user_doc; use datafusion_physical_expr::expressions; @@ -468,13 +469,13 @@ impl Accumulator for CountAccumulator { /// accumulator has no additional null or seen filter tracking. #[derive(Debug)] struct CountGroupsAccumulator { - /// Count per group. + /// Count per group, stored in blocks of BLOCK_SIZE values. /// /// Note this is an i64 and not a u64 (or usize) because the /// output type of count is `DataType::Int64`. Thus by using `i64` /// for the counts, the output [`Int64Array`] can be created /// without copy. - counts: Vec, + counts: Vec>, } impl CountGroupsAccumulator { @@ -496,13 +497,28 @@ impl GroupsAccumulator for CountGroupsAccumulator { // Add one to each group's counter for each non null, non // filtered value - self.counts.resize(total_num_groups, 0); + let num_blocks = total_num_groups / BLOCK_SIZE + 1; + let num_values = total_num_groups % BLOCK_SIZE; + + let prev_blocks = self.counts.len(); + let new_blocks = num_blocks - prev_blocks; + self.counts + .resize_with(num_blocks, || Vec::with_capacity(BLOCK_SIZE)); + + self.counts + [prev_blocks.saturating_sub(1)..prev_blocks.saturating_sub(1) + new_blocks] + .iter_mut() + .for_each(|block| block.resize(BLOCK_SIZE, 0)); + + self.counts[num_blocks - 1].resize_with(num_values, || 0); accumulate_indices( group_indices, values.logical_nulls().as_ref(), opt_filter, |group_index| { - self.counts[group_index] += 1; + let block = group_index / BLOCK_SIZE; + let index = group_index % BLOCK_SIZE; + self.counts[block][index] += 1; }, ); @@ -525,11 +541,29 @@ impl GroupsAccumulator for CountGroupsAccumulator { assert_eq!(partial_counts.null_count(), 0); let partial_counts = partial_counts.values(); + let num_blocks = total_num_groups / BLOCK_SIZE + 1; + let num_values = total_num_groups % BLOCK_SIZE; + + self.counts + .resize_with(num_blocks, || Vec::with_capacity(BLOCK_SIZE)); + + let prev_blocks = self.counts.len(); + let new_blocks = num_blocks - prev_blocks; + + self.counts + [prev_blocks.saturating_sub(1)..prev_blocks.saturating_sub(1) + new_blocks] + .iter_mut() + .for_each(|block| block.resize(BLOCK_SIZE, 0)); + + self.counts[num_blocks - 1].resize_with(num_values, || 0); + // Adds the counts with the partial counts - self.counts.resize(total_num_groups, 0); group_indices.iter().zip(partial_counts.iter()).for_each( |(&group_index, partial_count)| { - self.counts[group_index] += partial_count; + let block = group_index / BLOCK_SIZE; + let index = group_index % BLOCK_SIZE; + + self.counts[block][index] += partial_count; }, ); @@ -537,19 +571,22 @@ impl GroupsAccumulator for CountGroupsAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let counts = emit_to.take_needed(&mut self.counts); + let counts = emit_to.take_needed_blocks(&mut self.counts); // Count is always non null (null inputs just don't contribute to the overall values) - let nulls = None; - let array = PrimitiveArray::::new(counts.into(), nulls); + // TODO: support emitting batches + let array = + PrimitiveArray::::from_iter_values(counts.into_iter().flatten()); Ok(Arc::new(array)) } // return arrays for counts fn state(&mut self, emit_to: EmitTo) -> Result> { - let counts = emit_to.take_needed(&mut self.counts); - let counts: PrimitiveArray = Int64Array::from(counts); // zero copy, no nulls + // TODO: optimize this + let counts = emit_to.take_needed_blocks(&mut self.counts); + // todo: this could be optimized to minimize copies + let counts: Int64Array = counts.into_iter().flatten().collect(); Ok(vec![Arc::new(counts) as ArrayRef]) }