Commit 61293e2

fix(query): spill block need consider scalar (#15387)
* fix(query): spill block need consider scalar

* refactor some as_column() to convert_to_full_column

  Note: as_column() is still safe in some places, because the block has already been converted to full, or because we can confirm the Value<AnyType> is a Column (e.g. in topK).

* add function BlockEntry::to_column
1 parent 3f8e36c commit 61293e2
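The root cause: a BlockEntry's Value<AnyType> holds either a Scalar (one value standing for every row, as in a constant column) or a Column, and the spill paths called value.as_column().unwrap(), which panics on the scalar case. convert_to_full_column handles both by repeating the scalar num_rows times. A minimal, runnable sketch of the difference, using simplified stand-in types (plain i64 values instead of Databend's real Value/Column):

#[derive(Clone, Debug, PartialEq)]
enum Value {
    Scalar(i64),      // one value standing for every row
    Column(Vec<i64>), // one value per row
}

impl Value {
    // Mirrors Value::as_column(): only succeeds for the Column variant.
    fn as_column(&self) -> Option<&Vec<i64>> {
        match self {
            Value::Column(c) => Some(c),
            Value::Scalar(_) => None,
        }
    }

    // Mirrors Value::convert_to_full_column(): total for both variants,
    // expanding a scalar into `num_rows` repeated values.
    fn convert_to_full_column(&self, num_rows: usize) -> Vec<i64> {
        match self {
            Value::Column(c) => c.clone(),
            Value::Scalar(s) => vec![*s; num_rows],
        }
    }
}

fn main() {
    // A block entry backed by a constant: the old spill code's
    // `as_column().unwrap()` would panic right here.
    let scalar_entry = Value::Scalar(7);
    assert!(scalar_entry.as_column().is_none());
    assert_eq!(scalar_entry.convert_to_full_column(3), vec![7, 7, 7]);

    // A fully materialized entry works either way.
    let column_entry = Value::Column(vec![1, 2, 3]);
    assert_eq!(column_entry.as_column(), Some(&vec![1, 2, 3]));
    assert_eq!(column_entry.convert_to_full_column(3), vec![1, 2, 3]);
}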

10 files changed, +37 -25 lines changed


src/query/expression/src/block.rs

Lines changed: 4 additions & 0 deletions
@@ -73,6 +73,10 @@ impl BlockEntry {
             _ => self,
         }
     }
+
+    pub fn to_column(&self, num_rows: usize) -> Column {
+        self.value.convert_to_full_column(&self.data_type, num_rows)
+    }
 }
 
 #[typetag::serde(tag = "type")]
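With the helper in place, every spill path below takes the same shape. A hypothetical call site (not taken from this commit), using only APIs that appear in these diffs — DataBlock::columns, DataBlock::num_rows, BlockEntry::to_column, and serialize_column:

// Hypothetical sketch: materialize and serialize each entry of a block,
// whether it holds a scalar or a full column.
for entry in data_block.columns() {
    // Scalars are expanded to `num_rows` repeated values;
    // full columns come back as-is.
    let column = entry.to_column(data_block.num_rows());
    let bytes = serialize_column(&column);
    // ... append `bytes` to the spill buffer ...
}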

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs

Lines changed: 4 additions & 4 deletions
@@ -230,8 +230,8 @@ pub fn agg_spilling_aggregate_payload<Method: HashMethodBounds>(
         let mut columns_data = Vec::with_capacity(columns.len());
         let mut columns_layout = Vec::with_capacity(columns.len());
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column.to_column(data_block.num_rows());
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);
@@ -327,8 +327,8 @@ pub fn spilling_aggregate_payload<Method: HashMethodBounds>(
         let mut columns_layout = Vec::with_capacity(columns.len());
 
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column.to_column(data_block.num_rows());
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs

Lines changed: 4 additions & 4 deletions
@@ -277,8 +277,8 @@ fn agg_spilling_aggregate_payload<Method: HashMethodBounds>(
         let mut columns_layout = Vec::with_capacity(columns.len());
 
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column.to_column(data_block.num_rows());
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);
@@ -398,8 +398,8 @@ fn spilling_aggregate_payload<Method: HashMethodBounds>(
         let mut columns_layout = Vec::with_capacity(columns.len());
 
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column.to_column(data_block.num_rows());
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs

Lines changed: 6 additions & 4 deletions
@@ -322,6 +322,7 @@ fn agg_spilling_group_by_payload<Method: HashMethodBounds>(
         }
 
         let data_block = payload.group_by_flush_all()?;
+        let num_rows = data_block.num_rows();
         rows += data_block.num_rows();
 
         let old_write_size = write_size;
@@ -330,8 +331,8 @@ fn agg_spilling_group_by_payload<Method: HashMethodBounds>(
         let mut columns_layout = Vec::with_capacity(columns.len());
 
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column.to_column(num_rows);
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);
@@ -440,6 +441,7 @@ fn spilling_group_by_payload<Method: HashMethodBounds>(
         }
 
         let data_block = serialize_group_by(method, inner_table)?;
+        let num_rows = data_block.num_rows();
         rows += data_block.num_rows();
 
         let old_write_size = write_size;
@@ -448,8 +450,8 @@ fn spilling_group_by_payload<Method: HashMethodBounds>(
         let mut columns_layout = Vec::with_capacity(columns.len());
 
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column.to_column(num_rows);
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs

Lines changed: 6 additions & 4 deletions
@@ -227,8 +227,10 @@ pub fn agg_spilling_group_by_payload<Method: HashMethodBounds>(
         let mut columns_data = Vec::with_capacity(columns.len());
         let mut columns_layout = Vec::with_capacity(columns.len());
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column
+                .value
+                .convert_to_full_column(&column.data_type, data_block.num_rows());
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);
@@ -320,8 +322,8 @@ pub fn spilling_group_by_payload<Method: HashMethodBounds>(
         let mut columns_data = Vec::with_capacity(columns.len());
         let mut columns_layout = Vec::with_capacity(columns.len());
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column.to_column(data_block.num_rows());
+            let column_data = serialize_column(&column);
             write_size += column_data.len() as u64;
             columns_layout.push(column_data.len() as u64);
             columns_data.push(column_data);

src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs

Lines changed: 3 additions & 2 deletions
@@ -228,14 +228,15 @@ impl<Method: HashMethodBounds> TransformPartialAggregate<Method> {
         let aggregate_functions = &self.params.aggregate_functions;
         let offsets_aggregate_states = &self.params.offsets_aggregate_states;
 
+        let num_rows = block.num_rows();
         for index in 0..aggregate_functions.len() {
             // Aggregation states are in the back of the block.
             let agg_index = block.num_columns() - aggregate_functions.len() + index;
             let function = &aggregate_functions[index];
             let offset = offsets_aggregate_states[index];
-            let agg_state = block.get_by_offset(agg_index).value.as_column().unwrap();
+            let agg_state = block.get_by_offset(agg_index).to_column(num_rows);
 
-            function.batch_merge(places, offset, agg_state)?;
+            function.batch_merge(places, offset, &agg_state)?;
         }
 
         Ok(())
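One subtlety in this hunk: as_column() borrowed a &Column from the block, while to_column returns an owned Column, so batch_merge now receives a reference to the local binding (&agg_state). The same owned-versus-borrowed shift is why the spill paths above now pass &column to serialize_column.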

src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs

Lines changed: 2 additions & 2 deletions
@@ -48,7 +48,7 @@ impl HashJoinProbeState {
         let max_block_size = probe_state.max_block_size;
         // `probe_column` is the subquery result column.
         // For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`,
-        let probe_column = input.get_by_offset(0).value.as_column().unwrap();
+        let probe_column = input.get_by_offset(0).to_column(input.num_rows());
         // Check if there is any null in the probe column.
         if matches!(probe_column.validity().1, Some(x) if x.unset_bits() > 0) {
             let mut has_null = self
@@ -148,7 +148,7 @@ impl HashJoinProbeState {
         let max_block_size = probe_state.max_block_size;
         // `probe_column` is the subquery result column.
         // For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`,
-        let probe_column = input.get_by_offset(0).value.as_column().unwrap();
+        let probe_column = input.get_by_offset(0).to_column(input.num_rows());
         // Check if there is any null in the probe column.
         if matches!(probe_column.validity().1, Some(x) if x.unset_bits() > 0) {
             let mut has_null = self

src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs

Lines changed: 2 additions & 1 deletion
@@ -118,10 +118,11 @@ impl HashJoinProbeState {
         let build_indexes_ptr = build_indexes.as_mut_ptr();
         let pointers = probe_state.hashes.as_slice();
         let selection = &probe_state.selection.as_slice()[0..probe_state.selection_count];
+        let num_rows = input.num_rows();
         let cols = input
             .columns()
             .iter()
-            .map(|c| (c.value.as_column().unwrap().clone(), c.data_type.clone()))
+            .map(|c| (c.to_column(num_rows), c.data_type.clone()))
             .collect::<Vec<_>>();
         let markers = probe_state.markers.as_mut().unwrap();
         self.hash_join_state

src/query/service/src/pipelines/processors/transforms/window/transform_window.rs

Lines changed: 2 additions & 2 deletions
@@ -574,7 +574,7 @@ impl<T: Number> TransformWindow<T> {
             }
             if cur != self.frame_end {
                 let block = &self.blocks.get(cur.block - self.first_block).unwrap().block;
-                let col = block.get_by_offset(func.arg).value.as_column().unwrap();
+                let col = block.get_by_offset(func.arg).to_column(block.num_rows());
                 col.index(cur.row).unwrap().to_owned()
             } else {
                 // No such row
@@ -585,7 +585,7 @@ impl<T: Number> TransformWindow<T> {
             let cur = self.goback_row(self.frame_end);
             debug_assert!(self.frame_start <= cur);
             let block = &self.blocks.get(cur.block - self.first_block).unwrap().block;
-            let col = block.get_by_offset(func.arg).value.as_column().unwrap();
+            let col = block.get_by_offset(func.arg).to_column(block.num_rows());
             col.index(cur.row).unwrap().to_owned()
         };
         let builder = &mut self.blocks[self.current_row.block - self.first_block].builder;

src/query/service/src/spillers/spiller.rs

Lines changed: 4 additions & 2 deletions
@@ -157,8 +157,10 @@ impl Spiller {
         let columns = data.columns().to_vec();
         let mut columns_data = Vec::with_capacity(columns.len());
         for column in columns.into_iter() {
-            let column = column.value.as_column().unwrap();
-            let column_data = serialize_column(column);
+            let column = column
+                .value
+                .convert_to_full_column(&column.data_type, data.num_rows());
+            let column_data = serialize_column(&column);
             self.columns_layout
                 .entry(location.to_string())
                 .and_modify(|layouts| {
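This call site (like the first hunk in transform_group_by_spill_writer.rs) spells out the conversion instead of using the new helper; by the definition added in block.rs, the explicit form is equivalent to:

// Equivalent, per BlockEntry::to_column's definition in block.rs:
let column = column.to_column(data.num_rows());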
