diff --git a/src/common/base/src/runtime/profile/profile.rs b/src/common/base/src/runtime/profile/profile.rs index 58cfb503be56a..1dc288e7303f0 100644 --- a/src/common/base/src/runtime/profile/profile.rs +++ b/src/common/base/src/runtime/profile/profile.rs @@ -23,7 +23,7 @@ use crate::runtime::metrics::ScopedRegistry; use crate::runtime::profile::ProfileStatisticsName; use crate::runtime::ThreadTracker; -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ProfileLabel { pub name: String, pub value: Vec<String>, diff --git a/src/common/storage/src/copy.rs b/src/common/storage/src/copy.rs index 927835fdb496a..a3464e33ed4fe 100644 --- a/src/common/storage/src/copy.rs +++ b/src/common/storage/src/copy.rs @@ -20,7 +20,7 @@ use serde::Deserialize; use serde::Serialize; use thiserror::Error; -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct CopyStatus { /// Key is file path. pub files: DashMap<String, FileStatus>, @@ -45,7 +45,7 @@ impl CopyStatus { } } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct FileStatus { pub num_rows_loaded: usize, pub error: Option<FileErrorsInfo>, @@ -79,7 +79,7 @@ impl FileStatus { } } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileErrorsInfo { pub num_errors: usize, pub first_error: FileParseErrorAtLine, @@ -156,7 +156,7 @@ impl FileParseError { } } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileParseErrorAtLine { pub error: FileParseError, pub line: usize, diff --git a/src/common/storage/src/merge.rs b/src/common/storage/src/merge.rs index 9f4fe94080bc4..50125e8823c90 100644 --- a/src/common/storage/src/merge.rs +++ b/src/common/storage/src/merge.rs @@ -15,7 +15,7 @@ use serde::Deserialize; use serde::Serialize; -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct MutationStatus { pub insert_rows: u64, pub deleted_rows: u64, diff --git a/src/query/catalog/src/statistics/data_cache_statistics.rs b/src/query/catalog/src/statistics/data_cache_statistics.rs index 04ae435204eb5..6838a93cc212d 100644 --- a/src/query/catalog/src/statistics/data_cache_statistics.rs +++ b/src/query/catalog/src/statistics/data_cache_statistics.rs @@ -24,7 +24,7 @@ pub struct DataCacheMetrics { bytes_from_memory: AtomicUsize, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct DataCacheMetricValues { pub bytes_from_remote_disk: usize, pub bytes_from_local_disk: usize, diff --git a/src/query/expression/src/aggregate/aggregate_function.rs b/src/query/expression/src/aggregate/aggregate_function.rs index fe446c3ed1cf4..08e5ac3759337 100755 --- a/src/query/expression/src/aggregate/aggregate_function.rs +++ b/src/query/expression/src/aggregate/aggregate_function.rs @@ -22,13 +22,13 @@ use super::AggrState; use super::AggrStateLoc; use super::AggrStateRegistry; use super::StateAddr; -use crate::types::BinaryType; use crate::types::DataType; use crate::BlockEntry; use crate::ColumnBuilder; -use crate::ColumnView; use crate::ProjectedBlock; use crate::Scalar; +use crate::ScalarRef; +use crate::StateSerdeItem; pub type AggregateFunctionRef = Arc<dyn AggregateFunction>; @@ -69,32 +69,52 @@ pub trait AggregateFunction: fmt::Display + Sync + Send { // Used in aggregate_null_adaptor fn
accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>; - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>; + fn serialize_type(&self) -> Vec<StateSerdeItem> { + vec![StateSerdeItem::Binary(self.serialize_size_per_row())] + } + + fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> { + let binary_builder = builders[0].as_binary_mut().unwrap(); + self.serialize_binary(place, &mut binary_builder.data)?; + binary_builder.commit_row(); + Ok(()) + } + + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>; fn serialize_size_per_row(&self) -> Option<usize> { None } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>; + fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> { + let mut binary = *data[0].as_binary().unwrap(); + self.merge_binary(place, &mut binary) + } + + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>; /// Batch merge and deserialize the state from binary array fn batch_merge( &self, places: &[StateAddr], loc: &[AggrStateLoc], - state: &ColumnView<BinaryType>, + state: &BlockEntry, ) -> Result<()> { - for (place, mut data) in places.iter().zip(state.iter()) { - self.merge(AggrState::new(*place, loc), &mut data)?; + let column = state.to_column(); + for (place, data) in places.iter().zip(column.iter()) { + self.merge( + AggrState::new(*place, loc), + data.as_tuple().unwrap().as_slice(), + )?; } Ok(()) } fn batch_merge_single(&self, place: AggrState, state: &BlockEntry) -> Result<()> { - let view = state.downcast::<BinaryType>().unwrap(); - for mut data in view.iter() { - self.merge(place, &mut data)?; + let column = state.to_column(); + for data in column.iter() { + self.merge(place, data.as_tuple().unwrap().as_slice())?; } Ok(()) } @@ -149,9 +169,4 @@ pub trait AggregateFunction: fmt::Display + Sync + Send { fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> { None } - - // some features - fn convert_const_to_full(&self) -> bool { - true - } } diff --git a/src/query/expression/src/aggregate/aggregate_function_state.rs b/src/query/expression/src/aggregate/aggregate_function_state.rs index 5d1fe21ac4379..791ee68542ff7 100644 --- a/src/query/expression/src/aggregate/aggregate_function_state.rs +++ b/src/query/expression/src/aggregate/aggregate_function_state.rs @@ -20,6 +20,8 @@ use enum_as_inner::EnumAsInner; use super::AggregateFunctionRef; use crate::types::binary::BinaryColumnBuilder; +use crate::types::DataType; +use crate::ColumnBuilder; #[derive(Clone, Copy, Debug)] pub struct StateAddr { @@ -113,11 +115,11 @@ impl From<StateAddr> for usize { pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result<StatesLayout> { let mut registry = AggrStateRegistry::default(); - let mut serialize_size = Vec::with_capacity(funcs.len()); + let mut serialize_type = Vec::with_capacity(funcs.len()); for func in funcs { func.register_state(&mut registry); registry.commit(); - serialize_size.push(func.serialize_size_per_row()); + serialize_type.push(StateSerdeType(func.serialize_type().into())); } let AggrStateRegistry { states, offsets } = registry; @@ -132,7 +134,7 @@ pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result<StatesLayout> Ok(StatesLayout { layout, states_loc, - serialize_size, + serialize_type, }) } @@ -191,18 +193,58 @@ impl AggrStateLoc { } } +#[derive(Debug, Clone, Copy)] +pub enum StateSerdeItem { + Bool, + Binary(Option<usize>), +} + +#[derive(Debug, Clone)] +pub struct StateSerdeType(Box<[StateSerdeItem]>); + +impl StateSerdeType { + pub fn data_type(&self) ->
DataType { + DataType::Tuple( + self.0 + .iter() + .map(|item| match item { + StateSerdeItem::Bool => DataType::Boolean, + StateSerdeItem::Binary(_) => DataType::Binary, + }) + .collect(), + ) + } +} + #[derive(Debug, Clone)] pub struct StatesLayout { pub layout: Layout, pub states_loc: Vec<Box<[AggrStateLoc]>>, - serialize_size: Vec<Option<usize>>, + pub(super) serialize_type: Vec<StateSerdeType>, } impl StatesLayout { - pub fn serialize_builders(&self, num_rows: usize) -> Vec<BinaryColumnBuilder> { - self.serialize_size + pub fn serialize_builders(&self, num_rows: usize) -> Vec<ColumnBuilder> { + self.serialize_type .iter() - .map(|size| BinaryColumnBuilder::with_capacity(num_rows, num_rows * size.unwrap_or(0))) + .map(|serde_type| { + let builder = serde_type + .0 + .iter() + .map(|item| match item { + StateSerdeItem::Bool => { + ColumnBuilder::with_capacity(&DataType::Boolean, num_rows) + } + StateSerdeItem::Binary(size) => { + ColumnBuilder::Binary(BinaryColumnBuilder::with_capacity( + num_rows, + num_rows * size.unwrap_or(0), + )) + } + }) + .collect(); + ColumnBuilder::Tuple(builder) + }) .collect() } } diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 11907059ef90e..a605227b7f5ce 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -27,7 +27,6 @@ use crate::aggregate::payload_row::row_match_columns; use crate::group_hash_columns; use crate::new_sel; use crate::read; -use crate::types::BinaryType; use crate::types::DataType; use crate::AggregateFunctionRef; use crate::BlockEntry; @@ -219,7 +218,7 @@ impl AggregateHashTable { .zip(agg_states.iter()) .zip(states_layout.states_loc.iter()) { - func.batch_merge(state_places, loc, &state.downcast::<BinaryType>().unwrap())?; + func.batch_merge(state_places, loc, state)?; } } } diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index aafe2921c8532..925293beb214c 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -421,11 +421,15 @@ impl Payload { true } - pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock { - let fake_rows = fake_rows.unwrap_or(0); - let entries = (0..self.aggrs.len()) - .map(|_| { - ColumnBuilder::repeat_default(&DataType::Binary, fake_rows) + pub fn empty_block(&self, fake_rows: usize) -> DataBlock { + assert_eq!(self.aggrs.is_empty(), self.states_layout.is_none()); + let entries = self + .states_layout + .as_ref() + .iter() + .flat_map(|layout| layout.serialize_type.iter()) + .map(|serde_type| { + ColumnBuilder::repeat_default(&serde_type.data_type(), fake_rows) .build() .into() }) diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 43dabd3b519c2..f6aa0d65d5772 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -125,7 +125,7 @@ impl Payload { } if blocks.is_empty() { - return Ok(self.empty_block(None)); + return Ok(self.empty_block(0)); } DataBlock::concat(&blocks) } @@ -149,18 +149,13 @@ impl Payload { .enumerate() { { - let builder = &mut builders[idx]; - func.serialize(AggrState::new(*place, loc), &mut builder.data)?; - builder.commit_row(); + let builders = builders[idx].as_tuple_mut().unwrap().as_mut_slice(); + func.serialize(AggrState::new(*place, loc), builders)?; } } } - entries.extend( - builders - .into_iter() - .map(|builder| Column::Binary(builder.build()).into()),
); + entries.extend(builders.into_iter().map(|builder| builder.build().into())); } entries.extend_from_slice(&state.take_group_columns()); @@ -177,7 +172,7 @@ impl Payload { } if blocks.is_empty() { - return Ok(self.empty_block(None)); + return Ok(self.empty_block(0)); } DataBlock::concat(&blocks) diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs index 18af505c1241a..6a9ef13527f69 100644 --- a/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs +++ b/src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs @@ -23,7 +23,8 @@ use databend_common_expression::utils::column_merge_validity; use databend_common_expression::ColumnBuilder; use databend_common_expression::ProjectedBlock; use databend_common_expression::Scalar; -use databend_common_io::prelude::BinaryWrite; +use databend_common_expression::ScalarRef; +use databend_common_expression::StateSerdeItem; use super::AggrState; use super::AggrStateLoc; @@ -183,12 +184,24 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto .accumulate_row(place, not_null_columns, validity, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { - self.0.serialize(place, writer) + fn serialize_type(&self) -> Vec<StateSerdeItem> { + self.0.serialize_type() } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.0.merge(place, reader) + fn serialize(&self, place: AggrState, builder: &mut [ColumnBuilder]) -> Result<()> { + self.0.serialize(place, builder) + } + + fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> { + unreachable!() + } + + fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> { + self.0.merge(place, data) + } + + fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> { + unreachable!() } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { @@ -207,10 +220,6 @@ self.0.drop_state(place); } - fn convert_const_to_full(&self) -> bool { - self.0.nested.convert_const_to_full() - } - fn get_if_condition(&self, columns: ProjectedBlock) -> Option<Bitmap> { self.0.nested.get_if_condition(columns) } @@ -308,12 +317,20 @@ impl<const NULLABLE_RESULT: bool, const STATE_INCLUDED: bool> AggregateFunction .accumulate_row(place, not_null_columns, validity, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { - self.0.serialize(place, writer) + fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> { + self.0.serialize(place, builders) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.0.merge(place, reader) + fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> { + unreachable!() + } + + fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> { + self.0.merge(place, data) + } + + fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> { + unreachable!() } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { @@ -332,10 +349,6 @@ self.0.drop_state(place); } - fn convert_const_to_full(&self) -> bool { - self.0.nested.convert_const_to_full() - } - fn get_if_condition(&self, columns: ProjectedBlock) -> Option<Bitmap> { self.0.nested.get_if_condition(columns) } @@ -496,23 +509,42 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> { .accumulate_row(place.remove_last_loc(), not_null_columns, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_type(&self) -> Vec<StateSerdeItem> { + if
!NULLABLE_RESULT { + return self.nested.serialize_type(); + } + self.nested + .serialize_type() + .into_iter() + .chain(Some(StateSerdeItem::Bool)) + .collect() + } + + fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> { if !NULLABLE_RESULT { - return self.nested.serialize(place, writer); + return self.nested.serialize(place, builders); } + let n = builders.len(); + debug_assert_eq!(self.nested.serialize_type().len() + 1, n); - self.nested.serialize(place.remove_last_loc(), writer)?; let flag = get_flag(place); - writer.write_scalar(&flag) + builders + .last_mut() + .and_then(ColumnBuilder::as_boolean_mut) + .unwrap() + .push(flag); + self.nested + .serialize(place.remove_last_loc(), &mut builders[..(n - 1)]) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> { if !NULLABLE_RESULT { - return self.nested.merge(place, reader); + return self.nested.merge(place, data); } - let flag = reader[reader.len() - 1]; - if flag == 0 { + let n = data.len(); + let flag = *data.last().and_then(ScalarRef::as_boolean).unwrap(); + if !flag { return Ok(()); } @@ -521,8 +553,7 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> { self.init_state(place); } set_flag(place, true); - self.nested - .merge(place.remove_last_loc(), &mut &reader[..reader.len() - 1]) + self.nested.merge(place.remove_last_loc(), &data[..n - 1]) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs index c7f9d4c4677d4..4c6ee6e25b62b 100644 --- a/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs +++ b/src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs @@ -23,7 +23,8 @@ use databend_common_expression::AggrStateType; use databend_common_expression::ColumnBuilder; use databend_common_expression::ProjectedBlock; use databend_common_expression::Scalar; -use databend_common_io::prelude::BinaryWrite; +use databend_common_expression::ScalarRef; +use databend_common_expression::StateSerdeItem; use super::AggrState; use super::AggrStateLoc; @@ -177,22 +178,45 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor { Ok(()) } - #[inline] - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { - self.inner.serialize(place.remove_last_loc(), writer)?; - let flag = get_flag(place) as u8; - writer.write_scalar(&flag) + fn serialize_type(&self) -> Vec<StateSerdeItem> { + self.inner + .serialize_type() + .into_iter() + .chain(Some(StateSerdeItem::Bool)) + .collect() } - #[inline] - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - let flag = get_flag(place) || reader[reader.len() - 1] > 0; + fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> { + let n = builders.len(); + debug_assert_eq!(self.inner.serialize_type().len() + 1, n); + + let flag = get_flag(place); + builders + .last_mut() + .and_then(ColumnBuilder::as_boolean_mut) + .unwrap() + .push(flag); + self.inner - .merge(place.remove_last_loc(), &mut &reader[..reader.len() - 1])?; + .serialize(place.remove_last_loc(), &mut builders[..n - 1]) + } + + fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> { + unreachable!() + } + + fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> { + let flag = get_flag(place) || *data.last().and_then(ScalarRef::as_boolean).unwrap(); + self.inner
.merge(place.remove_last_loc(), &data[..data.len() - 1])?; set_flag(place, flag); Ok(()) } + fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> { + unreachable!() + } + fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { self.inner .merge_states(place.remove_last_loc(), rhs.remove_last_loc())?; @@ -237,8 +261,61 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor { self.inner.drop_state(place.remove_last_loc()) } - fn convert_const_to_full(&self) -> bool { - self.inner.convert_const_to_full() + fn batch_merge( + &self, + places: &[StateAddr], + loc: &[AggrStateLoc], + state: &databend_common_expression::BlockEntry, + ) -> Result<()> { + let column = state.to_column(); + for (place, data) in places.iter().zip(column.iter()) { + self.merge( + AggrState::new(*place, loc), + data.as_tuple().unwrap().as_slice(), + )?; + } + + Ok(()) + } + + fn batch_merge_single( + &self, + place: AggrState, + state: &databend_common_expression::BlockEntry, + ) -> Result<()> { + let column = state.to_column(); + for data in column.iter() { + self.merge(place, data.as_tuple().unwrap().as_slice())?; + } + Ok(()) + } + + fn batch_merge_states( + &self, + places: &[StateAddr], + rhses: &[StateAddr], + loc: &[AggrStateLoc], + ) -> Result<()> { + for (place, rhs) in places.iter().zip(rhses.iter()) { + self.merge_states(AggrState::new(*place, loc), AggrState::new(*rhs, loc))?; + } + Ok(()) + } + + fn batch_merge_result( + &self, + places: &[StateAddr], + loc: Box<[AggrStateLoc]>, + builder: &mut ColumnBuilder, + ) -> Result<()> { + for place in places { + self.merge_result(AggrState::new(*place, &loc), builder)?; + } + Ok(()) + } + + fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> { + None } } diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs b/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs index 4894fc24a9c84..20b11214d15fc 100644 --- a/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs +++ b/src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs @@ -121,12 +121,12 @@ impl AggregateFunction for AggregateFunctionSortAdaptor { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = Self::get_state(place); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = Self::get_state(place); let rhs = SortAggState::deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_arg_min_max.rs b/src/query/functions/src/aggregates/aggregate_arg_min_max.rs index 98506ab12b8b5..5311335b6b229 100644 --- a/src/query/functions/src/aggregates/aggregate_arg_min_max.rs +++ b/src/query/functions/src/aggregates/aggregate_arg_min_max.rs @@ -270,12 +270,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<State>(); Ok(state.serialize(writer)?)
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<State>(); let rhs: State = borsh_partial_deserialize(reader)?; state.merge_from(rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_array_agg.rs b/src/query/functions/src/aggregates/aggregate_array_agg.rs index 7083b886996df..ddeba25f9c86f 100644 --- a/src/query/functions/src/aggregates/aggregate_array_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_array_agg.rs @@ -548,12 +548,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<State>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<State>(); let rhs = State::deserialize_reader(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_array_moving.rs b/src/query/functions/src/aggregates/aggregate_array_moving.rs index 6ef407c1402e2..0070924a974df 100644 --- a/src/query/functions/src/aggregates/aggregate_array_moving.rs +++ b/src/query/functions/src/aggregates/aggregate_array_moving.rs @@ -447,12 +447,12 @@ where State: SumState state.accumulate_row(&columns[0], row) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<State>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<State>(); let rhs = State::deserialize_reader(reader)?; @@ -616,12 +616,12 @@ where State: SumState state.accumulate_row(&columns[0], row) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<State>(); Ok(state.serialize(writer)?)
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<State>(); let rhs = State::deserialize_reader(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_bitmap.rs b/src/query/functions/src/aggregates/aggregate_bitmap.rs index cce9492c709d8..3f56ff133296b 100644 --- a/src/query/functions/src/aggregates/aggregate_bitmap.rs +++ b/src/query/functions/src/aggregates/aggregate_bitmap.rs @@ -287,7 +287,7 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<BitmapAggState>(); // flag indicate where bitmap is none let flag: u8 = if state.rb.is_some() { 1 } else { 0 }; @@ -298,7 +298,7 @@ where Ok(()) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<BitmapAggState>(); let flag = reader[0]; @@ -482,12 +482,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { - self.inner.serialize(place, writer) + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + self.inner.serialize_binary(place, writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.inner.merge(place, reader) + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + self.inner.merge_binary(place, reader) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { diff --git a/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs b/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs index 77a11ec9b44f8..d74b9a32f661a 100644 --- a/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs +++ b/src/query/functions/src/aggregates/aggregate_combinator_distinct.rs @@ -106,12 +106,12 @@ where State: DistinctStateFunc state.add(columns, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = Self::get_state(place); state.serialize(writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = Self::get_state(place); let rhs = State::deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_combinator_if.rs b/src/query/functions/src/aggregates/aggregate_combinator_if.rs index ea83b8028d23e..03bbdf625e4df 100644 --- a/src/query/functions/src/aggregates/aggregate_combinator_if.rs +++ b/src/query/functions/src/aggregates/aggregate_combinator_if.rs @@ -155,12 +155,12 @@ impl AggregateFunction for AggregateIfCombinator { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { - self.nested.serialize(place, writer) + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + self.nested.serialize_binary(place, writer) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.nested.merge(place, reader) + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + self.nested.merge_binary(place, reader) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { diff --git a/src/query/functions/src/aggregates/aggregate_combinator_state.rs
b/src/query/functions/src/aggregates/aggregate_combinator_state.rs index 486707ec7af61..e83d78f24164c 100644 --- a/src/query/functions/src/aggregates/aggregate_combinator_state.rs +++ b/src/query/functions/src/aggregates/aggregate_combinator_state.rs @@ -106,13 +106,13 @@ impl AggregateFunction for AggregateStateCombinator { self.nested.accumulate_row(place, columns, row) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { - self.nested.serialize(place, writer) + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + self.nested.serialize_binary(place, writer) } #[inline] - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { - self.nested.merge(place, reader) + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + self.nested.merge_binary(place, reader) } fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> { @@ -121,7 +121,7 @@ impl AggregateFunction for AggregateStateCombinator { fn merge_result(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> { let builder = builder.as_binary_mut().unwrap(); - self.nested.serialize(place, &mut builder.data)?; + self.nested.serialize_binary(place, &mut builder.data)?; builder.commit_row(); Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_count.rs b/src/query/functions/src/aggregates/aggregate_count.rs index b822b832896ee..9e1db7e50a989 100644 --- a/src/query/functions/src/aggregates/aggregate_count.rs +++ b/src/query/functions/src/aggregates/aggregate_count.rs @@ -160,12 +160,12 @@ impl AggregateFunction for AggregateCountFunction { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<AggregateCountState>(); Ok(state.count.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<AggregateCountState>(); let other: u64 = borsh_partial_deserialize(reader)?; state.count += other; diff --git a/src/query/functions/src/aggregates/aggregate_covariance.rs b/src/query/functions/src/aggregates/aggregate_covariance.rs index 8af1216f56567..c69d242ea811a 100644 --- a/src/query/functions/src/aggregates/aggregate_covariance.rs +++ b/src/query/functions/src/aggregates/aggregate_covariance.rs @@ -231,12 +231,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<AggregateCovarianceState>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<AggregateCovarianceState>(); let rhs: AggregateCovarianceState = borsh_partial_deserialize(reader)?; state.merge(&rhs); diff --git a/src/query/functions/src/aggregates/aggregate_json_array_agg.rs b/src/query/functions/src/aggregates/aggregate_json_array_agg.rs index f1ba8d35807fa..acd867e4f2a8e 100644 --- a/src/query/functions/src/aggregates/aggregate_json_array_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_json_array_agg.rs @@ -240,12 +240,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<State>(); Ok(state.serialize(writer)?)
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<State>(); let rhs: State = borsh_partial_deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_json_object_agg.rs b/src/query/functions/src/aggregates/aggregate_json_object_agg.rs index 9efe5fc0f6b90..45179f529cee2 100644 --- a/src/query/functions/src/aggregates/aggregate_json_object_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_json_object_agg.rs @@ -293,12 +293,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<State>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<State>(); let rhs: State = borsh_partial_deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_markov_tarin.rs b/src/query/functions/src/aggregates/aggregate_markov_tarin.rs index 0524caa523e39..285965bcbafa8 100644 --- a/src/query/functions/src/aggregates/aggregate_markov_tarin.rs +++ b/src/query/functions/src/aggregates/aggregate_markov_tarin.rs @@ -113,12 +113,12 @@ impl AggregateFunction for MarkovTarin { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<MarkovModel>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<MarkovModel>(); let mut rhs = borsh_partial_deserialize::<MarkovModel>(reader)?; state.merge(&mut rhs); diff --git a/src/query/functions/src/aggregates/aggregate_null_result.rs b/src/query/functions/src/aggregates/aggregate_null_result.rs index 03f9b6a1ce041..74e2942e9bceb 100644 --- a/src/query/functions/src/aggregates/aggregate_null_result.rs +++ b/src/query/functions/src/aggregates/aggregate_null_result.rs @@ -86,11 +86,11 @@ impl AggregateFunction for AggregateNullResultFunction { Ok(()) } - fn serialize(&self, _place: AggrState, _writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, _place: AggrState, _writer: &mut Vec<u8>) -> Result<()> { Ok(()) } - fn merge(&self, _place: AggrState, _reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, _place: AggrState, _reader: &mut &[u8]) -> Result<()> { Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs index 5aadd1887b5db..d982f8b80ca14 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs @@ -362,12 +362,12 @@ where for<'a> T: AccessType<ScalarRef<'a> = F64> + Send + Sync }); Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<QuantileTDigestState>(); Ok(state.serialize(writer)?)
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<QuantileTDigestState>(); let mut rhs: QuantileTDigestState = borsh_partial_deserialize(reader)?; state.merge(&mut rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs b/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs index 7ac562153fe53..d4aa266fd1a5e 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_tdigest_weighted.rs @@ -143,12 +143,12 @@ where }); Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<QuantileTDigestState>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<QuantileTDigestState>(); let mut rhs: QuantileTDigestState = borsh_partial_deserialize(reader)?; state.merge(&mut rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_retention.rs b/src/query/functions/src/aggregates/aggregate_retention.rs index 5f42198bec4c0..5b904f234e740 100644 --- a/src/query/functions/src/aggregates/aggregate_retention.rs +++ b/src/query/functions/src/aggregates/aggregate_retention.rs @@ -143,12 +143,12 @@ impl AggregateFunction for AggregateRetentionFunction { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<AggregateRetentionState>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<AggregateRetentionState>(); let rhs: AggregateRetentionState = borsh_partial_deserialize(reader)?; state.merge(&rhs); diff --git a/src/query/functions/src/aggregates/aggregate_st_collect.rs b/src/query/functions/src/aggregates/aggregate_st_collect.rs index afdb3d88a373e..bfe960471c5ee 100644 --- a/src/query/functions/src/aggregates/aggregate_st_collect.rs +++ b/src/query/functions/src/aggregates/aggregate_st_collect.rs @@ -306,12 +306,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<State>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<State>(); let rhs: State = borsh_partial_deserialize(reader)?; diff --git a/src/query/functions/src/aggregates/aggregate_string_agg.rs b/src/query/functions/src/aggregates/aggregate_string_agg.rs index 34a530c0253f7..743c2592d0b9b 100644 --- a/src/query/functions/src/aggregates/aggregate_string_agg.rs +++ b/src/query/functions/src/aggregates/aggregate_string_agg.rs @@ -147,12 +147,12 @@ impl AggregateFunction for AggregateStringAggFunction { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<StringAggState>(); Ok(state.serialize(writer)?)
} - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<StringAggState>(); let rhs: StringAggState = borsh_partial_deserialize(reader)?; state.values.push_str(&rhs.values); diff --git a/src/query/functions/src/aggregates/aggregate_unary.rs b/src/query/functions/src/aggregates/aggregate_unary.rs index 20b9fc5f85a70..be20d15157534 100644 --- a/src/query/functions/src/aggregates/aggregate_unary.rs +++ b/src/query/functions/src/aggregates/aggregate_unary.rs @@ -227,12 +227,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state: &mut S = place.get::<S>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state: &mut S = place.get::<S>(); let rhs = S::deserialize_reader(reader)?; state.merge(&rhs) } diff --git a/src/query/functions/src/aggregates/aggregate_window_funnel.rs b/src/query/functions/src/aggregates/aggregate_window_funnel.rs index 3d29c1734bff8..864e5d0eaeea5 100644 --- a/src/query/functions/src/aggregates/aggregate_window_funnel.rs +++ b/src/query/functions/src/aggregates/aggregate_window_funnel.rs @@ -275,12 +275,12 @@ where Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<AggregateWindowFunnelState<T>>(); Ok(state.serialize(writer)?) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<AggregateWindowFunnelState<T>>(); let mut rhs: AggregateWindowFunnelState<T> = borsh_partial_deserialize(reader)?; state.merge(&mut rhs); diff --git a/src/query/functions/src/aggregates/aggregator_common.rs b/src/query/functions/src/aggregates/aggregator_common.rs index 6430bcf3cc647..8d4255a991983 100644 --- a/src/query/functions/src/aggregates/aggregator_common.rs +++ b/src/query/functions/src/aggregates/aggregator_common.rs @@ -187,9 +187,9 @@ pub fn eval_aggr_for_test( func.accumulate(state, entries.into(), None, rows)?; if with_serialize { let mut buf = vec![]; - func.serialize(state, &mut buf)?; + func.serialize_binary(state, &mut buf)?; func.init_state(state); - func.merge(state, &mut buf.as_slice())?; + func.merge_binary(state, &mut buf.as_slice())?; } let mut builder = ColumnBuilder::with_capacity(&data_type, 1024); func.merge_result(state, &mut builder)?; diff --git a/src/query/pipeline/core/src/processors/processor.rs b/src/query/pipeline/core/src/processors/processor.rs index ce70053b80ded..bb71f493a2061 100644 --- a/src/query/pipeline/core/src/processors/processor.rs +++ b/src/query/pipeline/core/src/processors/processor.rs @@ -161,12 +161,24 @@ impl ProcessorPtr { /// # Safety pub unsafe fn process(&self) -> Result<()> { - let mut name = self.name(); - name.push_str("::process"); - let _span = LocalSpan::enter_with_local_parent(name) + let span = LocalSpan::enter_with_local_parent(format!("{}::process", self.name())) .with_property(|| ("graph-node-id", self.id().index().to_string())); - (*self.inner.get()).process() + match (*self.inner.get()).process() { + Ok(_) => Ok(()), + Err(err) => { + let _ = span + .with_property(|| ("error", "true")) + .with_properties(|| { + [ + ("error.type", err.code().to_string()), + ("error.message",
err.display_text()), + ] + }); + log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process"); + Err(err) + } + } } /// # Safety @@ -190,10 +202,23 @@ impl ProcessorPtr { async move { let span = Span::enter_with_local_parent(name) .with_property(|| ("graph-node-id", id.index().to_string())); - task.in_span(span).await?; - drop(inner); - Ok(()) + match task.await { + Ok(_) => { + drop(inner); + Ok(()) + } + Err(err) => { + span.with_property(|| ("error", "true")).add_properties(|| { + [ + ("error.type", err.code().to_string()), + ("error.message", err.display_text()), + ] + }); + log::info!(error = err.to_string(); "[PIPELINE-EXECUTOR] Error in process"); + Err(err) + } + } } .boxed() } diff --git a/src/query/pipeline/core/src/processors/profile.rs b/src/query/pipeline/core/src/processors/profile.rs index 7fb5af4f4e8be..58b0fd98169ab 100644 --- a/src/query/pipeline/core/src/processors/profile.rs +++ b/src/query/pipeline/core/src/processors/profile.rs @@ -43,7 +43,7 @@ impl Drop for PlanScopeGuard { } } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ErrorInfoDesc { message: String, detail: String, @@ -60,7 +60,7 @@ impl ErrorInfoDesc { } } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum ErrorInfo { Other(ErrorInfoDesc), IoError(ErrorInfoDesc), @@ -68,7 +68,7 @@ CalculationError(ErrorInfoDesc), } -#[derive(Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PlanProfile { pub id: Option<u32>, pub name: Option<String>, diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 53a82eba8da10..7117dadb04634 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -146,6 +146,7 @@ impl PipelineExecutor { } } + #[fastrace::trace(name = "PipelineExecutor::init")] fn init(on_init_callback: &Mutex<Option<InitCallback>>, query_id: &Arc<String>) -> Result<()> { // TODO: the on init callback cannot be killed. { @@ -158,10 +159,8 @@ impl PipelineExecutor { } } - info!( - "[PIPELINE-EXECUTOR] Pipeline initialized successfully for query {}, elapsed: {:?}", - query_id, - instant.elapsed() + info!(query_id, elapsed:? = instant.elapsed(); + "[PIPELINE-EXECUTOR] Pipeline initialized successfully", ); } Ok(()) } diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index b45e707f30cc8..125d18929ea91 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -33,7 +33,6 @@ use databend_common_pipeline_core::FinishedCallbackChain; use databend_common_pipeline_core::LockGuard; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_core::PlanProfile; -use fastrace::func_path; use fastrace::prelude::*; use futures::future::select; use futures_util::future::Either; @@ -266,6 +265,7 @@ impl QueryPipelineExecutor { Ok(()) } + #[fastrace::trace(name = "QueryPipelineExecutor::init")] fn init(self: &Arc<Self>, graph: Arc<RunningGraph>) -> Result<()> { unsafe { // TODO: the on init callback cannot be killed.
@@ -286,10 +286,8 @@ impl QueryPipelineExecutor { } } - info!( - "[PIPELINE-EXECUTOR] Pipeline initialized successfully for query {}, elapsed: {:?}", - self.settings.query_id, - instant.elapsed() + info!(query_id = self.settings.query_id, elapsed:? = instant.elapsed(); + "[PIPELINE-EXECUTOR] Pipeline initialized successfully", ); } @@ -358,7 +356,7 @@ impl QueryPipelineExecutor { } } - let span = Span::enter_with_local_parent(func_path!()) + let span = Span::enter_with_local_parent("QueryPipelineExecutor::execute_threads") .with_property(|| ("thread_name", name.clone())); thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe { let _g = span.set_local_parent(); @@ -367,6 +365,12 @@ // finish the pipeline executor when has error or panic if let Err(cause) = try_result.flatten() { + span.with_property(|| ("error", "true")).add_properties(|| { + [ + ("error.type", cause.code().to_string()), + ("error.message", cause.display_text()), + ] + }); this.finish(Some(cause)); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index 096485fa98fcc..3032176fe5de8 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -121,26 +121,19 @@ impl Processor for TransformAggregateSerializer { impl TransformAggregateSerializer { fn transform_input_data(&mut self, mut data_block: DataBlock) -> Result<Event> { debug_assert!(data_block.is_empty()); - if let Some(block_meta) = data_block.take_meta() { - if let Some(block_meta) = AggregateMeta::downcast_from(block_meta) { - match block_meta { - AggregateMeta::Spilled(_) => unreachable!(), - AggregateMeta::Serialized(_) => unreachable!(), - AggregateMeta::BucketSpilled(_) => unreachable!(), - AggregateMeta::Partitioned { ..
} => unreachable!(), - AggregateMeta::AggregateSpilling(_) => unreachable!(), - AggregateMeta::AggregatePayload(p) => { - self.input_data = Some(SerializeAggregateStream::create( - &self.params, - SerializePayload::AggregatePayload(p), - )); - return Ok(Event::Sync); - } - } - } - } - unreachable!() + let Some(AggregateMeta::AggregatePayload(p)) = data_block + .take_meta() + .and_then(AggregateMeta::downcast_from) + else { + unreachable!() + }; + + self.input_data = Some(SerializeAggregateStream::create( + &self.params, + SerializePayload::AggregatePayload(p), + )); + Ok(Event::Sync) } } @@ -218,41 +211,29 @@ impl SerializeAggregateStream { return Ok(None); } - match self.payload.as_ref().get_ref() { - SerializePayload::AggregatePayload(p) => { - let block = p.payload.aggregate_flush(&mut self.flush_state)?; - - if block.is_none() { - self.end_iter = true; - } - - match block { - Some(block) => { - self.nums += 1; - Ok(Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload( - p.bucket, - p.max_partition_count, - false, - ), - ))?)) - } - None => { - // always return at least one block - if self.nums == 0 { - self.nums += 1; - let block = p.payload.empty_block(Some(1)); - Ok(Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload( - p.bucket, - p.max_partition_count, - true, - ), - ))?)) - } else { - Ok(None) - } - } + let SerializePayload::AggregatePayload(p) = self.payload.as_ref().get_ref(); + match p.payload.aggregate_flush(&mut self.flush_state)? { + Some(block) => { + self.nums += 1; + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count, false), + ))?)) + } + None => { + self.end_iter = true; + // always return at least one block + if self.nums == 0 { + self.nums += 1; + let block = p.payload.empty_block(1); + Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + true, + ), + ))?)) + } else { + Ok(None) } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs index 09725ee3bb173..9f632ec0b187c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs @@ -79,94 +79,77 @@ impl TransformDeserializer { return Ok(DataBlock::new_with_meta(vec![], 0, meta)); } - let data_block = match &meta { - None => { - deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())? - } - Some(meta) => match AggregateSerdeMeta::downcast_ref_from(meta) { - None => { - deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())? 
+ let Some(meta) = meta + .as_ref() + .and_then(AggregateSerdeMeta::downcast_ref_from) + else { + let data_block = + deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())?; + return match data_block.num_columns() == 0 { + true => Ok(DataBlock::new_with_meta(vec![], row_count as usize, meta)), + false => data_block.add_meta(meta), + }; + }; + + match meta.typ { + BUCKET_TYPE => { + let mut block = deserialize_block( + dict, + fragment_data, + &self.schema, + self.arrow_schema.clone(), + )?; + + if meta.is_empty { + block = block.slice(0..0); } - Some(meta) => { - return match meta.typ == BUCKET_TYPE { - true => { - let mut block = deserialize_block( - dict, - fragment_data, - &self.schema, - self.arrow_schema.clone(), - )?; - - if meta.is_empty { - block = block.slice(0..0); - } - - Ok(DataBlock::empty_with_meta( - AggregateMeta::create_serialized( - meta.bucket, - block, - meta.max_partition_count, - ), - )) - } - false => { - let data_schema = Arc::new(exchange_defines::spilled_schema()); - let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema()); - let data_block = deserialize_block( - dict, - fragment_data, - &data_schema, - arrow_schema.clone(), - )?; - - let columns = data_block - .columns() - .iter() - .map(|c| c.as_column().unwrap().clone()) - .collect::<Vec<Column>>(); - - let buckets = - NumberType::<i64>::try_downcast_column(&columns[0]).unwrap(); - let data_range_start = - NumberType::<u64>::try_downcast_column(&columns[1]).unwrap(); - let data_range_end = - NumberType::<u64>::try_downcast_column(&columns[2]).unwrap(); - let columns_layout = - ArrayType::<UInt64Type>::try_downcast_column(&columns[3]).unwrap(); - - let columns_layout_data = columns_layout.values().as_slice(); - let columns_layout_offsets = columns_layout.offsets(); - - let mut buckets_payload = Vec::with_capacity(data_block.num_rows()); - for index in 0..data_block.num_rows() { - unsafe { - buckets_payload.push(BucketSpilledPayload { - bucket: *buckets.get_unchecked(index) as isize, - location: meta.location.clone().unwrap(), - data_range: *data_range_start.get_unchecked(index) - ..*data_range_end.get_unchecked(index), - columns_layout: columns_layout_data[columns_layout_offsets - [index] - as usize - ..columns_layout_offsets[index + 1] as usize] - .to_vec(), - max_partition_count: meta.max_partition_count, - }); - } - } - - Ok(DataBlock::empty_with_meta(AggregateMeta::create_spilled( - buckets_payload, - ))) - } - }; + + Ok(DataBlock::empty_with_meta( + AggregateMeta::create_serialized(meta.bucket, block, meta.max_partition_count), + )) + } + _ => { + let data_schema = Arc::new(exchange_defines::spilled_schema()); + let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema()); + let data_block = + deserialize_block(dict, fragment_data, &data_schema, arrow_schema.clone())?; + + let columns = data_block + .columns() + .iter() + .map(|c| c.as_column().unwrap().clone()) + .collect::<Vec<Column>>(); + + let buckets = NumberType::<i64>::try_downcast_column(&columns[0]).unwrap(); + let data_range_start = NumberType::<u64>::try_downcast_column(&columns[1]).unwrap(); + let data_range_end = NumberType::<u64>::try_downcast_column(&columns[2]).unwrap(); + let columns_layout = + ArrayType::<UInt64Type>::try_downcast_column(&columns[3]).unwrap(); + + let columns_layout_data = columns_layout.values().as_slice(); + let columns_layout_offsets = columns_layout.offsets(); + + let mut buckets_payload = Vec::with_capacity(data_block.num_rows()); + for index in 0..data_block.num_rows() { + unsafe { + buckets_payload.push(BucketSpilledPayload { + bucket:
*buckets.get_unchecked(index) as isize, + location: meta.location.clone().unwrap(), + data_range: *data_range_start.get_unchecked(index) + ..*data_range_end.get_unchecked(index), + columns_layout: columns_layout_data[columns_layout_offsets[index] + as usize + ..columns_layout_offsets[index + 1] as usize] + .to_vec(), + max_partition_count: meta.max_partition_count, + }); + } } - }, - }; - match data_block.num_columns() == 0 { - true => Ok(DataBlock::new_with_meta(vec![], row_count as usize, meta)), - false => data_block.add_meta(meta), + Ok(DataBlock::empty_with_meta(AggregateMeta::create_spilled( + buckets_payload, + ))) + } } } } @@ -177,15 +160,9 @@ impl BlockMetaTransform<ExchangeDeserializeMeta> for TransformDeserializer { fn transform(&mut self, mut meta: ExchangeDeserializeMeta) -> Result<Vec<DataBlock>> { match meta.packet.pop().unwrap() { - DataPacket::ErrorCode(v) => Err(v), - DataPacket::Dictionary(_) => unreachable!(), - DataPacket::QueryProfiles(_) => unreachable!(), - DataPacket::SerializeProgress { .. } => unreachable!(), - DataPacket::CopyStatus { .. } => unreachable!(), - DataPacket::MutationStatus { .. } => unreachable!(), - DataPacket::DataCacheMetrics(_) => unreachable!(), DataPacket::FragmentData(v) => Ok(vec![self.recv_data(meta.packet, v)?]), - DataPacket::QueryPerf(_) => unreachable!(), + DataPacket::ErrorCode(err) => Err(err), + _ => unreachable!(), } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index ffd80674bdab4..118f8119fed5e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -125,12 +125,7 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformExchangeAggregateSeria continue; } - match AggregateMeta::downcast_from(block.take_meta().unwrap()) { - None => unreachable!(), - Some(AggregateMeta::Spilled(_)) => unreachable!(), - Some(AggregateMeta::Serialized(_)) => unreachable!(), - Some(AggregateMeta::BucketSpilled(_)) => unreachable!(), - Some(AggregateMeta::Partitioned { ..
}) => unreachable!(), + match block.take_meta().and_then(AggregateMeta::downcast_from) { Some(AggregateMeta::AggregateSpilling(payload)) => { serialized_blocks.push(FlightSerialized::Future( match index == self.local_pos { @@ -172,6 +167,8 @@ let c = serialize_block(block_number, c, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } + + _ => unreachable!(), }; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs index 1628bc9af5beb..eaebda0543e6d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_async_barrier.rs @@ -45,29 +45,31 @@ impl AsyncTransform for TransformExchangeAsyncBarrier { const NAME: &'static str = "TransformExchangeAsyncBarrier"; async fn transform(&mut self, mut data: DataBlock) -> Result<DataBlock> { - if let Some(meta) = data + let Some(meta) = data .take_meta() .and_then(FlightSerializedMeta::downcast_from) - { - let mut futures = Vec::with_capacity(meta.serialized_blocks.len()); + else { + return Err(ErrorCode::Internal("")); + }; - for serialized_block in meta.serialized_blocks { - futures.push(databend_common_base::runtime::spawn(async move { - match serialized_block { - FlightSerialized::DataBlock(v) => Ok(v), - FlightSerialized::Future(f) => f.await, - } - })); - } - - return match futures::future::try_join_all(futures).await { - Err(_) => Err(ErrorCode::TokioError("Cannot join tokio job")), - Ok(spilled_data) => Ok(DataBlock::empty_with_meta(ExchangeShuffleMeta::create( - spilled_data.into_iter().collect::<Result<Vec<_>>>()?, - ))), - }; + let mut futures = Vec::with_capacity(meta.serialized_blocks.len()); + for serialized_block in meta.serialized_blocks { + futures.push(databend_common_base::runtime::spawn(async move { + match serialized_block { + FlightSerialized::DataBlock(v) => Ok(v), + FlightSerialized::Future(f) => f.await, + } + })); } - Err(ErrorCode::Internal("")) + match futures::future::try_join_all(futures).await { + Err(_) => Err(ErrorCode::TokioError("Cannot join tokio job")), + Ok(spilled_data) => { + let blocks = spilled_data.into_iter().collect::<Result<_>>()?; + Ok(DataBlock::empty_with_meta(ExchangeShuffleMeta::create( + blocks, + ))) + } + } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs index a43003ba291b4..549a2ebc9765b 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs @@ -24,7 +24,6 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::AggrState; use databend_common_expression::BlockMetaInfoDowncast; -use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::DataBlock; use databend_common_expression::ProjectedBlock; @@ -156,14 +155,11 @@ impl AccumulatingTransform for PartialSingleStateAggregator { ) .zip(builders.iter_mut()) { - func.serialize(place, &mut builder.data)?; - builder.commit_row(); + let builders =
builder.as_tuple_mut().unwrap().as_mut_slice(); + func.serialize(place, builders)?; } - let columns = builders - .into_iter() - .map(|b| Column::Binary(b.build())) - .collect(); + let columns = builders.into_iter().map(|b| b.build()).collect(); vec![DataBlock::new_from_columns(columns)] } else { vec![] diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs index d92ace4e212fd..fdfc1dcbe6d39 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs @@ -98,14 +98,14 @@ impl AggregateFunction for AggregateUdfScript { Ok(()) } - fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { + fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> { let state = place.get::<UdfAggState>(); state .serialize(writer) .map_err(|e| ErrorCode::Internal(format!("state failed to serialize: {e}"))) } - fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { + fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> { let state = place.get::<UdfAggState>(); let rhs = UdfAggState::deserialize(reader).map_err(|e| ErrorCode::Internal(e.to_string()))?; diff --git a/src/query/service/src/servers/flight/v1/packets/packet_data.rs b/src/query/service/src/servers/flight/v1/packets/packet_data.rs index 2b5407ffb2946..81226501438f8 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_data.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_data.rs @@ -54,6 +54,7 @@ impl Debug for FragmentData { } } +#[derive(Debug)] pub enum DataPacket { ErrorCode(ErrorCode), Dictionary(FlightData), diff --git a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs index a8a73071aaa57..5026f7ba38f9a 100644 --- a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs +++ b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs @@ -14,11 +14,11 @@ use databend_common_exception::Result; use databend_common_expression::types::DataType; -#[allow(unused_imports)] -use databend_common_expression::DataBlock; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::StateSerdeItem; +use databend_common_functions::aggregates::AggregateFunctionFactory; use super::SortDesc; use crate::executor::explain::PlanStatsInfo; @@ -47,11 +47,39 @@ impl AggregatePartial { let input_schema = self.input.output_schema()?; let mut fields = Vec::with_capacity(self.agg_funcs.len() + self.group_by.len()); + let factory = AggregateFunctionFactory::instance(); - fields.extend(self.agg_funcs.iter().map(|func| { - let name = func.output_column.to_string(); - DataField::new(&name, DataType::Binary) - })); + for desc in &self.agg_funcs { + let name = desc.output_column.to_string(); + + if desc.sig.udaf.is_some() { + fields.push(DataField::new( + &name, + DataType::Tuple(vec![DataType::Binary]), + )); + continue; + } + + let func = factory + .get( + &desc.sig.name, + desc.sig.params.clone(), + desc.sig.args.clone(), + desc.sig.sort_descs.clone(), + ) + .unwrap(); + + let tuple = func + .serialize_type() + .iter() + .map(|serde_type| match serde_type { + StateSerdeItem::Bool => DataType::Boolean, + StateSerdeItem::Binary(_) =>
DataType::Binary, + }) + .collect(); + + fields.push(DataField::new(&name, DataType::Tuple(tuple))) + } for (idx, field) in self.group_by.iter().zip( self.group_by
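
Reviewer note, appended after the diff rather than part of the patch: the heart of this change is that each aggregate function now describes its spill/exchange state as a small tuple of typed columns via `serialize_type()` (a `Binary` payload, optionally followed by a `Bool` flag for the null/or-null adaptors), instead of hand-packing everything into one binary blob. The sketch below is a minimal, self-contained model of that design, not Databend's real API; the `Builder` enum, the `StateSerde` trait, and the `OrNullCount` type are hypothetical stand-ins.

```rust
// Minimal standalone model of the new state-serde design (hypothetical
// names, not Databend's real API): each aggregate state row serializes
// into a tuple of typed columns described by `serialize_type()`, rather
// than into a single opaque binary blob per function.

#[derive(Debug, Clone, Copy)]
enum StateSerdeItem {
    Bool,
    // Optional fixed size-per-row hint, used to preallocate binary builders.
    Binary(Option<usize>),
}

// Stand-in for one tuple field's ColumnBuilder.
enum Builder {
    Bool(Vec<bool>),
    Binary(Vec<Vec<u8>>),
}

trait StateSerde {
    // Which tuple fields this function's state occupies.
    fn serialize_type(&self) -> Vec<StateSerdeItem>;
    // Append one row of state across the tuple's builders.
    fn serialize(&self, builders: &mut [Builder]);
}

// Mirrors the or-null adaptor pattern from the diff: the inner function's
// binary state, plus a trailing Bool flag recording whether any row was seen.
struct OrNullCount {
    count: u64,
    flag: bool,
}

impl StateSerde for OrNullCount {
    fn serialize_type(&self) -> Vec<StateSerdeItem> {
        vec![StateSerdeItem::Binary(Some(8)), StateSerdeItem::Bool]
    }

    fn serialize(&self, builders: &mut [Builder]) {
        // The flag goes into the dedicated Bool column...
        if let Builder::Bool(flags) = builders.last_mut().unwrap() {
            flags.push(self.flag);
        }
        // ...and the inner state into the Binary column, with no flag byte
        // appended to the payload.
        if let Builder::Binary(rows) = &mut builders[0] {
            rows.push(self.count.to_le_bytes().to_vec());
        }
    }
}

fn main() {
    let state = OrNullCount { count: 42, flag: true };
    println!("serde layout: {:?}", state.serialize_type());

    let mut builders = vec![Builder::Binary(Vec::new()), Builder::Bool(Vec::new())];
    state.serialize(&mut builders);
    // On merge, the flag can be read back as a typed Boolean scalar instead
    // of being sliced off the tail of the binary payload.
}
```

The payoff is visible throughout the diff's `merge` implementations: the flag arrives as a typed scalar (`data.last().and_then(ScalarRef::as_boolean)`) rather than being peeled off the end of a byte buffer with `reader[reader.len() - 1]`.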