Skip to content

Commit dfec718

Browse files
committed
serialize_type
1 parent de26741 commit dfec718

27 files changed

+135
-92
lines changed

src/query/expression/src/aggregate/aggregate_function.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
6969
// Used in aggregate_null_adaptor
7070
fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>;
7171

72-
fn serialize_type(&self) -> Vec<StateSerdeItem> {
73-
vec![StateSerdeItem::Binary(self.serialize_size_per_row())]
74-
}
72+
fn serialize_type(&self) -> Vec<StateSerdeItem>;
7573

7674
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
7775
let binary_builder = builders[0].as_binary_mut().unwrap();
@@ -82,10 +80,6 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
8280

8381
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;
8482

85-
fn serialize_size_per_row(&self) -> Option<usize> {
86-
None
87-
}
88-
8983
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
9084
let mut binary = *data[0].as_binary().unwrap();
9185
self.merge_binary(place, &mut binary)

src/query/expression/src/aggregate/aggregate_function_state.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,9 @@ impl AggrStateLoc {
193193
}
194194
}
195195

196-
#[derive(Debug, Clone, Copy)]
196+
#[derive(Debug, Clone)]
197197
pub enum StateSerdeItem {
198-
Bool,
198+
DataType(DataType),
199199
Binary(Option<usize>),
200200
}
201201

@@ -208,7 +208,7 @@ impl StateSerdeType {
208208
self.0
209209
.iter()
210210
.map(|item| match item {
211-
StateSerdeItem::Bool => DataType::Boolean,
211+
StateSerdeItem::DataType(data_type) => data_type.clone(),
212212
StateSerdeItem::Binary(_) => DataType::Binary,
213213
})
214214
.collect(),
@@ -232,8 +232,8 @@ impl StatesLayout {
232232
.0
233233
.iter()
234234
.map(|item| match item {
235-
StateSerdeItem::Bool => {
236-
ColumnBuilder::with_capacity(&DataType::Boolean, num_rows)
235+
StateSerdeItem::DataType(data_type) => {
236+
ColumnBuilder::with_capacity(data_type, num_rows)
237237
}
238238
StateSerdeItem::Binary(size) => {
239239
ColumnBuilder::Binary(BinaryColumnBuilder::with_capacity(

src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,6 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
131131
self.0.init_state(place);
132132
}
133133

134-
fn serialize_size_per_row(&self) -> Option<usize> {
135-
self.0.serialize_size_per_row()
136-
}
137-
138134
fn register_state(&self, registry: &mut AggrStateRegistry) {
139135
self.0.register_state(registry);
140136
}
@@ -188,8 +184,8 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
188184
self.0.serialize_type()
189185
}
190186

191-
fn serialize(&self, place: AggrState, builder: &mut [ColumnBuilder]) -> Result<()> {
192-
self.0.serialize(place, builder)
187+
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
188+
self.0.serialize(place, builders)
193189
}
194190

195191
fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> {
@@ -257,10 +253,6 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
257253
self.0.init_state(place);
258254
}
259255

260-
fn serialize_size_per_row(&self) -> Option<usize> {
261-
self.0.serialize_size_per_row()
262-
}
263-
264256
fn register_state(&self, registry: &mut AggrStateRegistry) {
265257
self.0.register_state(registry);
266258
}
@@ -317,6 +309,10 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
317309
.accumulate_row(place, not_null_columns, validity, row)
318310
}
319311

312+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
313+
self.0.serialize_type()
314+
}
315+
320316
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
321317
self.0.serialize(place, builders)
322318
}
@@ -384,12 +380,6 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
384380
self.nested.init_state(place.remove_last_loc());
385381
}
386382

387-
fn serialize_size_per_row(&self) -> Option<usize> {
388-
self.nested
389-
.serialize_size_per_row()
390-
.map(|row| if NULLABLE_RESULT { row + 1 } else { row })
391-
}
392-
393383
fn register_state(&self, registry: &mut AggrStateRegistry) {
394384
self.nested.register_state(registry);
395385
if NULLABLE_RESULT {
@@ -516,7 +506,7 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
516506
self.nested
517507
.serialize_type()
518508
.into_iter()
519-
.chain(Some(StateSerdeItem::Bool))
509+
.chain(Some(StateSerdeItem::DataType(DataType::Boolean)))
520510
.collect()
521511
}
522512

src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs

Lines changed: 1 addition & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,6 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
9090
self.inner.init_state(place.remove_last_loc())
9191
}
9292

93-
fn serialize_size_per_row(&self) -> Option<usize> {
94-
self.inner.serialize_size_per_row().map(|row| row + 1)
95-
}
96-
9793
fn register_state(&self, registry: &mut AggrStateRegistry) {
9894
self.inner.register_state(registry);
9995
registry.register(AggrStateType::Bool);
@@ -182,7 +178,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
182178
self.inner
183179
.serialize_type()
184180
.into_iter()
185-
.chain(Some(StateSerdeItem::Bool))
181+
.chain(Some(StateSerdeItem::DataType(DataType::Boolean)))
186182
.collect()
187183
}
188184

@@ -260,63 +256,6 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
260256
unsafe fn drop_state(&self, place: AggrState) {
261257
self.inner.drop_state(place.remove_last_loc())
262258
}
263-
264-
fn batch_merge(
265-
&self,
266-
places: &[StateAddr],
267-
loc: &[AggrStateLoc],
268-
state: &databend_common_expression::BlockEntry,
269-
) -> Result<()> {
270-
let column = state.to_column();
271-
for (place, data) in places.iter().zip(column.iter()) {
272-
self.merge(
273-
AggrState::new(*place, loc),
274-
data.as_tuple().unwrap().as_slice(),
275-
)?;
276-
}
277-
278-
Ok(())
279-
}
280-
281-
fn batch_merge_single(
282-
&self,
283-
place: AggrState,
284-
state: &databend_common_expression::BlockEntry,
285-
) -> Result<()> {
286-
let column = state.to_column();
287-
for data in column.iter() {
288-
self.merge(place, data.as_tuple().unwrap().as_slice())?;
289-
}
290-
Ok(())
291-
}
292-
293-
fn batch_merge_states(
294-
&self,
295-
places: &[StateAddr],
296-
rhses: &[StateAddr],
297-
loc: &[AggrStateLoc],
298-
) -> Result<()> {
299-
for (place, rhs) in places.iter().zip(rhses.iter()) {
300-
self.merge_states(AggrState::new(*place, loc), AggrState::new(*rhs, loc))?;
301-
}
302-
Ok(())
303-
}
304-
305-
fn batch_merge_result(
306-
&self,
307-
places: &[StateAddr],
308-
loc: Box<[AggrStateLoc]>,
309-
builder: &mut ColumnBuilder,
310-
) -> Result<()> {
311-
for place in places {
312-
self.merge_result(AggrState::new(*place, &loc), builder)?;
313-
}
314-
Ok(())
315-
}
316-
317-
fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> {
318-
None
319-
}
320259
}
321260

322261
impl fmt::Display for AggregateFunctionOrNullAdaptor {

src/query/functions/src/aggregates/adaptors/aggregate_sort_adaptor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use databend_common_expression::DataBlock;
3434
use databend_common_expression::ProjectedBlock;
3535
use databend_common_expression::Scalar;
3636
use databend_common_expression::SortColumnDescription;
37+
use databend_common_expression::StateSerdeItem;
3738
use itertools::Itertools;
3839

3940
use crate::aggregates::AggregateFunctionSortDesc;
@@ -121,6 +122,10 @@ impl AggregateFunction for AggregateFunctionSortAdaptor {
121122
Ok(())
122123
}
123124

125+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
126+
vec![StateSerdeItem::Binary(None)]
127+
}
128+
124129
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
125130
let state = Self::get_state(place);
126131
Ok(state.serialize(writer)?)

src/query/functions/src/aggregates/aggregate_arg_min_max.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_expression::ColumnBuilder;
3131
use databend_common_expression::ColumnView;
3232
use databend_common_expression::ProjectedBlock;
3333
use databend_common_expression::Scalar;
34+
use databend_common_expression::StateSerdeItem;
3435

3536
use super::aggregate_function_factory::AggregateFunctionDescription;
3637
use super::aggregate_function_factory::AggregateFunctionSortDesc;
@@ -270,6 +271,10 @@ where
270271
Ok(())
271272
}
272273

274+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
275+
vec![StateSerdeItem::Binary(None)]
276+
}
277+
273278
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
274279
let state = place.get::<State>();
275280
Ok(state.serialize(writer)?)

src/query/functions/src/aggregates/aggregate_array_agg.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use databend_common_expression::ColumnBuilder;
4848
use databend_common_expression::ProjectedBlock;
4949
use databend_common_expression::Scalar;
5050
use databend_common_expression::ScalarRef;
51+
use databend_common_expression::StateSerdeItem;
5152

5253
use super::aggregate_function_factory::AggregateFunctionDescription;
5354
use super::aggregate_scalar_state::ScalarStateFunc;
@@ -548,6 +549,10 @@ where
548549
Ok(())
549550
}
550551

552+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
553+
vec![StateSerdeItem::Binary(None)]
554+
}
555+
551556
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
552557
let state = place.get::<State>();
553558
Ok(state.serialize(writer)?)

src/query/functions/src/aggregates/aggregate_array_moving.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use databend_common_expression::ColumnBuilder;
4545
use databend_common_expression::ProjectedBlock;
4646
use databend_common_expression::Scalar;
4747
use databend_common_expression::ScalarRef;
48+
use databend_common_expression::StateSerdeItem;
4849
use num_traits::AsPrimitive;
4950

5051
use super::aggregate_function::AggregateFunction;
@@ -447,6 +448,10 @@ where State: SumState
447448
state.accumulate_row(&columns[0], row)
448449
}
449450

451+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
452+
vec![StateSerdeItem::Binary(None)]
453+
}
454+
450455
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
451456
let state = place.get::<State>();
452457
Ok(state.serialize(writer)?)
@@ -616,6 +621,10 @@ where State: SumState
616621
state.accumulate_row(&columns[0], row)
617622
}
618623

624+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
625+
vec![StateSerdeItem::Binary(None)]
626+
}
627+
619628
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
620629
let state = place.get::<State>();
621630
Ok(state.serialize(writer)?)

src/query/functions/src/aggregates/aggregate_bitmap.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use databend_common_expression::AggrStateType;
3636
use databend_common_expression::ColumnBuilder;
3737
use databend_common_expression::ProjectedBlock;
3838
use databend_common_expression::Scalar;
39+
use databend_common_expression::StateSerdeItem;
3940
use databend_common_io::prelude::BinaryWrite;
4041
use roaring::RoaringTreemap;
4142

@@ -287,6 +288,10 @@ where
287288
Ok(())
288289
}
289290

291+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
292+
vec![StateSerdeItem::Binary(None)]
293+
}
294+
290295
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
291296
let state = place.get::<BitmapAggState>();
292297
// flag indicate where bitmap is none
@@ -482,6 +487,10 @@ where
482487
Ok(())
483488
}
484489

490+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
491+
vec![StateSerdeItem::Binary(None)]
492+
}
493+
485494
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
486495
self.inner.serialize_binary(place, writer)
487496
}

src/query/functions/src/aggregates/aggregate_combinator_distinct.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_expression::AggrStateType;
2828
use databend_common_expression::ColumnBuilder;
2929
use databend_common_expression::ProjectedBlock;
3030
use databend_common_expression::Scalar;
31+
use databend_common_expression::StateSerdeItem;
3132

3233
use super::aggregate_distinct_state::AggregateDistinctNumberState;
3334
use super::aggregate_distinct_state::AggregateDistinctState;
@@ -106,6 +107,10 @@ where State: DistinctStateFunc
106107
state.add(columns, row)
107108
}
108109

110+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
111+
vec![StateSerdeItem::Binary(None)]
112+
}
113+
109114
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
110115
let state = Self::get_state(place);
111116
state.serialize(writer)

0 commit comments

Comments
 (0)