From 028584eaa94e4b787f81d804df47094e5d72fe4c Mon Sep 17 00:00:00 2001 From: coldWater Date: Mon, 21 Jul 2025 18:15:38 +0800 Subject: [PATCH 1/5] trait --- .../src/aggregate/aggregate_function.rs | 37 +++++++++++++----- .../src/aggregate/aggregate_function_state.rs | 38 +++++++++++++++---- .../src/aggregate/aggregate_hashtable.rs | 3 +- .../expression/src/aggregate/payload_flush.rs | 9 +---- .../adaptors/aggregate_null_adaptor.rs | 17 +++++---- .../adaptors/aggregate_ornull_adaptor.rs | 9 +++-- .../adaptors/aggregate_sort_adaptor.rs | 4 +- .../src/aggregates/aggregate_arg_min_max.rs | 4 +- .../src/aggregates/aggregate_array_agg.rs | 4 +- .../src/aggregates/aggregate_array_moving.rs | 8 ++-- .../src/aggregates/aggregate_bitmap.rs | 12 +++--- .../aggregate_combinator_distinct.rs | 4 +- .../src/aggregates/aggregate_combinator_if.rs | 8 ++-- .../aggregates/aggregate_combinator_state.rs | 10 ++--- .../src/aggregates/aggregate_count.rs | 4 +- .../src/aggregates/aggregate_covariance.rs | 4 +- .../aggregates/aggregate_json_array_agg.rs | 4 +- .../aggregates/aggregate_json_object_agg.rs | 4 +- .../src/aggregates/aggregate_markov_tarin.rs | 4 +- .../src/aggregates/aggregate_null_result.rs | 4 +- .../aggregates/aggregate_quantile_tdigest.rs | 4 +- .../aggregate_quantile_tdigest_weighted.rs | 4 +- .../src/aggregates/aggregate_retention.rs | 4 +- .../src/aggregates/aggregate_st_collect.rs | 4 +- .../src/aggregates/aggregate_string_agg.rs | 4 +- .../src/aggregates/aggregate_unary.rs | 4 +- .../src/aggregates/aggregate_window_funnel.rs | 4 +- .../src/aggregates/aggregator_common.rs | 4 +- .../aggregator/transform_single_key.rs | 9 +---- .../transforms/aggregator/udaf_script.rs | 4 +- 30 files changed, 134 insertions(+), 102 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_function.rs b/src/query/expression/src/aggregate/aggregate_function.rs index fe446c3ed1cf4..d937fe61905e6 100755 --- a/src/query/expression/src/aggregate/aggregate_function.rs +++ b/src/query/expression/src/aggregate/aggregate_function.rs @@ -22,13 +22,13 @@ use super::AggrState; use super::AggrStateLoc; use super::AggrStateRegistry; use super::StateAddr; -use crate::types::BinaryType; use crate::types::DataType; +use crate::AggrStateSerdeType; use crate::BlockEntry; use crate::ColumnBuilder; -use crate::ColumnView; use crate::ProjectedBlock; use crate::Scalar; +use crate::ScalarRef; pub type AggregateFunctionRef = Arc; @@ -69,32 +69,49 @@ pub trait AggregateFunction: fmt::Display + Sync + Send { // Used in aggregate_null_adaptor fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>; - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()>; + fn serialize_type(&self) -> Vec { + vec![AggrStateSerdeType::Binary(self.serialize_size_per_row())] + } + + fn serialize(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> { + let binary_builder = builder.as_tuple_mut().unwrap()[0].as_binary_mut().unwrap(); + self.serialize_binary(place, &mut binary_builder.data)?; + binary_builder.commit_row(); + Ok(()) + } + + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()>; fn serialize_size_per_row(&self) -> Option { None } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>; + fn merge(&self, place: AggrState, data: ScalarRef) -> Result<()> { + let mut binary = *data.as_tuple().unwrap()[0].as_binary().unwrap(); + self.merge_binary(place, &mut binary) + } + + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> 
Result<()>; /// Batch merge and deserialize the state from binary array fn batch_merge( &self, places: &[StateAddr], loc: &[AggrStateLoc], - state: &ColumnView, + state: &BlockEntry, ) -> Result<()> { - for (place, mut data) in places.iter().zip(state.iter()) { - self.merge(AggrState::new(*place, loc), &mut data)?; + let column = state.to_column(); + for (place, data) in places.iter().zip(column.iter()) { + self.merge(AggrState::new(*place, loc), data)?; } Ok(()) } fn batch_merge_single(&self, place: AggrState, state: &BlockEntry) -> Result<()> { - let view = state.downcast::().unwrap(); - for mut data in view.iter() { - self.merge(place, &mut data)?; + let column = state.to_column(); + for data in column.iter() { + self.merge(place, data)?; } Ok(()) } diff --git a/src/query/expression/src/aggregate/aggregate_function_state.rs b/src/query/expression/src/aggregate/aggregate_function_state.rs index 5d1fe21ac4379..409278532dd06 100644 --- a/src/query/expression/src/aggregate/aggregate_function_state.rs +++ b/src/query/expression/src/aggregate/aggregate_function_state.rs @@ -20,6 +20,8 @@ use enum_as_inner::EnumAsInner; use super::AggregateFunctionRef; use crate::types::binary::BinaryColumnBuilder; +use crate::types::DataType; +use crate::ColumnBuilder; #[derive(Clone, Copy, Debug)] pub struct StateAddr { @@ -113,11 +115,11 @@ impl From for usize { pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result { let mut registry = AggrStateRegistry::default(); - let mut serialize_size = Vec::with_capacity(funcs.len()); + let mut serialize_type = Vec::with_capacity(funcs.len()); for func in funcs { func.register_state(&mut registry); registry.commit(); - serialize_size.push(func.serialize_size_per_row()); + serialize_type.push(func.serialize_type().into_boxed_slice()); } let AggrStateRegistry { states, offsets } = registry; @@ -132,7 +134,7 @@ pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result Ok(StatesLayout { layout, states_loc, - serialize_size, + serialize_type, }) } @@ -195,14 +197,30 @@ impl AggrStateLoc { pub struct StatesLayout { pub layout: Layout, pub states_loc: Vec>, - serialize_size: Vec>, + serialize_type: Vec>, } impl StatesLayout { - pub fn serialize_builders(&self, num_rows: usize) -> Vec { - self.serialize_size + pub fn serialize_builders(&self, num_rows: usize) -> Vec { + self.serialize_type .iter() - .map(|size| BinaryColumnBuilder::with_capacity(num_rows, num_rows * size.unwrap_or(0))) + .map(|item| { + let builder = item + .iter() + .map(|serde_type| match serde_type { + AggrStateSerdeType::Bool => { + ColumnBuilder::with_capacity(&DataType::Boolean, num_rows) + } + AggrStateSerdeType::Binary(size) => { + ColumnBuilder::Binary(BinaryColumnBuilder::with_capacity( + num_rows, + num_rows * size.unwrap_or(0), + )) + } + }) + .collect(); + ColumnBuilder::Tuple(builder) + }) .collect() } } @@ -288,6 +306,12 @@ pub enum AggrStateType { Custom(Layout), } +#[derive(Debug, Clone, Copy)] +pub enum AggrStateSerdeType { + Bool, + Binary(Option), +} + #[cfg(test)] mod tests { use proptest::prelude::*; diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 11907059ef90e..a605227b7f5ce 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -27,7 +27,6 @@ use crate::aggregate::payload_row::row_match_columns; use crate::group_hash_columns; use crate::new_sel; use crate::read; -use crate::types::BinaryType; 
use crate::types::DataType; use crate::AggregateFunctionRef; use crate::BlockEntry; @@ -219,7 +218,7 @@ impl AggregateHashTable { .zip(agg_states.iter()) .zip(states_layout.states_loc.iter()) { - func.batch_merge(state_places, loc, &state.downcast::().unwrap())?; + func.batch_merge(state_places, loc, state)?; } } } diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 43dabd3b519c2..e0d15864a3a8e 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -150,17 +150,12 @@ impl Payload { { { let builder = &mut builders[idx]; - func.serialize(AggrState::new(*place, loc), &mut builder.data)?; - builder.commit_row(); + func.serialize(AggrState::new(*place, loc), builder)?; } } } - entries.extend( - builders - .into_iter() - .map(|builder| Column::Binary(builder.build()).into()), - ); + entries.extend(builders.into_iter().map(|builder| builder.build().into())); } entries.extend_from_slice(&state.take_group_columns()); diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs index 18af505c1241a..b100564223d3a 100644 --- a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs +++ b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs @@ -183,11 +183,11 @@ impl AggregateFunction for AggregateNullUnaryAdapto .accumulate_row(place, not_null_columns, validity, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { self.0.serialize(place, writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { self.0.merge(place, reader) } @@ -308,11 +308,11 @@ impl AggregateFunction .accumulate_row(place, not_null_columns, validity, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { self.0.serialize(place, writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { self.0.merge(place, reader) } @@ -498,17 +498,18 @@ impl CommonNullAdaptor { fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { if !NULLABLE_RESULT { - return self.nested.serialize(place, writer); + return self.nested.serialize_binary(place, writer); } - self.nested.serialize(place.remove_last_loc(), writer)?; + self.nested + .serialize_binary(place.remove_last_loc(), writer)?; let flag = get_flag(place); writer.write_scalar(&flag) } fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { if !NULLABLE_RESULT { - return self.nested.merge(place, reader); + return self.nested.merge_binary(place, reader); } let flag = reader[reader.len() - 1]; @@ -522,7 +523,7 @@ impl CommonNullAdaptor { } set_flag(place, true); self.nested - .merge(place.remove_last_loc(), &mut &reader[..reader.len() - 1]) + .merge_binary(place.remove_last_loc(), &mut &reader[..reader.len() - 1]) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs index c7f9d4c4677d4..3fd0cbcd70ccc 100644 --- 
a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs +++ b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs @@ -178,17 +178,18 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor { } #[inline] - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { - self.inner.serialize(place.remove_last_loc(), writer)?; + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + self.inner + .serialize_binary(place.remove_last_loc(), writer)?; let flag = get_flag(place) as u8; writer.write_scalar(&flag) } #[inline] - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let flag = get_flag(place) || reader[reader.len() - 1] > 0; self.inner - .merge(place.remove_last_loc(), &mut &reader[..reader.len() - 1])?; + .merge_binary(place.remove_last_loc(), &mut &reader[..reader.len() - 1])?; set_flag(place, flag); Ok(()) } diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs index 4894fc24a9c84..20b11214d15fc 100644 --- a/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs +++ b/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs @@ -121,12 +121,12 @@ impl AggregateFunction for AggregateFunctionSortAdaptor { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = Self::get_state(place); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = Self::get_state(place); let rhs = SortAggState::deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_arg_min_max.rs b/src/query/functions/src/aggregates/aggregate_arg_min_max.rs index 98506ab12b8b5..5311335b6b229 100644 --- a/src/query/functions/src/aggregates/aggregate_arg_min_max.rs +++ b/src/query/functions/src/aggregates/aggregate_arg_min_max.rs @@ -270,12 +270,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs: State = borsh_partial_deserialize(reader)?; state.merge_from(rhs) diff --git a/src/query/functions/src/aggregates/aggregate_array_agg.rs b/src/query/functions/src/aggregates/aggregate_array_agg.rs index 7083b886996df..ddeba25f9c86f 100644 --- a/src/query/functions/src/aggregates/aggregate_array_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_array_agg.rs @@ -548,12 +548,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) 
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs = State::deserialize_reader(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_array_moving.rs b/src/query/functions/src/aggregates/aggregate_array_moving.rs index 6ef407c1402e2..0070924a974df 100644 --- a/src/query/functions/src/aggregates/aggregate_array_moving.rs +++ b/src/query/functions/src/aggregates/aggregate_array_moving.rs @@ -447,12 +447,12 @@ where State: SumState state.accumulate_row(&columns[0], row) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs = State::deserialize_reader(reader)?; @@ -616,12 +616,12 @@ where State: SumState state.accumulate_row(&columns[0], row) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs = State::deserialize_reader(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_bitmap.rs b/src/query/functions/src/aggregates/aggregate_bitmap.rs index cce9492c709d8..3f56ff133296b 100644 --- a/src/query/functions/src/aggregates/aggregate_bitmap.rs +++ b/src/query/functions/src/aggregates/aggregate_bitmap.rs @@ -287,7 +287,7 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); // flag indicate where bitmap is none let flag: u8 = if state.rb.is_some() { 1 } else { 0 }; @@ -298,7 +298,7 @@ where Ok(()) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let flag = reader[0]; @@ -482,12 +482,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { - self.inner.serialize(place, writer) + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + self.inner.serialize_binary(place, writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.inner.merge(place, reader) + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + self.inner.merge_binary(place, reader) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { diff --git a/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs b/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs index 77a11ec9b44f8..d74b9a32f661a 100644 --- a/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs +++ b/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs @@ -106,12 +106,12 @@ where State: DistinctStateFunc state.add(columns, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) 
-> Result<()> { let state = Self::get_state(place); state.serialize(writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = Self::get_state(place); let rhs = State::deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_combinator_if.rs b/src/query/functions/src/aggregates/aggregate_combinator_if.rs index ea83b8028d23e..03bbdf625e4df 100644 --- a/src/query/functions/src/aggregates/aggregate_combinator_if.rs +++ b/src/query/functions/src/aggregates/aggregate_combinator_if.rs @@ -155,12 +155,12 @@ impl AggregateFunction for AggregateIfCombinator { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { - self.nested.serialize(place, writer) + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + self.nested.serialize_binary(place, writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.nested.merge(place, reader) + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + self.nested.merge_binary(place, reader) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { diff --git a/src/query/functions/src/aggregates/aggregate_combinator_state.rs b/src/query/functions/src/aggregates/aggregate_combinator_state.rs index 486707ec7af61..e83d78f24164c 100644 --- a/src/query/functions/src/aggregates/aggregate_combinator_state.rs +++ b/src/query/functions/src/aggregates/aggregate_combinator_state.rs @@ -106,13 +106,13 @@ impl AggregateFunction for AggregateStateCombinator { self.nested.accumulate_row(place, columns, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { - self.nested.serialize(place, writer) + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + self.nested.serialize_binary(place, writer) } #[inline] - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.nested.merge(place, reader) + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + self.nested.merge_binary(place, reader) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { @@ -121,7 +121,7 @@ impl AggregateFunction for AggregateStateCombinator { fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> { let builder = builder.as_binary_mut().unwrap(); - self.nested.serialize(place, &mut builder.data)?; + self.nested.serialize_binary(place, &mut builder.data)?; builder.commit_row(); Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_count.rs b/src/query/functions/src/aggregates/aggregate_count.rs index b822b832896ee..9e1db7e50a989 100644 --- a/src/query/functions/src/aggregates/aggregate_count.rs +++ b/src/query/functions/src/aggregates/aggregate_count.rs @@ -160,12 +160,12 @@ impl AggregateFunction for AggregateCountFunction { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.count.serialize(writer)?) 
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let other: u64 = borsh_partial_deserialize(reader)?; state.count += other; diff --git a/src/query/functions/src/aggregates/aggregate_covariance.rs b/src/query/functions/src/aggregates/aggregate_covariance.rs index 8af1216f56567..c69d242ea811a 100644 --- a/src/query/functions/src/aggregates/aggregate_covariance.rs +++ b/src/query/functions/src/aggregates/aggregate_covariance.rs @@ -231,12 +231,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs: AggregateCovarianceState = borsh_partial_deserialize(reader)?; state.merge(&rhs); diff --git a/src/query/functions/src/aggregates/aggregate_json_array_agg.rs b/src/query/functions/src/aggregates/aggregate_json_array_agg.rs index f1ba8d35807fa..acd867e4f2a8e 100644 --- a/src/query/functions/src/aggregates/aggregate_json_array_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_json_array_agg.rs @@ -240,12 +240,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs: State = borsh_partial_deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_json_object_agg.rs b/src/query/functions/src/aggregates/aggregate_json_object_agg.rs index 9efe5fc0f6b90..45179f529cee2 100644 --- a/src/query/functions/src/aggregates/aggregate_json_object_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_json_object_agg.rs @@ -293,12 +293,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs: State = borsh_partial_deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_markov_tarin.rs b/src/query/functions/src/aggregates/aggregate_markov_tarin.rs index 0524caa523e39..285965bcbafa8 100644 --- a/src/query/functions/src/aggregates/aggregate_markov_tarin.rs +++ b/src/query/functions/src/aggregates/aggregate_markov_tarin.rs @@ -113,12 +113,12 @@ impl AggregateFunction for MarkovTarin { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) 
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let mut rhs = borsh_partial_deserialize::(reader)?; state.merge(&mut rhs); diff --git a/src/query/functions/src/aggregates/aggregate_null_result.rs b/src/query/functions/src/aggregates/aggregate_null_result.rs index 03f9b6a1ce041..74e2942e9bceb 100644 --- a/src/query/functions/src/aggregates/aggregate_null_result.rs +++ b/src/query/functions/src/aggregates/aggregate_null_result.rs @@ -86,11 +86,11 @@ impl AggregateFunction for AggregateNullResultFunction { Ok(()) } - fn serialize(&self, _place: AggrState, _writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, _place: AggrState, _writer: &mut Vec) -> Result<()> { Ok(()) } - fn merge(&self, _place: AggrState, _reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, _place: AggrState, _reader: &mut &[u8]) -> Result<()> { Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs index 5aadd1887b5db..d982f8b80ca14 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs @@ -362,12 +362,12 @@ where for<'a> T: AccessType = F64> + Send + Sync }); Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let mut rhs: QuantileTDigestState = borsh_partial_deserialize(reader)?; state.merge(&mut rhs) diff --git a/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs b/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs index 7ac562153fe53..d4aa266fd1a5e 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs @@ -143,12 +143,12 @@ where }); Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let mut rhs: QuantileTDigestState = borsh_partial_deserialize(reader)?; state.merge(&mut rhs) diff --git a/src/query/functions/src/aggregates/aggregate_retention.rs b/src/query/functions/src/aggregates/aggregate_retention.rs index 5f42198bec4c0..5b904f234e740 100644 --- a/src/query/functions/src/aggregates/aggregate_retention.rs +++ b/src/query/functions/src/aggregates/aggregate_retention.rs @@ -143,12 +143,12 @@ impl AggregateFunction for AggregateRetentionFunction { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) 
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs: AggregateRetentionState = borsh_partial_deserialize(reader)?; state.merge(&rhs); diff --git a/src/query/functions/src/aggregates/aggregate_st_collect.rs b/src/query/functions/src/aggregates/aggregate_st_collect.rs index afdb3d88a373e..bfe960471c5ee 100644 --- a/src/query/functions/src/aggregates/aggregate_st_collect.rs +++ b/src/query/functions/src/aggregates/aggregate_st_collect.rs @@ -306,12 +306,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs: State = borsh_partial_deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_string_agg.rs b/src/query/functions/src/aggregates/aggregate_string_agg.rs index 34a530c0253f7..743c2592d0b9b 100644 --- a/src/query/functions/src/aggregates/aggregate_string_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_string_agg.rs @@ -147,12 +147,12 @@ impl AggregateFunction for AggregateStringAggFunction { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs: StringAggState = borsh_partial_deserialize(reader)?; state.values.push_str(&rhs.values); diff --git a/src/query/functions/src/aggregates/aggregate_unary.rs b/src/query/functions/src/aggregates/aggregate_unary.rs index 20b9fc5f85a70..be20d15157534 100644 --- a/src/query/functions/src/aggregates/aggregate_unary.rs +++ b/src/query/functions/src/aggregates/aggregate_unary.rs @@ -227,12 +227,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state: &mut S = place.get::(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state: &mut S = place.get::(); let rhs = S::deserialize_reader(reader)?; state.merge(&rhs) diff --git a/src/query/functions/src/aggregates/aggregate_window_funnel.rs b/src/query/functions/src/aggregates/aggregate_window_funnel.rs index 3d29c1734bff8..864e5d0eaeea5 100644 --- a/src/query/functions/src/aggregates/aggregate_window_funnel.rs +++ b/src/query/functions/src/aggregates/aggregate_window_funnel.rs @@ -275,12 +275,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::>(); Ok(state.serialize(writer)?) 
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::>(); let mut rhs: AggregateWindowFunnelState = borsh_partial_deserialize(reader)?; state.merge(&mut rhs); diff --git a/src/query/functions/src/aggregates/aggregator_common.rs b/src/query/functions/src/aggregates/aggregator_common.rs index 6430bcf3cc647..8d4255a991983 100644 --- a/src/query/functions/src/aggregates/aggregator_common.rs +++ b/src/query/functions/src/aggregates/aggregator_common.rs @@ -187,9 +187,9 @@ pub fn eval_aggr_for_test( func.accumulate(state, entries.into(), None, rows)?; if with_serialize { let mut buf = vec![]; - func.serialize(state, &mut buf)?; + func.serialize_binary(state, &mut buf)?; func.init_state(state); - func.merge(state, &mut buf.as_slice())?; + func.merge_binary(state, &mut buf.as_slice())?; } let mut builder = ColumnBuilder::with_capacity(&data_type, 1024); func.merge_result(state, &mut builder)?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs index a43003ba291b4..9b5fba779986a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs @@ -24,7 +24,6 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::AggrState; use databend_common_expression::BlockMetaInfoDowncast; -use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::DataBlock; use databend_common_expression::ProjectedBlock; @@ -156,14 +155,10 @@ impl AccumulatingTransform for PartialSingleStateAggregator { ) .zip(builders.iter_mut()) { - func.serialize(place, &mut builder.data)?; - builder.commit_row(); + func.serialize(place, builder)?; } - let columns = builders - .into_iter() - .map(|b| Column::Binary(b.build())) - .collect(); + let columns = builders.into_iter().map(|b| b.build()).collect(); vec![DataBlock::new_from_columns(columns)] } else { vec![] diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs index d92ace4e212fd..fdfc1dcbe6d39 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs @@ -98,14 +98,14 @@ impl AggregateFunction for AggregateUdfScript { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec) -> Result<()> { let state = place.get::(); state .serialize(writer) .map_err(|e| ErrorCode::Internal(format!("state failed to serialize: {e}"))) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::(); let rhs = UdfAggState::deserialize(reader).map_err(|e| ErrorCode::Internal(e.to_string()))?; From d64d2ec5246b973b0c028339924fe15de065af9f Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 22 Jul 2025 15:29:07 +0800 Subject: [PATCH 2/5] fix --- .../base/src/runtime/profile/profile.rs | 2 +- src/common/storage/src/copy.rs | 8 +- src/common/storage/src/merge.rs 
| 2 +- .../src/statistics/data_cache_statistics.rs | 2 +- .../src/aggregate/aggregate_function.rs | 6 +- .../src/aggregate/aggregate_function_state.rs | 44 +++-- src/query/expression/src/aggregate/payload.rs | 11 +- .../pipeline/core/src/processors/processor.rs | 39 ++++- .../pipeline/core/src/processors/profile.rs | 6 +- .../serde/transform_aggregate_serializer.rs | 89 ++++------ .../serde/transform_deserializer.rs | 163 ++++++++---------- .../serde/transform_exchange_async_barrier.rs | 40 +++-- .../servers/flight/v1/packets/packet_data.rs | 1 + .../physical_aggregate_partial.rs | 40 ++++- 14 files changed, 245 insertions(+), 208 deletions(-) diff --git a/src/common/base/src/runtime/profile/profile.rs b/src/common/base/src/runtime/profile/profile.rs index 58cfb503be56a..1dc288e7303f0 100644 --- a/src/common/base/src/runtime/profile/profile.rs +++ b/src/common/base/src/runtime/profile/profile.rs @@ -23,7 +23,7 @@ use crate::runtime::metrics::ScopedRegistry; use crate::runtime::profile::ProfileStatisticsName; use crate::runtime::ThreadTracker; -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ProfileLabel { pub name: String, pub value: Vec, diff --git a/src/common/storage/src/copy.rs b/src/common/storage/src/copy.rs index 927835fdb496a..a3464e33ed4fe 100644 --- a/src/common/storage/src/copy.rs +++ b/src/common/storage/src/copy.rs @@ -20,7 +20,7 @@ use serde::Deserialize; use serde::Serialize; use thiserror::Error; -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct CopyStatus { /// Key is file path. pub files: DashMap, @@ -45,7 +45,7 @@ impl CopyStatus { } } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct FileStatus { pub num_rows_loaded: usize, pub error: Option, @@ -79,7 +79,7 @@ impl FileStatus { } } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileErrorsInfo { pub num_errors: usize, pub first_error: FileParseErrorAtLine, @@ -156,7 +156,7 @@ impl FileParseError { } } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileParseErrorAtLine { pub error: FileParseError, pub line: usize, diff --git a/src/common/storage/src/merge.rs b/src/common/storage/src/merge.rs index 9f4fe94080bc4..50125e8823c90 100644 --- a/src/common/storage/src/merge.rs +++ b/src/common/storage/src/merge.rs @@ -15,7 +15,7 @@ use serde::Deserialize; use serde::Serialize; -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct MutationStatus { pub insert_rows: u64, pub deleted_rows: u64, diff --git a/src/query/catalog/src/statistics/data_cache_statistics.rs b/src/query/catalog/src/statistics/data_cache_statistics.rs index 04ae435204eb5..6838a93cc212d 100644 --- a/src/query/catalog/src/statistics/data_cache_statistics.rs +++ b/src/query/catalog/src/statistics/data_cache_statistics.rs @@ -24,7 +24,7 @@ pub struct DataCacheMetrics { bytes_from_memory: AtomicUsize, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct DataCacheMetricValues { pub bytes_from_remote_disk: usize, pub bytes_from_local_disk: usize, diff --git a/src/query/expression/src/aggregate/aggregate_function.rs b/src/query/expression/src/aggregate/aggregate_function.rs index 
d937fe61905e6..e4eb3ea51f047 100755 --- a/src/query/expression/src/aggregate/aggregate_function.rs +++ b/src/query/expression/src/aggregate/aggregate_function.rs @@ -23,12 +23,12 @@ use super::AggrStateLoc; use super::AggrStateRegistry; use super::StateAddr; use crate::types::DataType; -use crate::AggrStateSerdeType; use crate::BlockEntry; use crate::ColumnBuilder; use crate::ProjectedBlock; use crate::Scalar; use crate::ScalarRef; +use crate::StateSerdeItem; pub type AggregateFunctionRef = Arc; @@ -69,8 +69,8 @@ pub trait AggregateFunction: fmt::Display + Sync + Send { // Used in aggregate_null_adaptor fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>; - fn serialize_type(&self) -> Vec { - vec![AggrStateSerdeType::Binary(self.serialize_size_per_row())] + fn serialize_type(&self) -> Vec { + vec![StateSerdeItem::Binary(self.serialize_size_per_row())] } fn serialize(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> { diff --git a/src/query/expression/src/aggregate/aggregate_function_state.rs b/src/query/expression/src/aggregate/aggregate_function_state.rs index 409278532dd06..791ee68542ff7 100644 --- a/src/query/expression/src/aggregate/aggregate_function_state.rs +++ b/src/query/expression/src/aggregate/aggregate_function_state.rs @@ -119,7 +119,7 @@ pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result for func in funcs { func.register_state(&mut registry); registry.commit(); - serialize_type.push(func.serialize_type().into_boxed_slice()); + serialize_type.push(StateSerdeType(func.serialize_type().into())); } let AggrStateRegistry { states, offsets } = registry; @@ -193,25 +193,49 @@ impl AggrStateLoc { } } +#[derive(Debug, Clone, Copy)] +pub enum StateSerdeItem { + Bool, + Binary(Option), +} + +#[derive(Debug, Clone)] +pub struct StateSerdeType(Box<[StateSerdeItem]>); + +impl StateSerdeType { + pub fn data_type(&self) -> DataType { + DataType::Tuple( + self.0 + .iter() + .map(|item| match item { + StateSerdeItem::Bool => DataType::Boolean, + StateSerdeItem::Binary(_) => DataType::Binary, + }) + .collect(), + ) + } +} + #[derive(Debug, Clone)] pub struct StatesLayout { pub layout: Layout, pub states_loc: Vec>, - serialize_type: Vec>, + pub(super) serialize_type: Vec, } impl StatesLayout { pub fn serialize_builders(&self, num_rows: usize) -> Vec { self.serialize_type .iter() - .map(|item| { - let builder = item + .map(|serde_type| { + let builder = serde_type + .0 .iter() - .map(|serde_type| match serde_type { - AggrStateSerdeType::Bool => { + .map(|item| match item { + StateSerdeItem::Bool => { ColumnBuilder::with_capacity(&DataType::Boolean, num_rows) } - AggrStateSerdeType::Binary(size) => { + StateSerdeItem::Binary(size) => { ColumnBuilder::Binary(BinaryColumnBuilder::with_capacity( num_rows, num_rows * size.unwrap_or(0), @@ -306,12 +330,6 @@ pub enum AggrStateType { Custom(Layout), } -#[derive(Debug, Clone, Copy)] -pub enum AggrStateSerdeType { - Bool, - Binary(Option), -} - #[cfg(test)] mod tests { use proptest::prelude::*; diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index aafe2921c8532..836e299f16d47 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -423,9 +423,14 @@ impl Payload { pub fn empty_block(&self, fake_rows: Option) -> DataBlock { let fake_rows = fake_rows.unwrap_or(0); - let entries = (0..self.aggrs.len()) - .map(|_| { - ColumnBuilder::repeat_default(&DataType::Binary, 
fake_rows) + let entries = self + .states_layout + .as_ref() + .unwrap() + .serialize_type + .iter() + .map(|serde_type| { + ColumnBuilder::repeat_default(&serde_type.data_type(), fake_rows) .build() .into() }) diff --git a/src/query/pipeline/core/src/processors/processor.rs b/src/query/pipeline/core/src/processors/processor.rs index ce70053b80ded..bb71f493a2061 100644 --- a/src/query/pipeline/core/src/processors/processor.rs +++ b/src/query/pipeline/core/src/processors/processor.rs @@ -161,12 +161,24 @@ impl ProcessorPtr { /// # Safety pub unsafe fn process(&self) -> Result<()> { - let mut name = self.name(); - name.push_str("::process"); - let _span = LocalSpan::enter_with_local_parent(name) + let span = LocalSpan::enter_with_local_parent(format!("{}::process", self.name())) .with_property(|| ("graph-node-id", self.id().index().to_string())); - (*self.inner.get()).process() + match (*self.inner.get()).process() { + Ok(_) => Ok(()), + Err(err) => { + let _ = span + .with_property(|| ("error", "true")) + .with_properties(|| { + [ + ("error.type", err.code().to_string()), + ("error.message", err.display_text()), + ] + }); + log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process"); + Err(err) + } + } } /// # Safety @@ -190,10 +202,23 @@ impl ProcessorPtr { async move { let span = Span::enter_with_local_parent(name) .with_property(|| ("graph-node-id", id.index().to_string())); - task.in_span(span).await?; - drop(inner); - Ok(()) + match task.await { + Ok(_) => { + drop(inner); + Ok(()) + } + Err(err) => { + span.with_property(|| ("error", "true")).add_properties(|| { + [ + ("error.type", err.code().to_string()), + ("error.message", err.display_text()), + ] + }); + log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process"); + Err(err) + } + } } .boxed() } diff --git a/src/query/pipeline/core/src/processors/profile.rs b/src/query/pipeline/core/src/processors/profile.rs index 7fb5af4f4e8be..58b0fd98169ab 100644 --- a/src/query/pipeline/core/src/processors/profile.rs +++ b/src/query/pipeline/core/src/processors/profile.rs @@ -43,7 +43,7 @@ impl Drop for PlanScopeGuard { } } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ErrorInfoDesc { message: String, detail: String, @@ -60,7 +60,7 @@ impl ErrorInfoDesc { } } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum ErrorInfo { Other(ErrorInfoDesc), IoError(ErrorInfoDesc), @@ -68,7 +68,7 @@ pub enum ErrorInfo { CalculationError(ErrorInfoDesc), } -#[derive(Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PlanProfile { pub id: Option, pub name: Option, diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index 096485fa98fcc..03b45b75d6607 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -121,26 +121,19 @@ impl Processor for TransformAggregateSerializer { impl TransformAggregateSerializer { fn transform_input_data(&mut self, mut data_block: DataBlock) -> Result { debug_assert!(data_block.is_empty()); - if let Some(block_meta) = data_block.take_meta() { - if let 
Some(block_meta) = AggregateMeta::downcast_from(block_meta) { - match block_meta { - AggregateMeta::Spilled(_) => unreachable!(), - AggregateMeta::Serialized(_) => unreachable!(), - AggregateMeta::BucketSpilled(_) => unreachable!(), - AggregateMeta::Partitioned { .. } => unreachable!(), - AggregateMeta::AggregateSpilling(_) => unreachable!(), - AggregateMeta::AggregatePayload(p) => { - self.input_data = Some(SerializeAggregateStream::create( - &self.params, - SerializePayload::AggregatePayload(p), - )); - return Ok(Event::Sync); - } - } - } - } - unreachable!() + let Some(AggregateMeta::AggregatePayload(p)) = data_block + .take_meta() + .and_then(AggregateMeta::downcast_from) + else { + unreachable!() + }; + + self.input_data = Some(SerializeAggregateStream::create( + &self.params, + SerializePayload::AggregatePayload(p), + )); + Ok(Event::Sync) } } @@ -218,41 +211,29 @@ impl SerializeAggregateStream { return Ok(None); } - match self.payload.as_ref().get_ref() { - SerializePayload::AggregatePayload(p) => { - let block = p.payload.aggregate_flush(&mut self.flush_state)?; - - if block.is_none() { - self.end_iter = true; - } - - match block { - Some(block) => { - self.nums += 1; - Ok(Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload( - p.bucket, - p.max_partition_count, - false, - ), - ))?)) - } - None => { - // always return at least one block - if self.nums == 0 { - self.nums += 1; - let block = p.payload.empty_block(Some(1)); - Ok(Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload( - p.bucket, - p.max_partition_count, - true, - ), - ))?)) - } else { - Ok(None) - } - } + let SerializePayload::AggregatePayload(p) = self.payload.as_ref().get_ref(); + match p.payload.aggregate_flush(&mut self.flush_state)? { + Some(block) => { + self.nums += 1; + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count, false), + ))?)) + } + None => { + self.end_iter = true; + // always return at least one block + if self.nums == 0 { + self.nums += 1; + let block = p.payload.empty_block(Some(1)); + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + true, + ), + ))?)) + } else { + Ok(None) } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs index 09725ee3bb173..9f632ec0b187c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs @@ -79,94 +79,77 @@ impl TransformDeserializer { return Ok(DataBlock::new_with_meta(vec![], 0, meta)); } - let data_block = match &meta { - None => { - deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())? - } - Some(meta) => match AggregateSerdeMeta::downcast_ref_from(meta) { - None => { - deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())? 
+ let Some(meta) = meta + .as_ref() + .and_then(AggregateSerdeMeta::downcast_ref_from) + else { + let data_block = + deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())?; + return match data_block.num_columns() == 0 { + true => Ok(DataBlock::new_with_meta(vec![], row_count as usize, meta)), + false => data_block.add_meta(meta), + }; + }; + + match meta.typ { + BUCKET_TYPE => { + let mut block = deserialize_block( + dict, + fragment_data, + &self.schema, + self.arrow_schema.clone(), + )?; + + if meta.is_empty { + block = block.slice(0..0); } - Some(meta) => { - return match meta.typ == BUCKET_TYPE { - true => { - let mut block = deserialize_block( - dict, - fragment_data, - &self.schema, - self.arrow_schema.clone(), - )?; - - if meta.is_empty { - block = block.slice(0..0); - } - - Ok(DataBlock::empty_with_meta( - AggregateMeta::create_serialized( - meta.bucket, - block, - meta.max_partition_count, - ), - )) - } - false => { - let data_schema = Arc::new(exchange_defines::spilled_schema()); - let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema()); - let data_block = deserialize_block( - dict, - fragment_data, - &data_schema, - arrow_schema.clone(), - )?; - - let columns = data_block - .columns() - .iter() - .map(|c| c.as_column().unwrap().clone()) - .collect::>(); - - let buckets = - NumberType::::try_downcast_column(&columns[0]).unwrap(); - let data_range_start = - NumberType::::try_downcast_column(&columns[1]).unwrap(); - let data_range_end = - NumberType::::try_downcast_column(&columns[2]).unwrap(); - let columns_layout = - ArrayType::::try_downcast_column(&columns[3]).unwrap(); - - let columns_layout_data = columns_layout.values().as_slice(); - let columns_layout_offsets = columns_layout.offsets(); - - let mut buckets_payload = Vec::with_capacity(data_block.num_rows()); - for index in 0..data_block.num_rows() { - unsafe { - buckets_payload.push(BucketSpilledPayload { - bucket: *buckets.get_unchecked(index) as isize, - location: meta.location.clone().unwrap(), - data_range: *data_range_start.get_unchecked(index) - ..*data_range_end.get_unchecked(index), - columns_layout: columns_layout_data[columns_layout_offsets - [index] - as usize - ..columns_layout_offsets[index + 1] as usize] - .to_vec(), - max_partition_count: meta.max_partition_count, - }); - } - } - - Ok(DataBlock::empty_with_meta(AggregateMeta::create_spilled( - buckets_payload, - ))) - } - }; + + Ok(DataBlock::empty_with_meta( + AggregateMeta::create_serialized(meta.bucket, block, meta.max_partition_count), + )) + } + _ => { + let data_schema = Arc::new(exchange_defines::spilled_schema()); + let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema()); + let data_block = + deserialize_block(dict, fragment_data, &data_schema, arrow_schema.clone())?; + + let columns = data_block + .columns() + .iter() + .map(|c| c.as_column().unwrap().clone()) + .collect::>(); + + let buckets = NumberType::::try_downcast_column(&columns[0]).unwrap(); + let data_range_start = NumberType::::try_downcast_column(&columns[1]).unwrap(); + let data_range_end = NumberType::::try_downcast_column(&columns[2]).unwrap(); + let columns_layout = + ArrayType::::try_downcast_column(&columns[3]).unwrap(); + + let columns_layout_data = columns_layout.values().as_slice(); + let columns_layout_offsets = columns_layout.offsets(); + + let mut buckets_payload = Vec::with_capacity(data_block.num_rows()); + for index in 0..data_block.num_rows() { + unsafe { + buckets_payload.push(BucketSpilledPayload { + bucket: 
*buckets.get_unchecked(index) as isize, + location: meta.location.clone().unwrap(), + data_range: *data_range_start.get_unchecked(index) + ..*data_range_end.get_unchecked(index), + columns_layout: columns_layout_data[columns_layout_offsets[index] + as usize + ..columns_layout_offsets[index + 1] as usize] + .to_vec(), + max_partition_count: meta.max_partition_count, + }); + } } - }, - }; - match data_block.num_columns() == 0 { - true => Ok(DataBlock::new_with_meta(vec![], row_count as usize, meta)), - false => data_block.add_meta(meta), + Ok(DataBlock::empty_with_meta(AggregateMeta::create_spilled( + buckets_payload, + ))) + } } } } @@ -177,15 +160,9 @@ impl BlockMetaTransform for TransformDeserializer { fn transform(&mut self, mut meta: ExchangeDeserializeMeta) -> Result> { match meta.packet.pop().unwrap() { - DataPacket::ErrorCode(v) => Err(v), - DataPacket::Dictionary(_) => unreachable!(), - DataPacket::QueryProfiles(_) => unreachable!(), - DataPacket::SerializeProgress { .. } => unreachable!(), - DataPacket::CopyStatus { .. } => unreachable!(), - DataPacket::MutationStatus { .. } => unreachable!(), - DataPacket::DataCacheMetrics(_) => unreachable!(), DataPacket::FragmentData(v) => Ok(vec![self.recv_data(meta.packet, v)?]), - DataPacket::QueryPerf(_) => unreachable!(), + DataPacket::ErrorCode(err) => Err(err), + _ => unreachable!(), } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs index 1628bc9af5beb..eaebda0543e6d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs @@ -45,29 +45,31 @@ impl AsyncTransform for TransformExchangeAsyncBarrier { const NAME: &'static str = "TransformExchangeAsyncBarrier"; async fn transform(&mut self, mut data: DataBlock) -> Result { - if let Some(meta) = data + let Some(meta) = data .take_meta() .and_then(FlightSerializedMeta::downcast_from) - { - let mut futures = Vec::with_capacity(meta.serialized_blocks.len()); + else { + return Err(ErrorCode::Internal("")); + }; - for serialized_block in meta.serialized_blocks { - futures.push(databend_common_base::runtime::spawn(async move { - match serialized_block { - FlightSerialized::DataBlock(v) => Ok(v), - FlightSerialized::Future(f) => f.await, - } - })); - } - - return match futures::future::try_join_all(futures).await { - Err(_) => Err(ErrorCode::TokioError("Cannot join tokio job")), - Ok(spilled_data) => Ok(DataBlock::empty_with_meta(ExchangeShuffleMeta::create( - spilled_data.into_iter().collect::>>()?, - ))), - }; + let mut futures = Vec::with_capacity(meta.serialized_blocks.len()); + for serialized_block in meta.serialized_blocks { + futures.push(databend_common_base::runtime::spawn(async move { + match serialized_block { + FlightSerialized::DataBlock(v) => Ok(v), + FlightSerialized::Future(f) => f.await, + } + })); } - Err(ErrorCode::Internal("")) + match futures::future::try_join_all(futures).await { + Err(_) => Err(ErrorCode::TokioError("Cannot join tokio job")), + Ok(spilled_data) => { + let blocks = spilled_data.into_iter().collect::>()?; + Ok(DataBlock::empty_with_meta(ExchangeShuffleMeta::create( + blocks, + ))) + } + } } } diff --git a/src/query/service/src/servers/flight/v1/packets/packet_data.rs 
b/src/query/service/src/servers/flight/v1/packets/packet_data.rs index 2b5407ffb2946..81226501438f8 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_data.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_data.rs @@ -54,6 +54,7 @@ impl Debug for FragmentData { } } +#[derive(Debug)] pub enum DataPacket { ErrorCode(ErrorCode), Dictionary(FlightData), diff --git a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs index a8a73071aaa57..5026f7ba38f9a 100644 --- a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs +++ b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs @@ -14,11 +14,11 @@ use databend_common_exception::Result; use databend_common_expression::types::DataType; -#[allow(unused_imports)] -use databend_common_expression::DataBlock; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::StateSerdeItem; +use databend_common_functions::aggregates::AggregateFunctionFactory; use super::SortDesc; use crate::executor::explain::PlanStatsInfo; @@ -47,11 +47,39 @@ impl AggregatePartial { let input_schema = self.input.output_schema()?; let mut fields = Vec::with_capacity(self.agg_funcs.len() + self.group_by.len()); + let factory = AggregateFunctionFactory::instance(); - fields.extend(self.agg_funcs.iter().map(|func| { - let name = func.output_column.to_string(); - DataField::new(&name, DataType::Binary) - })); + for desc in &self.agg_funcs { + let name = desc.output_column.to_string(); + + if desc.sig.udaf.is_some() { + fields.push(DataField::new( + &name, + DataType::Tuple(vec![DataType::Binary]), + )); + continue; + } + + let func = factory + .get( + &desc.sig.name, + desc.sig.params.clone(), + desc.sig.args.clone(), + desc.sig.sort_descs.clone(), + ) + .unwrap(); + + let tuple = func + .serialize_type() + .iter() + .map(|serde_type| match serde_type { + StateSerdeItem::Bool => DataType::Boolean, + StateSerdeItem::Binary(_) => DataType::Binary, + }) + .collect(); + + fields.push(DataField::new(&name, DataType::Tuple(tuple))) + } for (idx, field) in self.group_by.iter().zip( self.group_by From f303632aeb96ea402ca9ae37aa4064c2848331f1 Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 22 Jul 2025 17:35:21 +0800 Subject: [PATCH 3/5] fix --- src/query/expression/src/aggregate/payload.rs | 4 ++-- .../src/pipelines/executor/pipeline_executor.rs | 7 +++---- .../pipelines/executor/query_pipeline_executor.rs | 15 ++++++++++----- .../transform_exchange_aggregate_serializer.rs | 9 +++------ 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index 836e299f16d47..bd225a6a9f785 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -423,12 +423,12 @@ impl Payload { pub fn empty_block(&self, fake_rows: Option) -> DataBlock { let fake_rows = fake_rows.unwrap_or(0); + assert_eq!(self.aggrs.is_empty(), self.states_layout.is_none()); let entries = self .states_layout .as_ref() - .unwrap() - .serialize_type .iter() + .flat_map(|x| x.serialize_type.iter()) .map(|serde_type| { ColumnBuilder::repeat_default(&serde_type.data_type(), fake_rows) .build() diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs 
diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs
index 53a82eba8da10..7117dadb04634 100644
--- a/src/query/service/src/pipelines/executor/pipeline_executor.rs
+++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs
@@ -146,6 +146,7 @@ impl PipelineExecutor {
         }
     }
 
+    #[fastrace::trace(name = "PipelineExecutor::init")]
     fn init(on_init_callback: &Mutex<Option<InitCallback>>, query_id: &Arc<String>) -> Result<()> {
         // TODO: the on init callback cannot be killed.
         {
@@ -158,10 +159,8 @@
             }
         }
 
-        info!(
-            "[PIPELINE-EXECUTOR] Pipeline initialized successfully for query {}, elapsed: {:?}",
-            query_id,
-            instant.elapsed()
+        info!(query_id, elapsed:? = instant.elapsed();
+            "[PIPELINE-EXECUTOR] Pipeline initialized successfully",
         );
     }
     Ok(())
diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
index b45e707f30cc8..88aa39a5a51fa 100644
--- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
+++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
@@ -266,6 +266,7 @@ impl QueryPipelineExecutor {
         Ok(())
     }
 
+    #[fastrace::trace(name = "QueryPipelineExecutor::init")]
     fn init(self: &Arc<Self>, graph: Arc<RunningGraph>) -> Result<()> {
         unsafe {
             // TODO: the on init callback cannot be killed.
@@ -286,10 +287,8 @@
             }
         }
 
-        info!(
-            "[PIPELINE-EXECUTOR] Pipeline initialized successfully for query {}, elapsed: {:?}",
-            self.settings.query_id,
-            instant.elapsed()
+        info!(query_id = self.settings.query_id, elapsed:? = instant.elapsed();
+            "[PIPELINE-EXECUTOR] Pipeline initialized successfully",
         );
     }
 
@@ -358,7 +357,7 @@
         }
     }
 
-    let span = Span::enter_with_local_parent(func_path!())
+    let span = Span::enter_with_local_parent("QueryPipelineExecutor::execute_threads")
         .with_property(|| ("thread_name", name.clone()));
     thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe {
         let _g = span.set_local_parent();
@@ -367,6 +366,12 @@
 
         // finish the pipeline executor when has error or panic
         if let Err(cause) = try_result.flatten() {
+            span.with_property(|| ("error", "true")).add_properties(|| {
+                [
+                    ("error.type", cause.code().to_string()),
+                    ("error.message", cause.display_text()),
+                ]
+            });
             this.finish(Some(cause));
         }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
index ffd80674bdab4..118f8119fed5e 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
@@ -125,12 +125,7 @@ impl BlockMetaTransform for TransformExchangeAggregateSeria
                 continue;
             }
 
-            match AggregateMeta::downcast_from(block.take_meta().unwrap()) {
-                None => unreachable!(),
-                Some(AggregateMeta::Spilled(_)) => unreachable!(),
-                Some(AggregateMeta::Serialized(_)) => unreachable!(),
-                Some(AggregateMeta::BucketSpilled(_)) => unreachable!(),
-                Some(AggregateMeta::Partitioned { .. }) => unreachable!(),
+            match block.take_meta().and_then(AggregateMeta::downcast_from) {
                 Some(AggregateMeta::AggregateSpilling(payload)) => {
                     serialized_blocks.push(FlightSerialized::Future(
                         match index == self.local_pos {
@@ -172,6 +167,8 @@
                     let c = serialize_block(block_number, c, &self.options)?;
                     serialized_blocks.push(FlightSerialized::DataBlock(c));
                 }
+
+                _ => unreachable!(),
             };
         }

From 64d74db68b17b9dbf800b005bc0ff185130bb5f9 Mon Sep 17 00:00:00 2001
From: coldWater
Date: Tue, 22 Jul 2025 18:20:24 +0800
Subject: [PATCH 4/5] fix

---
 src/query/expression/src/aggregate/aggregate_function.rs   | 5 -----
 src/query/expression/src/aggregate/payload.rs              | 5 ++---
 src/query/expression/src/aggregate/payload_flush.rs        | 4 ++--
 .../src/aggregates/adaptors/aggregate_null_adaptor.rs      | 8 --------
 .../src/aggregates/adaptors/aggregate_ornull_adaptor.rs    | 4 ----
 .../src/pipelines/executor/query_pipeline_executor.rs      | 1 -
 .../aggregator/serde/transform_aggregate_serializer.rs     | 2 +-
 7 files changed, 5 insertions(+), 24 deletions(-)

diff --git a/src/query/expression/src/aggregate/aggregate_function.rs b/src/query/expression/src/aggregate/aggregate_function.rs
index e4eb3ea51f047..6ba5b3f569c89 100755
--- a/src/query/expression/src/aggregate/aggregate_function.rs
+++ b/src/query/expression/src/aggregate/aggregate_function.rs
@@ -166,9 +166,4 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
     fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> {
         None
     }
-
-    // some features
-    fn convert_const_to_full(&self) -> bool {
-        true
-    }
 }
diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs
index bd225a6a9f785..925293beb214c 100644
--- a/src/query/expression/src/aggregate/payload.rs
+++ b/src/query/expression/src/aggregate/payload.rs
@@ -421,14 +421,13 @@ impl Payload {
         true
     }
 
-    pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
-        let fake_rows = fake_rows.unwrap_or(0);
+    pub fn empty_block(&self, fake_rows: usize) -> DataBlock {
         assert_eq!(self.aggrs.is_empty(), self.states_layout.is_none());
         let entries = self
             .states_layout
             .as_ref()
             .iter()
-            .flat_map(|x| x.serialize_type.iter())
+            .flat_map(|layout| layout.serialize_type.iter())
             .map(|serde_type| {
                 ColumnBuilder::repeat_default(&serde_type.data_type(), fake_rows)
                     .build()
diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs
index e0d15864a3a8e..5863b533e31b9 100644
--- a/src/query/expression/src/aggregate/payload_flush.rs
+++ b/src/query/expression/src/aggregate/payload_flush.rs
@@ -125,7 +125,7 @@ impl Payload {
         }
 
         if blocks.is_empty() {
-            return Ok(self.empty_block(None));
+            return Ok(self.empty_block(0));
         }
         DataBlock::concat(&blocks)
     }
@@ -172,7 +172,7 @@
         }
 
         if blocks.is_empty() {
-            return Ok(self.empty_block(None));
+            return Ok(self.empty_block(0));
         }
 
         DataBlock::concat(&blocks)
diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs
index b100564223d3a..548327ecdaff5 100644
--- a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs
+++ b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs
@@ -207,10 +207,6 @@ impl AggregateFunction for AggregateNullUnaryAdapto
         self.0.drop_state(place);
     }
 
-    fn convert_const_to_full(&self) -> bool {
-        self.0.nested.convert_const_to_full()
-    }
-
     fn get_if_condition(&self, columns: ProjectedBlock) -> Option<Bitmap> {
         self.0.nested.get_if_condition(columns)
     }
@@ -332,10 +328,6 @@ impl AggregateFunction
         self.0.drop_state(place);
     }
 
-    fn convert_const_to_full(&self) -> bool {
-        self.0.nested.convert_const_to_full()
-    }
-
     fn get_if_condition(&self, columns: ProjectedBlock) -> Option<Bitmap> {
         self.0.nested.get_if_condition(columns)
     }
diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs
index 3fd0cbcd70ccc..677bb7fc6f814 100644
--- a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs
+++ b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs
@@ -237,10 +237,6 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
     unsafe fn drop_state(&self, place: AggrState) {
         self.inner.drop_state(place.remove_last_loc())
     }
-
-    fn convert_const_to_full(&self) -> bool {
-        self.inner.convert_const_to_full()
-    }
 }
 
 impl fmt::Display for AggregateFunctionOrNullAdaptor {
diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
index 88aa39a5a51fa..125d18929ea91 100644
--- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
+++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
@@ -33,7 +33,6 @@
 use databend_common_pipeline_core::FinishedCallbackChain;
 use databend_common_pipeline_core::LockGuard;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_core::PlanProfile;
-use fastrace::func_path;
 use fastrace::prelude::*;
 use futures::future::select;
 use futures_util::future::Either;
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
index 03b45b75d6607..3032176fe5de8 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs
@@ -224,7 +224,7 @@ impl SerializeAggregateStream {
         // always return at least one block
         if self.nums == 0 {
             self.nums += 1;
-            let block = p.payload.empty_block(Some(1));
+            let block = p.payload.empty_block(1);
             Ok(Some(block.add_meta(Some(
                 AggregateSerdeMeta::create_agg_payload(
                     p.bucket,
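The payload.rs change in this patch folds the Option into the iterator chain instead of unwrapping it: Option::iter yields zero or one element, so an absent states_layout simply contributes no state columns. A tiny self-contained version of the idiom:

    fn layout_items(layout: Option<&Vec<u32>>) -> Vec<u32> {
        layout
            .iter()                         // zero or one &Vec<u32>
            .flat_map(|items| items.iter()) // then each item within it
            .copied()
            .collect()
    }

    fn main() {
        assert_eq!(layout_items(Some(&vec![1, 2])), vec![1, 2]);
        assert_eq!(layout_items(None), Vec::<u32>::new()); // no unwrap, no panic
    }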
From de267414e88bd3740bcf8fdbe6b62080759f179d Mon Sep 17 00:00:00 2001
From: coldWater
Date: Tue, 22 Jul 2025 19:39:29 +0800
Subject: [PATCH 5/5] null adaptor

---
 .../src/aggregate/aggregate_function.rs       |  15 +--
 .../expression/src/aggregate/payload_flush.rs |   4 +-
 .../adaptors/aggregate_null_adaptor.rs        |  78 ++++++++----
 .../adaptors/aggregate_ornull_adaptor.rs      | 100 ++++++++++++++++--
 .../aggregator/transform_single_key.rs        |   3 +-
 5 files changed, 161 insertions(+), 39 deletions(-)

diff --git a/src/query/expression/src/aggregate/aggregate_function.rs b/src/query/expression/src/aggregate/aggregate_function.rs
index 6ba5b3f569c89..08e5ac3759337 100755
--- a/src/query/expression/src/aggregate/aggregate_function.rs
+++ b/src/query/expression/src/aggregate/aggregate_function.rs
@@ -73,8 +73,8 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
         vec![StateSerdeItem::Binary(self.serialize_size_per_row())]
     }
 
-    fn serialize(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> {
-        let binary_builder = builder.as_tuple_mut().unwrap()[0].as_binary_mut().unwrap();
+    fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
+        let binary_builder = builders[0].as_binary_mut().unwrap();
         self.serialize_binary(place, &mut binary_builder.data)?;
         binary_builder.commit_row();
         Ok(())
@@ -86,8 +86,8 @@
         None
     }
 
-    fn merge(&self, place: AggrState, data: ScalarRef) -> Result<()> {
-        let mut binary = *data.as_tuple().unwrap()[0].as_binary().unwrap();
+    fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
+        let mut binary = *data[0].as_binary().unwrap();
         self.merge_binary(place, &mut binary)
     }
 
@@ -102,7 +102,10 @@
     ) -> Result<()> {
         let column = state.to_column();
         for (place, data) in places.iter().zip(column.iter()) {
-            self.merge(AggrState::new(*place, loc), data)?;
+            self.merge(
+                AggrState::new(*place, loc),
+                data.as_tuple().unwrap().as_slice(),
+            )?;
         }
 
         Ok(())
@@ -111,7 +114,7 @@
     fn batch_merge_single(&self, place: AggrState, state: &BlockEntry) -> Result<()> {
         let column = state.to_column();
         for data in column.iter() {
-            self.merge(place, data)?;
+            self.merge(place, data.as_tuple().unwrap().as_slice())?;
         }
         Ok(())
     }
diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs
index 5863b533e31b9..f6aa0d65d5772 100644
--- a/src/query/expression/src/aggregate/payload_flush.rs
+++ b/src/query/expression/src/aggregate/payload_flush.rs
@@ -149,8 +149,8 @@ impl Payload {
             .enumerate()
         {
             {
-                let builder = &mut builders[idx];
-                func.serialize(AggrState::new(*place, loc), builder)?;
+                let builders = builders[idx].as_tuple_mut().unwrap().as_mut_slice();
+                func.serialize(AggrState::new(*place, loc), builders)?;
             }
         }
     }
diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs
index 548327ecdaff5..6a9ef13527f69 100644
--- a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs
+++ b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs
@@ -23,7 +23,8 @@
 use databend_common_expression::utils::column_merge_validity;
 use databend_common_expression::ColumnBuilder;
 use databend_common_expression::ProjectedBlock;
 use databend_common_expression::Scalar;
-use databend_common_io::prelude::BinaryWrite;
+use databend_common_expression::ScalarRef;
+use databend_common_expression::StateSerdeItem;
 
 use super::AggrState;
 use super::AggrStateLoc;
@@ -183,12 +184,24 @@ impl AggregateFunction for AggregateNullUnaryAdapto
             .accumulate_row(place, not_null_columns, validity, row)
     }
 
-    fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
-        self.0.serialize(place, writer)
+    fn serialize_type(&self) -> Vec<StateSerdeItem> {
+        self.0.serialize_type()
     }
 
-    fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
-        self.0.merge(place, reader)
+    fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
+        self.0.serialize(place, builders)
+    }
+
+    fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> {
+        unreachable!()
+    }
+
+    fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
+        self.0.merge(place, data)
+    }
+
+    fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> {
+        unreachable!()
     }
 
     fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
@@ -304,12 +317,20 @@ impl AggregateFunction
             .accumulate_row(place, not_null_columns, validity, row)
     }
 
-    fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
-        self.0.serialize(place, writer)
+    fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
+        self.0.serialize(place, builders)
+    }
+
+    fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> {
+        unreachable!()
+    }
+
+    fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
+        self.0.merge(place, data)
     }
 
-    fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
-        self.0.merge(place, reader)
+    fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> {
+        unreachable!()
     }
 
     fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
@@ -488,24 +509,42 @@ impl CommonNullAdaptor {
             .accumulate_row(place.remove_last_loc(), not_null_columns, row)
     }
 
-    fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
+    fn serialize_type(&self) -> Vec<StateSerdeItem> {
         if !NULLABLE_RESULT {
-            return self.nested.serialize_binary(place, writer);
+            return self.nested.serialize_type();
         }
-        self.nested
-            .serialize_binary(place.remove_last_loc(), writer)?;
+        self.nested
+            .serialize_type()
+            .into_iter()
+            .chain(Some(StateSerdeItem::Bool))
+            .collect()
+    }
+
+    fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
+        if !NULLABLE_RESULT {
+            return self.nested.serialize(place, builders);
+        }
+        let n = builders.len();
+        debug_assert_eq!(self.nested.serialize_type().len() + 1, n);
+
         let flag = get_flag(place);
-        writer.write_scalar(&flag)
+        builders
+            .last_mut()
+            .and_then(ColumnBuilder::as_boolean_mut)
+            .unwrap()
+            .push(flag);
+        self.nested
+            .serialize(place.remove_last_loc(), &mut builders[..(n - 1)])
    }
 
-    fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
+    fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
         if !NULLABLE_RESULT {
-            return self.nested.merge_binary(place, reader);
+            return self.nested.merge(place, data);
         }
 
-        let flag = reader[reader.len() - 1];
-        if flag == 0 {
+        let n = data.len();
+        let flag = *data.last().and_then(ScalarRef::as_boolean).unwrap();
+        if !flag {
             return Ok(());
         }
 
         if !get_flag(place) {
             self.init_state(place);
         }
         set_flag(place, true);
-        self.nested
-            .merge_binary(place.remove_last_loc(), &mut &reader[..reader.len() - 1])
+        self.nested.merge(place.remove_last_loc(), &data[..n - 1])
     }
 
     fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
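Both nullable adaptors now perform the same builder split during serialize: the trailing builder receives the Bool flag and the leading builders are handed to the nested function untouched. A standalone sketch of that split, with plain byte buffers standing in for databend's ColumnBuilder; split_last_mut is an equivalent alternative to indexing with ..n - 1:

    fn serialize_with_flag(
        builders: &mut [Vec<u8>], // one buffer per tuple field, flag last
        flag: bool,
        write_nested: impl FnOnce(&mut [Vec<u8>]),
    ) {
        // (flag builder, nested builders), borrowed disjointly
        let (flag_builder, nested_builders) = builders.split_last_mut().unwrap();
        flag_builder.push(flag as u8);
        write_nested(nested_builders);
    }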
diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs
index 677bb7fc6f814..4c6ee6e25b62b 100644
--- a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs
+++ b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs
@@ -23,7 +23,8 @@
 use databend_common_expression::AggrStateType;
 use databend_common_expression::ColumnBuilder;
 use databend_common_expression::ProjectedBlock;
 use databend_common_expression::Scalar;
-use databend_common_io::prelude::BinaryWrite;
+use databend_common_expression::ScalarRef;
+use databend_common_expression::StateSerdeItem;
 
 use super::AggrState;
 use super::AggrStateLoc;
@@ -177,23 +178,45 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
         Ok(())
     }
 
-    #[inline]
-    fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
+    fn serialize_type(&self) -> Vec<StateSerdeItem> {
         self.inner
-            .serialize_binary(place.remove_last_loc(), writer)?;
-        let flag = get_flag(place) as u8;
-        writer.write_scalar(&flag)
+            .serialize_type()
+            .into_iter()
+            .chain(Some(StateSerdeItem::Bool))
+            .collect()
     }
 
-    #[inline]
-    fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
-        let flag = get_flag(place) || reader[reader.len() - 1] > 0;
+    fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
+        let n = builders.len();
+        debug_assert_eq!(self.inner.serialize_type().len() + 1, n);
+
+        let flag = get_flag(place);
+        builders
+            .last_mut()
+            .and_then(ColumnBuilder::as_boolean_mut)
+            .unwrap()
+            .push(flag);
+
+        self.inner
+            .serialize(place.remove_last_loc(), &mut builders[..n - 1])
+    }
+
+    fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> {
+        unreachable!()
+    }
+
+    fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
+        let flag = get_flag(place) || *data.last().and_then(ScalarRef::as_boolean).unwrap();
         self.inner
-            .merge_binary(place.remove_last_loc(), &mut &reader[..reader.len() - 1])?;
+            .merge(place.remove_last_loc(), &data[..data.len() - 1])?;
         set_flag(place, flag);
         Ok(())
     }
 
+    fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> {
+        unreachable!()
+    }
+
     fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
         self.inner
             .merge_states(place.remove_last_loc(), rhs.remove_last_loc())?;
@@ -237,6 +260,63 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
     unsafe fn drop_state(&self, place: AggrState) {
         self.inner.drop_state(place.remove_last_loc())
     }
+
+    fn batch_merge(
+        &self,
+        places: &[StateAddr],
+        loc: &[AggrStateLoc],
+        state: &databend_common_expression::BlockEntry,
+    ) -> Result<()> {
+        let column = state.to_column();
+        for (place, data) in places.iter().zip(column.iter()) {
+            self.merge(
+                AggrState::new(*place, loc),
+                data.as_tuple().unwrap().as_slice(),
+            )?;
+        }
+
+        Ok(())
+    }
+
+    fn batch_merge_single(
+        &self,
+        place: AggrState,
+        state: &databend_common_expression::BlockEntry,
+    ) -> Result<()> {
+        let column = state.to_column();
+        for data in column.iter() {
+            self.merge(place, data.as_tuple().unwrap().as_slice())?;
+        }
+        Ok(())
+    }
+
+    fn batch_merge_states(
+        &self,
+        places: &[StateAddr],
+        rhses: &[StateAddr],
+        loc: &[AggrStateLoc],
+    ) -> Result<()> {
+        for (place, rhs) in places.iter().zip(rhses.iter()) {
+            self.merge_states(AggrState::new(*place, loc), AggrState::new(*rhs, loc))?;
+        }
+        Ok(())
+    }
+
+    fn batch_merge_result(
+        &self,
+        places: &[StateAddr],
+        loc: Box<[AggrStateLoc]>,
+        builder: &mut ColumnBuilder,
+    ) -> Result<()> {
+        for place in places {
+            self.merge_result(AggrState::new(*place, &loc), builder)?;
+        }
+        Ok(())
+    }
+
+    fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> {
+        None
+    }
 }
 
 impl fmt::Display for AggregateFunctionOrNullAdaptor {
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs
index 9b5fba779986a..549a2ebc9765b 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs
@@ -155,7 +155,8 @@ impl AccumulatingTransform for PartialSingleStateAggregator {
             )
             .zip(builders.iter_mut())
         {
-            func.serialize(place, builder)?;
+            let builders = builder.as_tuple_mut().unwrap().as_mut_slice();
+            func.serialize(place, builders)?;
         }
 
         let columns = builders.into_iter().map(|b| b.build()).collect();
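Taken together, the series replaces the single opaque binary state column with a typed tuple per aggregate, and the nullable adaptors append a Bool flag as the last tuple field. A self-contained model of the resulting row shape and of the flag-gated merge; StateField, serialize_sum and merge_sum are local stand-ins, not databend APIs:

    #[derive(Debug, PartialEq)]
    enum StateField {
        Bool(bool),
        Binary(Vec<u8>),
    }

    // Nested state first, trailing flag last, matching the adaptor layout.
    fn serialize_sum(sum: i64, seen_value: bool) -> Vec<StateField> {
        vec![
            StateField::Binary(sum.to_le_bytes().to_vec()),
            StateField::Bool(seen_value),
        ]
    }

    fn merge_sum(acc: &mut (i64, bool), row: &[StateField]) {
        // The trailing Bool gates the whole merge, as in CommonNullAdaptor.
        if !matches!(row.last(), Some(StateField::Bool(true))) {
            return; // empty partial state: nothing to merge
        }
        let StateField::Binary(bytes) = &row[0] else { unreachable!() };
        acc.0 += i64::from_le_bytes(bytes.as_slice().try_into().unwrap());
        acc.1 = true;
    }

    fn main() {
        let mut acc = (0i64, false);
        merge_sum(&mut acc, &serialize_sum(7, true));
        merge_sum(&mut acc, &serialize_sum(0, false)); // flag unset: ignored
        assert_eq!(acc, (7, true));
    }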