Skip to content

Commit f8494c8

Browse files
Freejwwjwsundy-li
authored
chore(query): refactor new transform partition bucket for new aggregation hashtable (#15093)
* finish new transform partition bucket for singleton * support for cluster * fix new transform partition bucket * revert * fix cluster spill bug * remove code * fix logic shortcirt bug * fix logic shortcirt bug * refactor block_number * fix block_number * fix spill hang by avoid sending empty block --------- Co-authored-by: jw <freejw@gmail.com> Co-authored-by: sundy-li <543950155@qq.com>
1 parent dac261a commit f8494c8

19 files changed

+861
-438
lines changed

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,17 @@ impl AggregateHashTable {
9797
config: HashTableConfig,
9898
capacity: usize,
9999
arena: Arc<Bump>,
100+
need_init_entry: bool,
100101
) -> Self {
102+
let entries = if need_init_entry {
103+
vec![0u64; capacity]
104+
} else {
105+
vec![]
106+
};
101107
Self {
102-
entries: vec![],
108+
entries,
103109
count: 0,
104-
direct_append: true,
110+
direct_append: !need_init_entry,
105111
current_radix_bits: config.initial_radix_bits,
106112
payload: PartitionedPayload::new(
107113
group_types,

src/query/expression/src/aggregate/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ impl HashTableConfig {
105105

106106
pub fn cluster_with_partial(mut self, partial_agg: bool, node_nums: usize) -> Self {
107107
self.partial_agg = partial_agg;
108+
self.repartition_radix_bits_incr = 4;
108109
self.max_partial_capacity = 131072 * (2 << node_nums);
109110

110111
self

src/query/expression/src/aggregate/partitioned_payload.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -262,16 +262,6 @@ impl PartitionedPayload {
262262
pub fn memory_size(&self) -> usize {
263263
self.payloads.iter().map(|x| x.memory_size()).sum()
264264
}
265-
266-
pub fn include_arena(&self, other: &Arc<Bump>) -> bool {
267-
for arena in self.arenas.iter() {
268-
if Arc::ptr_eq(arena, other) {
269-
return true;
270-
}
271-
}
272-
273-
false
274-
}
275265
}
276266

277267
#[inline]

src/query/service/src/pipelines/builders/builder_aggregate.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,14 @@ impl PipelineBuilder {
106106
.settings
107107
.get_enable_experimental_aggregate_hashtable()?;
108108

109+
let in_cluster = !self.ctx.get_cluster().is_empty();
110+
109111
let params = Self::build_aggregator_params(
110112
aggregate.input.output_schema()?,
111113
&aggregate.group_by,
112114
&aggregate.agg_funcs,
113115
enable_experimental_aggregate_hashtable,
116+
in_cluster,
114117
max_block_size as usize,
115118
None,
116119
)?;
@@ -217,12 +220,13 @@ impl PipelineBuilder {
217220
let enable_experimental_aggregate_hashtable = self
218221
.settings
219222
.get_enable_experimental_aggregate_hashtable()?;
220-
223+
let in_cluster = !self.ctx.get_cluster().is_empty();
221224
let params = Self::build_aggregator_params(
222225
aggregate.before_group_by_schema.clone(),
223226
&aggregate.group_by,
224227
&aggregate.agg_funcs,
225228
enable_experimental_aggregate_hashtable,
229+
in_cluster,
226230
max_block_size as usize,
227231
aggregate.limit,
228232
)?;
@@ -288,6 +292,7 @@ impl PipelineBuilder {
288292
group_by: &[IndexType],
289293
agg_funcs: &[AggregateFunctionDesc],
290294
enable_experimental_aggregate_hashtable: bool,
295+
in_cluster: bool,
291296
max_block_size: usize,
292297
limit: Option<usize>,
293298
) -> Result<Arc<AggregatorParams>> {
@@ -329,6 +334,7 @@ impl PipelineBuilder {
329334
&aggs,
330335
&agg_args,
331336
enable_experimental_aggregate_hashtable,
337+
in_cluster,
332338
max_block_size,
333339
limit,
334340
)?;

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ struct AggregateExchangeSorting<Method: HashMethodBounds, V: Send + Sync + 'stat
6464
_phantom: PhantomData<(Method, V)>,
6565
}
6666

67+
pub fn compute_block_number(bucket: isize, max_partition_count: usize) -> Result<isize> {
68+
Ok(max_partition_count as isize * 1000 + bucket)
69+
}
70+
6771
impl<Method: HashMethodBounds, V: Send + Sync + 'static> ExchangeSorting
6872
for AggregateExchangeSorting<Method, V>
6973
{
@@ -78,14 +82,17 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> ExchangeSorting
7882
))),
7983
Some(meta_info) => match meta_info {
8084
AggregateMeta::Partitioned { .. } => unreachable!(),
81-
AggregateMeta::Serialized(v) => Ok(v.bucket),
85+
AggregateMeta::Serialized(v) => {
86+
compute_block_number(v.bucket, v.max_partition_count)
87+
}
8288
AggregateMeta::HashTable(v) => Ok(v.bucket),
83-
AggregateMeta::AggregateHashTable(_) => unreachable!(),
84-
AggregateMeta::AggregatePayload(v) => Ok(v.bucket),
89+
AggregateMeta::AggregatePayload(v) => {
90+
compute_block_number(v.bucket, v.max_partition_count)
91+
}
8592
AggregateMeta::AggregateSpilling(_)
8693
| AggregateMeta::Spilled(_)
87-
| AggregateMeta::Spilling(_)
88-
| AggregateMeta::BucketSpilled(_) => Ok(-1),
94+
| AggregateMeta::BucketSpilled(_)
95+
| AggregateMeta::Spilling(_) => Ok(-1),
8996
},
9097
}
9198
}
@@ -252,9 +259,12 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> FlightScatter
252259
}
253260
AggregateMeta::AggregateSpilling(payload) => {
254261
for p in scatter_partitioned_payload(payload, self.buckets)? {
255-
blocks.push(DataBlock::empty_with_meta(
256-
AggregateMeta::<Method, V>::create_agg_spilling(p),
257-
))
262+
blocks.push(match p.len() == 0 {
263+
true => DataBlock::empty(),
264+
false => DataBlock::empty_with_meta(
265+
AggregateMeta::<Method, V>::create_agg_spilling(p),
266+
),
267+
});
258268
}
259269
}
260270
AggregateMeta::HashTable(payload) => {
@@ -271,16 +281,18 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> FlightScatter
271281
});
272282
}
273283
}
274-
AggregateMeta::AggregateHashTable(_) => unreachable!(),
275284
AggregateMeta::AggregatePayload(p) => {
276285
for payload in scatter_payload(p.payload, self.buckets)? {
277-
blocks.push(DataBlock::empty_with_meta(
278-
AggregateMeta::<Method, V>::create_agg_payload(
279-
p.bucket,
280-
payload,
281-
p.max_partition_count,
286+
blocks.push(match payload.len() == 0 {
287+
true => DataBlock::empty(),
288+
false => DataBlock::empty_with_meta(
289+
AggregateMeta::<Method, V>::create_agg_payload(
290+
p.bucket,
291+
payload,
292+
p.max_partition_count,
293+
),
282294
),
283-
))
295+
});
284296
}
285297
}
286298
};

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,22 @@ impl SerializedPayload {
5959
aggrs: Vec<Arc<dyn AggregateFunction>>,
6060
radix_bits: u64,
6161
arena: Arc<Bump>,
62+
need_init_entry: bool,
6263
) -> Result<AggregateHashTable> {
6364
let rows_num = self.data_block.num_rows();
65+
let capacity = AggregateHashTable::get_capacity_for_count(rows_num);
6466
let config = HashTableConfig::default().with_initial_radix_bits(radix_bits);
6567
let mut state = ProbeState::default();
6668
let agg_len = aggrs.len();
6769
let group_len = group_types.len();
68-
let mut hashtable =
69-
AggregateHashTable::new_directly(group_types, aggrs, config, rows_num, arena);
70+
let mut hashtable = AggregateHashTable::new_directly(
71+
group_types,
72+
aggrs,
73+
config,
74+
capacity,
75+
arena,
76+
need_init_entry,
77+
);
7078

7179
let agg_states = (0..agg_len)
7280
.map(|i| {
@@ -103,7 +111,8 @@ impl SerializedPayload {
103111
radix_bits: u64,
104112
arena: Arc<Bump>,
105113
) -> Result<PartitionedPayload> {
106-
let hashtable = self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena)?;
114+
let hashtable =
115+
self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena, false)?;
107116
Ok(hashtable.payload)
108117
}
109118
}
@@ -126,7 +135,6 @@ pub struct AggregatePayload {
126135
pub enum AggregateMeta<Method: HashMethodBounds, V: Send + Sync + 'static> {
127136
Serialized(SerializedPayload),
128137
HashTable(HashTablePayload<Method, V>),
129-
AggregateHashTable(PartitionedPayload),
130138
AggregatePayload(AggregatePayload),
131139
AggregateSpilling(PartitionedPayload),
132140
BucketSpilled(BucketSpilledPayload),
@@ -144,10 +152,6 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> AggregateMeta<Method, V
144152
}))
145153
}
146154

147-
pub fn create_agg_hashtable(payload: PartitionedPayload) -> BlockMetaInfoPtr {
148-
Box::new(AggregateMeta::<Method, V>::AggregateHashTable(payload))
149-
}
150-
151155
pub fn create_agg_payload(
152156
bucket: isize,
153157
payload: Payload,
@@ -231,9 +235,6 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Debug for AggregateMeta
231235
AggregateMeta::Spilling(_) => f.debug_struct("Aggregate::Spilling").finish(),
232236
AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilling").finish(),
233237
AggregateMeta::BucketSpilled(_) => f.debug_struct("Aggregate::BucketSpilled").finish(),
234-
AggregateMeta::AggregateHashTable(_) => {
235-
f.debug_struct("AggregateMeta:AggHashTable").finish()
236-
}
237238
AggregateMeta::AggregatePayload(_) => {
238239
f.debug_struct("AggregateMeta:AggregatePayload").finish()
239240
}

src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct AggregatorParams {
4242
pub offsets_aggregate_states: Vec<usize>,
4343

4444
pub enable_experimental_aggregate_hashtable: bool,
45+
pub in_cluster: bool,
4546
pub max_block_size: usize,
4647
// Limit is push down to AggregatorTransform
4748
pub limit: Option<usize>,
@@ -55,6 +56,7 @@ impl AggregatorParams {
5556
agg_funcs: &[AggregateFunctionRef],
5657
agg_args: &[Vec<usize>],
5758
enable_experimental_aggregate_hashtable: bool,
59+
in_cluster: bool,
5860
max_block_size: usize,
5961
limit: Option<usize>,
6062
) -> Result<Arc<AggregatorParams>> {
@@ -74,6 +76,7 @@ impl AggregatorParams {
7476
layout: states_layout,
7577
offsets_aggregate_states: states_offsets,
7678
enable_experimental_aggregate_hashtable,
79+
in_cluster,
7780
max_block_size,
7881
limit,
7982
}))

src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ mod aggregate_cell;
1616
mod aggregate_exchange_injector;
1717
mod aggregate_meta;
1818
mod aggregator_params;
19+
mod new_transform_partition_bucket;
1920
mod serde;
2021
mod transform_aggregate_expand;
2122
mod transform_aggregate_final;

0 commit comments

Comments
 (0)