Skip to content

Commit 0ae9d0b

Browse files
committed
chore(query): add basic v2 agg
1 parent 8aee628 commit 0ae9d0b

23 files changed

+414
-414
lines changed

src/query/expression/src/types.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ impl DataType {
7979
_ => Self::Nullable(Box::new(self.clone())),
8080
}
8181
}
82+
pub fn is_nullable_or_null(&self) -> bool {
83+
matches!(self, &DataType::Nullable(_) | &DataType::Null)
84+
}
85+
86+
pub fn can_inside_nullable(&self) -> bool {
87+
!self.is_nullable_or_null()
88+
}
8289
}
8390

8491
pub trait ValueType: Debug + Clone + PartialEq + Sized + 'static {

src/query/expression/src/util.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,19 @@ use common_arrow::arrow::io::ipc::read::read_file_metadata;
2525
use common_arrow::arrow::io::ipc::read::FileReader;
2626
use common_arrow::arrow::io::ipc::write::FileWriter;
2727
use common_arrow::arrow::io::ipc::write::WriteOptions;
28+
use crate::Column;
29+
30+
pub fn column_merge_validity(column: &Column, bitmap: Option<Bitmap>) -> Option<Bitmap> {
31+
match column {
32+
Column::Nullable(c) => {
33+
match bitmap {
34+
None => Some(c.validity.clone()),
35+
Some(v) => Some(&c.validity & (&v))
36+
}
37+
}
38+
_ => bitmap
39+
}
40+
}
2841

2942
pub fn bitmap_into_mut(bitmap: Bitmap) -> MutableBitmap {
3043
bitmap

src/query/expression/src/values.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,13 @@ impl Column {
867867
}
868868
}
869869

870+
pub fn remove_nullable(&self) -> Self {
871+
match self {
872+
Column::Nullable(inner) => inner.column.clone(),
873+
_ => self.clone(),
874+
}
875+
}
876+
870877
pub fn memory_size(&self) -> usize {
871878
match self {
872879
Column::Null { .. } => std::mem::size_of::<usize>(),

src/query/functions-v2/src/aggregates/adaptors/aggregate_base_adaptor.rs

Lines changed: 0 additions & 142 deletions
This file was deleted.

src/query/functions-v2/src/aggregates/adaptors/aggregate_null_adaptor.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use common_exception::Result;
1516
use common_expression::types::DataType;
16-
use common_expression::{Result, Scalar};
17+
use common_expression::types::NumberDataType;
18+
use common_expression::Scalar;
1719

1820
use super::aggregate_null_variadic_adaptor::AggregateNullVariadicAdaptor;
1921
use super::AggregateNullUnaryAdaptor;
@@ -29,11 +31,13 @@ impl AggregateFunctionCombinatorNull {
2931
let mut results = Vec::with_capacity(arguments.len());
3032

3133
for arg in arguments.iter() {
32-
if arg.is_nullable() {
33-
let ty = remove_nullable(arg.data_type());
34-
results.push(DataType::new(arg.name(), ty));
35-
} else {
36-
results.push(arg.clone());
34+
match arg {
35+
DataType::Nullable(box ty) => {
36+
results.push(ty.clone());
37+
}
38+
_ => {
39+
results.push(arg.clone());
40+
}
3741
}
3842
}
3943
Ok(results)
@@ -51,15 +55,13 @@ impl AggregateFunctionCombinatorNull {
5155
properties: AggregateFunctionFeatures,
5256
) -> Result<AggregateFunctionRef> {
5357
// has_null_types
54-
if !arguments.is_empty()
55-
&& arguments
56-
.iter()
57-
.any(|f| f.data_type().data_type_id() == TypeID::Null)
58-
{
58+
if !arguments.is_empty() && arguments.iter().any(|f| f == &DataType::Null) {
5959
if properties.returns_default_when_only_null {
60-
return AggregateNullResultFunction::try_create(u64::to_data_type());
60+
return AggregateNullResultFunction::try_create(DataType::Number(
61+
NumberDataType::UInt64,
62+
));
6163
} else {
62-
return AggregateNullResultFunction::try_create(NullType::new_impl());
64+
return AggregateNullResultFunction::try_create(DataType::Null);
6365
}
6466
}
6567
let params = Self::transform_params(&params)?;

src/query/functions-v2/src/aggregates/adaptors/aggregate_null_unary_adaptor.rs

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ use std::sync::Arc;
1818

1919
use bytes::BytesMut;
2020
use common_arrow::arrow::bitmap::Bitmap;
21-
use common_expression::ColumnBuilder;
22-
use common_expression::Result;
21+
use common_exception::Result;
22+
use common_expression::types::DataType;
23+
use common_expression::util::column_merge_validity;
2324
use common_expression::Column;
25+
use common_expression::ColumnBuilder;
2426
use common_io::prelude::BinaryWriteBuf;
2527

2628
use crate::aggregates::AggregateFunction;
@@ -79,10 +81,11 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
7981
"AggregateNullUnaryAdaptor"
8082
}
8183

82-
fn return_type(&self) -> Result<DataTypeImpl> {
84+
fn return_type(&self) -> Result<DataType> {
85+
let nested = self.nested.return_type()?;
8386
match NULLABLE_RESULT {
84-
true => Ok(wrap_nullable(&self.nested.return_type()?)),
85-
false => Ok(self.nested.return_type()?),
87+
true => Ok(nested.wrap_nullable()),
88+
false => Ok(nested),
8689
}
8790
}
8891

@@ -107,24 +110,22 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
107110
validity: Option<&Bitmap>,
108111
input_rows: usize,
109112
) -> Result<()> {
110-
let mut validity = validity.cloned();
111113
let col = &columns[0];
112-
let (all_null, v) = col.validity();
113-
validity = combine_validities(validity.as_ref(), v);
114-
let not_null_columns = Series::remove_nullable(col);
115-
116-
self.nested
117-
.accumulate(place, &[not_null_columns], validity.as_ref(), input_rows)?;
118-
119-
if !all_null {
120-
match validity {
121-
Some(v) => {
122-
if v.unset_bits() != input_rows {
123-
self.set_flag(place, 1);
124-
}
125-
}
126-
None => self.set_flag(place, 1),
114+
let validity = column_merge_validity(col, validity.cloned());
115+
let not_null_column = col.remove_nullable();
116+
117+
self.nested.accumulate(
118+
place,
119+
&[not_null_column.clone()],
120+
validity.as_ref(),
121+
input_rows,
122+
)?;
123+
124+
match validity {
125+
Some(v) if v.unset_bits() != input_rows => {
126+
self.set_flag(place, 1);
127127
}
128+
_ => self.set_flag(place, 1),
128129
}
129130
Ok(())
130131
}
@@ -138,33 +139,29 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
138139
input_rows: usize,
139140
) -> Result<()> {
140141
let col = &columns[0];
141-
let (all_null, validity) = col.validity();
142-
let not_null_column = Series::remove_nullable(col);
143-
let not_null_columns = &[not_null_column];
144-
145-
if !all_null {
146-
match validity {
147-
Some(v) if v.unset_bits() > 0 => {
148-
for (valid, (row, place)) in v.iter().zip(places.iter().enumerate()) {
149-
if valid {
150-
self.set_flag(place.next(offset), 1);
151-
self.nested.accumulate_row(
152-
place.next(offset),
153-
not_null_columns,
154-
row,
155-
)?;
156-
}
142+
let validity = column_merge_validity(col, None);
143+
let not_null_columns = vec![col.remove_nullable()];
144+
let not_null_columns = &not_null_columns;
145+
146+
match validity {
147+
Some(v) if v.unset_bits() > 0 => {
148+
for (valid, (row, place)) in v.iter().zip(places.iter().enumerate()) {
149+
if valid {
150+
self.set_flag(place.next(offset), 1);
151+
self.nested
152+
.accumulate_row(place.next(offset), not_null_columns, row)?;
157153
}
158154
}
159-
_ => {
160-
self.nested
161-
.accumulate_keys(places, offset, not_null_columns, input_rows)?;
162-
places
163-
.iter()
164-
.for_each(|place| self.set_flag(place.next(offset), 1));
165-
}
155+
}
156+
_ => {
157+
self.nested
158+
.accumulate_keys(places, offset, not_null_columns, input_rows)?;
159+
places
160+
.iter()
161+
.for_each(|place| self.set_flag(place.next(offset), 1));
166162
}
167163
}
164+
168165
Ok(())
169166
}
170167

@@ -208,21 +205,22 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
208205
self.nested.merge(place, rhs)
209206
}
210207

211-
fn merge_result(&self, place: StateAddr, column: &mut ColumnBuilder) -> Result<()> {
208+
fn merge_result(&self, place: StateAddr, builder: &mut ColumnBuilder) -> Result<()> {
212209
if NULLABLE_RESULT {
213-
let builder: &mut MutableNullableColumn = Series::check_get_mutable_column(column)?;
214210
if self.get_flag(place) == 1 {
215-
let inner = builder.inner_mut();
216-
self.nested.merge_result(place, inner.as_mut())?;
217-
let validity = builder.validity_mut();
218-
219-
validity.push(true);
211+
match builder {
212+
ColumnBuilder::Nullable(ref mut inner) => {
213+
self.nested.merge_result(place, &mut inner.builder)?;
214+
inner.validity.push(true);
215+
}
216+
_ => unreachable!(),
217+
}
220218
} else {
221-
builder.append_default();
219+
builder.push_default();
222220
}
223221
Ok(())
224222
} else {
225-
self.nested.merge_result(place, column)
223+
self.nested.merge_result(place, builder)
226224
}
227225
}
228226

0 commit comments

Comments
 (0)