From 1da3e04be30c13ffa85e56f5241e211e7f7d13f2 Mon Sep 17 00:00:00 2001 From: Sami Tabet Date: Mon, 23 Jun 2025 18:13:49 +0200 Subject: [PATCH] fix: Incorrect memory accounting in `array_agg` function See this issue: https://github.com/apache/datafusion/issues/16517 --- .../functions-aggregate/src/array_agg.rs | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 4ec73e306e0f..dbd2ec6686bd 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -341,12 +341,20 @@ impl Accumulator for ArrayAggAccumulator { Some(values) => { // Make sure we don't insert empty lists if !values.is_empty() { - self.values.push(values); + // The ArrayRef might be holding a reference to its original input buffer, so + // storing it here directly copied/compacted avoids over accounting memory + // not used here. + self.values + .push(make_array(copy_array_data(&values.to_data()))); } } None => { for arr in list_arr.iter().flatten() { - self.values.push(arr); + // The ArrayRef might be holding a reference to its original input buffer, so + // storing it here directly copied/compacted avoids over accounting memory + // not used here. 
+ self.values + .push(make_array(copy_array_data(&arr.to_data()))); } } } @@ -728,7 +736,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { mod tests { use super::*; use arrow::array::{ListBuilder, StringBuilder}; - use arrow::datatypes::{FieldRef, Schema}; + use arrow::datatypes::{FieldRef, Schema, UInt64Type}; use datafusion_common::cast::as_generic_string_array; use datafusion_common::internal_err; use datafusion_physical_expr::expressions::Column; @@ -994,6 +1002,34 @@ mod tests { Ok(()) } + #[test] + fn does_not_over_account_memory_for_merge() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + let a1 = ListArray::from_iter_primitive::<UInt64Type, _, _>(vec![ + Some(vec![Some(0), Some(1), Some(2)]), + Some(vec![Some(3)]), + None, + Some(vec![Some(4)]), + ]); + let a2 = ListArray::from_iter_primitive::<UInt64Type, _, _>(vec![ + Some(vec![Some(0), Some(1), Some(2)]), + Some(vec![Some(3)]), + None, + Some(vec![Some(4)]), + ]); + + acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?; + acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?; + + acc1 = merge(acc1, acc2)?; + + // without compaction, the size is 16812. + assert_eq!(acc1.size(), 556); + + Ok(()) + } + #[test] fn does_not_over_account_memory() -> Result<()> { let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;