Skip to content

Commit 8aee628

Browse files
committed
chore(query): migrate v2 agg
1 parent c3abc4a commit 8aee628

31 files changed

+5727
-0
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/expression/src/types.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ pub enum DataType {
7272
Generic(usize),
7373
}
7474

75+
impl DataType {
76+
pub fn wrap_nullable(&self) -> Self {
77+
match self {
78+
DataType::Nullable(_) => self.clone(),
79+
_ => Self::Nullable(Box::new(self.clone())),
80+
}
81+
}
82+
}
83+
7584
pub trait ValueType: Debug + Clone + PartialEq + Sized + 'static {
7685
type Scalar: Debug + Clone + PartialEq;
7786
type ScalarRef<'a>: Debug + Clone + PartialEq;

src/query/functions-v2/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,16 @@ doctest = false
1313
# Workspace dependencies
1414
common-arrow = { path = "../../common/arrow" }
1515
common-base = { path = "../../common/base" }
16+
common-exception = { path = "../../common/exception" }
1617
common-expression = { path = "../expression" }
18+
common-hashtable = { path = "../../common/hashtable" }
19+
common-io = { path = "../../common/io" }
1720

1821
# Crates.io dependencies
1922
base64 = "0.13.0"
2023
bstr = "0.2.17"
24+
bumpalo = "3.10.0"
25+
bytes = "1.1.0"
2126
crc32fast = "1.3.2"
2227
hex = "0.4.3"
2328
itertools = "0.10.3"
@@ -33,6 +38,7 @@ rand = { version = "0.8.5", features = ["small_rng"] }
3338
strength_reduce = "0.2.3"
3439
# TODO: Switch to jsonb. bson is used for placeholder.
3540
bson = "2.4"
41+
once_cell = "1.12.0"
3642
serde_json = "1.0"
3743

3844
[dev-dependencies]
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2021 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
use std::sync::Arc;
17+
18+
use common_arrow::arrow::bitmap::Bitmap;
19+
use common_exception::Result;
20+
use common_expression::Column;
21+
use common_expression::ColumnBuilder;
22+
use common_expression::Scalar;
23+
use common_expression::types::DataType;
24+
25+
use crate::aggregates::AggregateFunction;
26+
use crate::aggregates::AggregateFunctionRef;
27+
use crate::aggregates::StateAddr;
28+
29+
/// BasicAdaptor will convert all args into full column and apply the inner aggregation
30+
pub struct AggregateFunctionBasicAdaptor {
31+
inner: AggregateFunctionRef,
32+
}
33+
34+
impl AggregateFunctionBasicAdaptor {
35+
pub fn create(inner: AggregateFunctionRef) -> Result<AggregateFunctionRef> {
36+
Ok(Arc::new(AggregateFunctionBasicAdaptor { inner }))
37+
}
38+
}
39+
40+
impl AggregateFunction for AggregateFunctionBasicAdaptor {
41+
fn name(&self) -> &str {
42+
self.inner.name()
43+
}
44+
45+
fn return_type(&self) -> Result<DataType> {
46+
self.inner.return_type()
47+
}
48+
49+
#[inline]
50+
fn init_state(&self, place: StateAddr) {
51+
self.inner.init_state(place)
52+
}
53+
54+
#[inline]
55+
fn state_layout(&self) -> std::alloc::Layout {
56+
self.inner.state_layout()
57+
}
58+
59+
#[inline]
60+
fn accumulate(
61+
&self,
62+
place: StateAddr,
63+
columns: &[Column],
64+
validity: Option<&Bitmap>,
65+
input_rows: usize,
66+
) -> Result<()> {
67+
if self.inner.convert_const_to_full() && columns.iter().any(|c| c.is_const()) {
68+
let columns: Vec<Column> = columns.iter().map(|c| c.convert_full_column()).collect();
69+
self.inner.accumulate(place, &columns, validity, input_rows)
70+
} else {
71+
self.inner.accumulate(place, columns, validity, input_rows)
72+
}
73+
}
74+
75+
fn accumulate_keys(
76+
&self,
77+
places: &[StateAddr],
78+
offset: usize,
79+
columns: &[Column],
80+
input_rows: usize,
81+
) -> Result<()> {
82+
// if self.inner.convert_const_to_full() && columns.iter().any(|c| c.is_const()) {
83+
// let columns: Vec<Column> = columns.iter().map(|c| c.convert_full_column()).collect();
84+
// self.inner
85+
// .accumulate_keys(places, offset, &columns, input_rows)?;
86+
// } else {
87+
// self.inner
88+
// .accumulate_keys(places, offset, columns, input_rows)?;
89+
// }
90+
Ok(())
91+
}
92+
93+
#[inline]
94+
fn accumulate_row(&self, place: StateAddr, columns: &[Column], row: usize) -> Result<()> {
95+
self.inner.accumulate_row(place, columns, row)
96+
}
97+
98+
#[inline]
99+
fn serialize(&self, place: StateAddr, writer: &mut bytes::BytesMut) -> Result<()> {
100+
self.inner.serialize(place, writer)
101+
}
102+
103+
#[inline]
104+
fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
105+
self.inner.deserialize(place, reader)
106+
}
107+
108+
fn merge(&self, place: StateAddr, rhs: StateAddr) -> Result<()> {
109+
self.inner.merge(place, rhs)
110+
}
111+
112+
fn merge_result(&self, place: StateAddr, array: &mut ColumnBuilder) -> Result<()> {
113+
self.inner.merge_result(place, array)
114+
}
115+
116+
fn get_own_null_adaptor(
117+
&self,
118+
nested_function: AggregateFunctionRef,
119+
params: Vec<Scalar>,
120+
arguments: Vec<DataType>,
121+
) -> Result<Option<AggregateFunctionRef>> {
122+
self.inner
123+
.get_own_null_adaptor(nested_function, params, arguments)
124+
}
125+
126+
fn need_manual_drop_state(&self) -> bool {
127+
self.inner.need_manual_drop_state()
128+
}
129+
130+
unsafe fn drop_state(&self, place: StateAddr) {
131+
self.inner.drop_state(place)
132+
}
133+
134+
fn convert_const_to_full(&self) -> bool {
135+
self.inner.convert_const_to_full()
136+
}
137+
}
138+
impl fmt::Display for AggregateFunctionBasicAdaptor {
139+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
140+
write!(f, "{}", self.inner)
141+
}
142+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2021 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_expression::types::DataType;
16+
use common_expression::{Result, Scalar};
17+
18+
use super::aggregate_null_variadic_adaptor::AggregateNullVariadicAdaptor;
19+
use super::AggregateNullUnaryAdaptor;
20+
use crate::aggregates::aggregate_function_factory::AggregateFunctionFeatures;
21+
use crate::aggregates::aggregate_null_result::AggregateNullResultFunction;
22+
use crate::aggregates::AggregateFunctionRef;
23+
24+
#[derive(Clone)]
25+
pub struct AggregateFunctionCombinatorNull {}
26+
27+
impl AggregateFunctionCombinatorNull {
28+
pub fn transform_arguments(arguments: &[DataType]) -> Result<Vec<DataType>> {
29+
let mut results = Vec::with_capacity(arguments.len());
30+
31+
for arg in arguments.iter() {
32+
if arg.is_nullable() {
33+
let ty = remove_nullable(arg.data_type());
34+
results.push(DataType::new(arg.name(), ty));
35+
} else {
36+
results.push(arg.clone());
37+
}
38+
}
39+
Ok(results)
40+
}
41+
42+
pub fn transform_params(params: &[Scalar]) -> Result<Vec<Scalar>> {
43+
Ok(params.to_owned())
44+
}
45+
46+
pub fn try_create(
47+
_name: &str,
48+
params: Vec<Scalar>,
49+
arguments: Vec<DataType>,
50+
nested: AggregateFunctionRef,
51+
properties: AggregateFunctionFeatures,
52+
) -> Result<AggregateFunctionRef> {
53+
// has_null_types
54+
if !arguments.is_empty()
55+
&& arguments
56+
.iter()
57+
.any(|f| f.data_type().data_type_id() == TypeID::Null)
58+
{
59+
if properties.returns_default_when_only_null {
60+
return AggregateNullResultFunction::try_create(u64::to_data_type());
61+
} else {
62+
return AggregateNullResultFunction::try_create(NullType::new_impl());
63+
}
64+
}
65+
let params = Self::transform_params(&params)?;
66+
let arguments = Self::transform_arguments(&arguments)?;
67+
let size = arguments.len();
68+
69+
// Some functions may have their own null adaptor
70+
if let Some(null_adaptor) =
71+
nested.get_own_null_adaptor(nested.clone(), params, arguments)?
72+
{
73+
return Ok(null_adaptor);
74+
}
75+
76+
let return_type = nested.return_type()?;
77+
let result_is_null =
78+
!properties.returns_default_when_only_null && return_type.can_inside_nullable();
79+
80+
match size {
81+
1 => match result_is_null {
82+
true => Ok(AggregateNullUnaryAdaptor::<true>::create(nested)),
83+
false => Ok(AggregateNullUnaryAdaptor::<false>::create(nested)),
84+
},
85+
86+
_ => match result_is_null {
87+
true => Ok(AggregateNullVariadicAdaptor::<true, true>::create(nested)),
88+
false => Ok(AggregateNullVariadicAdaptor::<false, true>::create(nested)),
89+
},
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)