Skip to content

Commit 09770c7

Browse files
committed
x
1 parent 028584e commit 09770c7

File tree

1 file changed

+26
-6
lines changed

1 file changed

+26
-6
lines changed

src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
use databend_common_exception::Result;
1616
use databend_common_expression::types::DataType;
17-
#[allow(unused_imports)]
18-
use databend_common_expression::DataBlock;
17+
use databend_common_expression::AggrStateSerdeType;
1918
use databend_common_expression::DataField;
2019
use databend_common_expression::DataSchemaRef;
2120
use databend_common_expression::DataSchemaRefExt;
21+
use databend_common_functions::aggregates::AggregateFunctionFactory;
2222

2323
use super::SortDesc;
2424
use crate::executor::explain::PlanStatsInfo;
@@ -47,11 +47,31 @@ impl AggregatePartial {
4747
let input_schema = self.input.output_schema()?;
4848

4949
let mut fields = Vec::with_capacity(self.agg_funcs.len() + self.group_by.len());
50+
let factory = AggregateFunctionFactory::instance();
5051

51-
fields.extend(self.agg_funcs.iter().map(|func| {
52-
let name = func.output_column.to_string();
53-
DataField::new(&name, DataType::Binary)
54-
}));
52+
for desc in &self.agg_funcs {
53+
let name = desc.output_column.to_string();
54+
55+
let func = factory
56+
.get(
57+
&desc.sig.name,
58+
desc.sig.params.clone(),
59+
desc.sig.args.clone(),
60+
desc.sig.sort_descs.clone(),
61+
)
62+
.unwrap();
63+
64+
let tuple = func
65+
.serialize_type()
66+
.iter()
67+
.map(|serde_type| match serde_type {
68+
AggrStateSerdeType::Bool => DataType::Boolean,
69+
AggrStateSerdeType::Binary(_) => DataType::Binary,
70+
})
71+
.collect();
72+
73+
fields.push(DataField::new(&name, DataType::Tuple(tuple)))
74+
}
5575

5676
for (idx, field) in self.group_by.iter().zip(
5777
self.group_by

0 commit comments

Comments
 (0)