Skip to content

Commit ce909b6

Browse files
authored
chore(query): reduce memory expansion in transform_aggregate_expand (#18372)
1 parent 472d0ea commit ce909b6

File tree

2 files changed

+35
-27
lines changed

2 files changed

+35
-27
lines changed

โ€Žsrc/query/service/src/pipelines/builders/builder_aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl PipelineBuilder {
8888
grouping_ids.push(!id & mask);
8989
}
9090

91-
self.main_pipeline.add_transformer(|| {
91+
self.main_pipeline.add_accumulating_transformer(|| {
9292
TransformExpandGroupingSets::new(group_bys.clone(), grouping_ids.clone())
9393
});
9494
Ok(())

โ€Žsrc/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_expand.rs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use databend_common_expression::types::NumberScalar;
1919
use databend_common_expression::BlockEntry;
2020
use databend_common_expression::DataBlock;
2121
use databend_common_expression::Scalar;
22-
use databend_common_pipeline_transforms::processors::Transform;
22+
use databend_common_pipeline_transforms::AccumulatingTransform;
2323

2424
pub struct TransformExpandGroupingSets {
2525
group_bys: Vec<usize>,
@@ -35,10 +35,10 @@ impl TransformExpandGroupingSets {
3535
}
3636
}
3737

38-
impl Transform for TransformExpandGroupingSets {
38+
impl AccumulatingTransform for TransformExpandGroupingSets {
3939
const NAME: &'static str = "TransformExpandGroupingSets";
4040

41-
fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
41+
fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
4242
let num_rows = data.num_rows();
4343
let num_group_bys = self.group_bys.len();
4444
let mut output_blocks = Vec::with_capacity(self.grouping_ids.len());
@@ -48,45 +48,53 @@ impl Transform for TransformExpandGroupingSets {
4848
.map(|i| data.get_by_offset(*i).clone())
4949
.collect::<Vec<_>>();
5050

51+
let mut entries = data
52+
.columns()
53+
.iter()
54+
.cloned()
55+
.chain(dup_group_by_cols.clone())
56+
.collect::<Vec<_>>();
57+
58+
// all group columns should be nullable
59+
for i in 0..num_group_bys {
60+
let entry = unsafe {
61+
let offset = self.group_bys.get_unchecked(i);
62+
entries.get_unchecked_mut(*offset)
63+
};
64+
match entry {
65+
BlockEntry::Const(_, data_type, _) => {
66+
*data_type = data_type.wrap_nullable();
67+
}
68+
BlockEntry::Column(column) => *column = column.clone().wrap_nullable(None),
69+
};
70+
}
71+
5172
for &id in &self.grouping_ids {
5273
// Repeat data for each grouping set.
5374
let grouping_id_column = BlockEntry::new_const_column(
5475
DataType::Number(NumberDataType::UInt32),
5576
Scalar::Number(NumberScalar::UInt32(id as u32)),
5677
num_rows,
5778
);
58-
let mut entries = data
59-
.columns()
60-
.iter()
61-
.cloned()
62-
.chain(dup_group_by_cols.clone())
63-
.chain(Some(grouping_id_column))
64-
.collect::<Vec<_>>();
79+
// This is a copy of entries which clones the buffer of columns
80+
// So it's memory efficient
81+
let mut current_group_entries = entries.clone();
82+
current_group_entries.push(grouping_id_column);
83+
6584
let bits = !id;
6685
for i in 0..num_group_bys {
6786
let entry = unsafe {
6887
let offset = self.group_bys.get_unchecked(i);
69-
entries.get_unchecked_mut(*offset)
88+
current_group_entries.get_unchecked_mut(*offset)
7089
};
90+
// Reset the column to be nullable
7191
if bits & (1 << i) == 0 {
72-
// This column should be set to NULLs.
73-
*entry = BlockEntry::new_const_column(
74-
entry.data_type().wrap_nullable(),
75-
Scalar::Null,
76-
num_rows,
77-
)
78-
} else {
79-
match entry {
80-
BlockEntry::Const(_, data_type, _) => {
81-
*data_type = data_type.wrap_nullable();
82-
}
83-
BlockEntry::Column(column) => *column = column.clone().wrap_nullable(None),
84-
};
92+
*entry = BlockEntry::new_const_column(entry.data_type(), Scalar::Null, num_rows)
8593
}
8694
}
87-
output_blocks.push(DataBlock::new(entries, num_rows));
95+
output_blocks.push(DataBlock::new(current_group_entries, num_rows));
8896
}
8997

90-
DataBlock::concat(&output_blocks)
98+
Ok(output_blocks)
9199
}
92100
}

0 commit comments

Comments (0)