Skip to content

Commit 3340c38

Browse files
authored
fix(query): Pass CI tests when enable_experimental_aggregate_hashtable is enabled. (#14544)
* test-perf * fix * fix * fix * fix decimal * fix decimal * fix decimal * revert settings
1 parent 6c9bfa7 commit 3340c38

File tree

15 files changed

+244
-108
lines changed

15 files changed

+244
-108
lines changed

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,11 @@ impl AggregateHashTable {
8787
state: &mut ProbeState,
8888
group_columns: &[Column],
8989
params: &[Vec<Column>],
90+
agg_states: &[Column],
9091
row_count: usize,
9192
) -> Result<usize> {
9293
if row_count <= BATCH_ADD_SIZE {
93-
self.add_groups_inner(state, group_columns, params, row_count)
94+
self.add_groups_inner(state, group_columns, params, agg_states, row_count)
9495
} else {
9596
let mut new_count = 0;
9697
for start in (0..row_count).step_by(BATCH_ADD_SIZE) {
@@ -104,9 +105,18 @@ impl AggregateHashTable {
104105
.iter()
105106
.map(|c| c.iter().map(|x| x.slice(start..end)).collect())
106107
.collect::<Vec<_>>();
108+
let agg_states = agg_states
109+
.iter()
110+
.map(|c| c.slice(start..end))
111+
.collect::<Vec<_>>();
107112

108-
new_count +=
109-
self.add_groups_inner(state, &step_group_columns, &step_params, end - start)?;
113+
new_count += self.add_groups_inner(
114+
state,
115+
&step_group_columns,
116+
&step_params,
117+
&agg_states,
118+
end - start,
119+
)?;
110120
}
111121
Ok(new_count)
112122
}
@@ -118,6 +128,7 @@ impl AggregateHashTable {
118128
state: &mut ProbeState,
119129
group_columns: &[Column],
120130
params: &[Vec<Column>],
131+
agg_states: &[Column],
121132
row_count: usize,
122133
) -> Result<usize> {
123134
state.row_count = row_count;
@@ -132,19 +143,30 @@ impl AggregateHashTable {
132143
state.addresses[i].add(self.payload.state_offset) as _,
133144
) as usize)
134145
};
135-
debug_assert_eq!(usize::from(state.state_places[i]) % 8, 0);
136146
}
137147

138148
let state_places = &state.state_places.as_slice()[0..row_count];
139149

140-
for ((aggr, params), addr_offset) in self
141-
.payload
142-
.aggrs
143-
.iter()
144-
.zip(params.iter())
145-
.zip(self.payload.state_addr_offsets.iter())
146-
{
147-
aggr.accumulate_keys(state_places, *addr_offset, params, row_count)?;
150+
if agg_states.is_empty() {
151+
for ((aggr, params), addr_offset) in self
152+
.payload
153+
.aggrs
154+
.iter()
155+
.zip(params.iter())
156+
.zip(self.payload.state_addr_offsets.iter())
157+
{
158+
aggr.accumulate_keys(state_places, *addr_offset, params, row_count)?;
159+
}
160+
} else {
161+
for ((aggr, agg_state), addr_offset) in self
162+
.payload
163+
.aggrs
164+
.iter()
165+
.zip(agg_states.iter())
166+
.zip(self.payload.state_addr_offsets.iter())
167+
{
168+
aggr.batch_merge(state_places, *addr_offset, agg_state)?;
169+
}
148170
}
149171
}
150172

src/query/expression/src/aggregate/group_hash.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use ordered_float::OrderedFloat;
1717

1818
use crate::types::decimal::DecimalType;
1919
use crate::types::geometry::GeometryType;
20+
use crate::types::AnyType;
2021
use crate::types::ArgType;
2122
use crate::types::BinaryType;
2223
use crate::types::BitmapType;
@@ -28,9 +29,11 @@ use crate::types::NumberDataType;
2829
use crate::types::NumberType;
2930
use crate::types::StringType;
3031
use crate::types::TimestampType;
32+
use crate::types::ValueType;
3133
use crate::types::VariantType;
3234
use crate::with_number_mapped_type;
3335
use crate::Column;
36+
use crate::ScalarRef;
3437

3538
const NULL_HASH_VAL: u64 = 0xd1cefa08eb382d69;
3639

@@ -96,14 +99,12 @@ pub fn combine_group_hash_column<const IS_FIRST: bool>(c: &Column, values: &mut
9699
}
97100
}
98101
}
99-
DataType::Tuple(_) => todo!(),
100-
DataType::Array(_) => todo!(),
101-
DataType::Map(_) => todo!(),
102102
DataType::Generic(_) => unreachable!(),
103+
_ => combine_group_hash_type_column::<IS_FIRST, AnyType>(c, values),
103104
}
104105
}
105106

106-
fn combine_group_hash_type_column<const IS_FIRST: bool, T: ArgType>(
107+
fn combine_group_hash_type_column<const IS_FIRST: bool, T: ValueType>(
107108
col: &Column,
108109
values: &mut [u64],
109110
) where
@@ -246,3 +247,10 @@ impl AggHash for OrderedFloat<f64> {
246247
}
247248
}
248249
}
250+
251+
// `AggHash` for a borrowed scalar: the value is rendered with `to_string()` and the
// resulting bytes are hashed through the byte-slice `agg_hash`.
// NOTE(review): `to_string()` builds a new `String` on every call, so this runs once
// per hashed value; presumably it is only reached through the `AnyType` fallback arm
// of `combine_group_hash_column` — confirm before relying on it in a hot path.
// NOTE(review): the hash depends on the textual rendering of the scalar — verify that
// two values which compare equal always render identically.
impl AggHash for ScalarRef<'_> {
252+
#[inline(always)]
253+
fn agg_hash(&self) -> u64 {
254+
// Hash the string form of the scalar rather than its in-memory representation.
self.to_string().as_bytes().agg_hash()
255+
}
256+
}

src/query/expression/src/aggregate/payload.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -224,21 +224,20 @@ impl Payload {
224224
for col in group_columns {
225225
if let Column::Nullable(c) = col {
226226
let bitmap = &c.validity;
227-
if bitmap.unset_bits() == 0 {
227+
if bitmap.unset_bits() == 0 || bitmap.unset_bits() == bitmap.len() {
228+
let val: u8 = if bitmap.unset_bits() == 0 { 1 } else { 0 };
228229
// faster path
229230
for idx in select_vector.iter().take(new_group_rows).copied() {
230231
unsafe {
231232
let dst = address[idx].add(write_offset);
232-
store(1, dst as *mut u8);
233+
store(val, dst as *mut u8);
233234
}
234235
}
235-
} else if bitmap.unset_bits() != bitmap.len() {
236+
} else {
236237
for idx in select_vector.iter().take(new_group_rows).copied() {
237-
if bitmap.get_bit(idx) {
238-
unsafe {
239-
let dst = address[idx].add(write_offset);
240-
store(1, dst as *mut u8);
241-
}
238+
unsafe {
239+
let dst = address[idx].add(write_offset);
240+
store(bitmap.get_bit(idx) as u8, dst as *mut u8);
242241
}
243242
}
244243
}

src/query/expression/src/aggregate/payload_flush.rs

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,31 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_io::prelude::bincode_deserialize_from_slice;
1516
use ethnum::i256;
1617

1718
use super::partitioned_payload::PartitionedPayload;
1819
use super::payload::Payload;
1920
use super::probe_state::ProbeState;
2021
use crate::types::binary::BinaryColumn;
2122
use crate::types::binary::BinaryColumnBuilder;
23+
use crate::types::decimal::Decimal;
2224
use crate::types::decimal::DecimalType;
2325
use crate::types::nullable::NullableColumn;
2426
use crate::types::string::StringColumn;
2527
use crate::types::ArgType;
2628
use crate::types::BooleanType;
2729
use crate::types::DataType;
2830
use crate::types::DateType;
31+
use crate::types::DecimalSize;
2932
use crate::types::NumberDataType;
3033
use crate::types::NumberType;
3134
use crate::types::TimestampType;
35+
use crate::types::ValueType;
3236
use crate::with_number_mapped_type;
3337
use crate::Column;
38+
use crate::ColumnBuilder;
39+
use crate::Scalar;
3440
use crate::StateAddr;
3541
use crate::BATCH_SIZE;
3642

@@ -160,11 +166,11 @@ impl Payload {
160166
self.flush_type_column::<NumberType<NUM_TYPE>>(col_offset, state),
161167
}),
162168
DataType::Decimal(v) => match v {
163-
crate::types::DecimalDataType::Decimal128(_) => {
164-
self.flush_type_column::<DecimalType<i128>>(col_offset, state)
169+
crate::types::DecimalDataType::Decimal128(s) => {
170+
self.flush_decimal_column::<i128>(col_offset, state, s)
165171
}
166-
crate::types::DecimalDataType::Decimal256(_) => {
167-
self.flush_type_column::<DecimalType<i256>>(col_offset, state)
172+
crate::types::DecimalDataType::Decimal256(s) => {
173+
self.flush_decimal_column::<i256>(col_offset, state, s)
168174
}
169175
},
170176
DataType::Timestamp => self.flush_type_column::<TimestampType>(col_offset, state),
@@ -175,10 +181,7 @@ impl Payload {
175181
DataType::Variant => Column::Variant(self.flush_binary_column(col_offset, state)),
176182
DataType::Geometry => Column::Geometry(self.flush_binary_column(col_offset, state)),
177183
DataType::Nullable(_) => unreachable!(),
178-
DataType::Array(_) => todo!(),
179-
DataType::Map(_) => todo!(),
180-
DataType::Tuple(_) => todo!(),
181-
DataType::Generic(_) => unreachable!(),
184+
other => self.flush_generic_column(&other, col_offset, state),
182185
};
183186

184187
let validity_offset = self.validity_offsets[col_index];
@@ -208,6 +211,22 @@ impl Payload {
208211
T::upcast_column(col)
209212
}
210213

214+
/// Builds a decimal `Column` from the rows currently referenced by `state`.
///
/// For each of the first `state.probe_state.row_count` entries of `state.addresses`,
/// one `DecimalType<Num>` scalar is read from `address + col_offset`. The values are
/// collected into a typed column and then upcast with the caller-supplied
/// `decimal_size`.
fn flush_decimal_column<Num: Decimal>(
215+
&self,
216+
col_offset: usize,
217+
state: &mut PayloadFlushState,
218+
decimal_size: DecimalSize,
219+
) -> Column {
220+
let len = state.probe_state.row_count;
221+
// SAFETY (assumed, TODO confirm): every `state.addresses[idx]` for `idx < len` must
// point at a live row whose bytes at `col_offset` hold an initialized decimal value.
// NOTE(review): `core::ptr::read` requires a properly aligned pointer — verify the
// row layout guarantees that for `col_offset`.
let iter = (0..len).map(|idx| unsafe {
222+
core::ptr::read::<<DecimalType<Num> as ValueType>::Scalar>(
223+
state.addresses[idx].add(col_offset) as _,
224+
)
225+
});
226+
let col = DecimalType::<Num>::column_from_iter(iter, &[]);
227+
// Re-attach the decimal size when converting to the untyped `Column`.
Num::upcast_column(col, decimal_size)
228+
}
229+
211230
fn flush_binary_column(
212231
&self,
213232
col_offset: usize,
@@ -240,4 +259,30 @@ impl Payload {
240259
) -> StringColumn {
241260
unsafe { StringColumn::from_binary_unchecked(self.flush_binary_column(col_offset, state)) }
242261
}
262+
263+
/// Builds a `Column` of `data_type` by deserializing one `Scalar` per row.
///
/// For each of the first `state.probe_state.row_count` entries of `state.addresses`,
/// the row is read as a `u32` byte length at `col_offset` followed by a `u64` data
/// pointer at `col_offset + 4`. The referenced bytes are decoded with
/// `bincode_deserialize_from_slice` into a `Scalar`, which is pushed into a
/// `ColumnBuilder`.
///
/// # Panics
///
/// Panics if the bytes cannot be deserialized into a `Scalar` (the result is unwrapped).
fn flush_generic_column(
264+
&self,
265+
data_type: &DataType,
266+
col_offset: usize,
267+
state: &mut PayloadFlushState,
268+
) -> Column {
269+
let len = state.probe_state.row_count;
270+
let mut builder = ColumnBuilder::with_capacity(data_type, len);
271+
272+
// SAFETY (assumed, TODO confirm): every `state.addresses[idx]` for `idx < len` must
// point at a live row, and the stored pointer/length pair must describe readable,
// initialized bytes that stay valid for the duration of this call.
unsafe {
273+
for idx in 0..len {
274+
// Despite the name, `str_len` is the byte length of the serialized scalar.
let str_len =
275+
core::ptr::read::<u32>(state.addresses[idx].add(col_offset) as _) as usize;
276+
// NOTE(review): `core::ptr::read::<u64>` requires 8-byte alignment, but this
// reads at `col_offset + 4`; verify the layout guarantees alignment, otherwise
// `read_unaligned` would be needed.
let data_address =
277+
core::ptr::read::<u64>(state.addresses[idx].add(col_offset + 4) as _) as usize
278+
as *const u8;
279+
280+
let scalar = std::slice::from_raw_parts(data_address, str_len);
281+
// A decode failure panics here rather than being returned to the caller.
let scalar: Scalar = bincode_deserialize_from_slice(scalar).unwrap();
282+
283+
builder.push(scalar.as_ref());
284+
}
285+
}
286+
builder.build()
287+
}
243288
}

0 commit comments

Comments
 (0)