Skip to content

Commit 7dc8a1a

Browse files
authored
refactor(query): refactor hash join spill (#15746)
* chore: refactor HashJoinBuildState
* chore: refactor HashJoinProbeState
* chore: refactor HashJoinState
* refactor: unify hash join spiller and optimize partition
* refactor: improve SpillBuffer
* refactor: new step transition mechanism.
* chore: merge main
* chore: fix fast return
* chore: make lint
* chore: fix WaitCollect and remove useless code
* chore: fix unsplitted data blocks
* chore: fix add_splitted_data_blocks
* chore: fix add_splitted_data_blocks
* chore(code): fix typos
* chore(query): refine spill buffer and some code
* chore(query): minimum threshold for each partition
* chore(query): update minimum threshold of partition
* chore(query): fix can probe first round
* chore(query): improve HashJoinSpiller
* chore(query): use 1024 instead of 1000 in SpillBuffer
* chore(query): refine comment
* chore(query): refine spill buffer partition_threshold
* chore(test): add tpch join spill test
* chore(test): add more test
* chore(query): refine code
* chore(query): remove probe first round
* chore(query): fix is_left_related_join_type
* chore(test): refine test
* chore(test): refine test
* chore(query): fix can_fast_return
* chore(query): fix cross join spill
* chore(test): refine tpch test
* chore(test): add tpch spill test
* chore(test): add tpcds spill test
* chore(test): add tpch spill test
* chore(test): add tpcds spill test
* chore(test): disable tpcds aggregate spill test
* chore(test): disable aggregate in tpch spill test
* chore(query): disable aggregate spill by default
* chore(query): support disable spill
* chore(query): stricter atomic ordering
* chore(query): refine atomic ordering
* chore(query): remove unused code
* chore(query): add #[allow(unused)]
1 parent 449d66e commit 7dc8a1a

25 files changed

+3076
-3028
lines changed

src/query/service/src/pipelines/builders/builder_join.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,10 @@ use databend_common_sql::IndexType;
2828
use crate::pipelines::processors::transforms::range_join::RangeJoinState;
2929
use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft;
3030
use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
31-
use crate::pipelines::processors::transforms::BuildSpillState;
3231
use crate::pipelines::processors::transforms::HashJoinBuildState;
3332
use crate::pipelines::processors::transforms::HashJoinProbeState;
3433
use crate::pipelines::processors::transforms::MaterializedCteSink;
3534
use crate::pipelines::processors::transforms::MaterializedCteState;
36-
use crate::pipelines::processors::transforms::ProbeSpillState;
3735
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
3836
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
3937
use crate::pipelines::processors::HashJoinDesc;
@@ -161,19 +159,9 @@ impl PipelineBuilder {
161159
)?;
162160

163161
let create_sink_processor = |input| {
164-
let spill_state = if join_state.enable_spill {
165-
Some(Box::new(BuildSpillState::create(
166-
self.ctx.clone(),
167-
build_state.clone(),
168-
)?))
169-
} else {
170-
None
171-
};
172-
173162
Ok(ProcessorPtr::create(TransformHashJoinBuild::try_create(
174163
input,
175164
build_state.clone(),
176-
spill_state,
177165
)?))
178166
};
179167
// for distributed merge into when source as build side.
@@ -192,7 +180,6 @@ impl PipelineBuilder {
192180

193181
let max_block_size = self.settings.get_max_block_size()? as usize;
194182
let barrier = Barrier::new(self.main_pipeline.output_len());
195-
let restore_barrier = Barrier::new(self.main_pipeline.output_len());
196183
let probe_state = Arc::new(HashJoinProbeState::create(
197184
self.ctx.clone(),
198185
self.func_ctx.clone(),
@@ -203,29 +190,18 @@ impl PipelineBuilder {
203190
&join.join_type,
204191
self.main_pipeline.output_len(),
205192
barrier,
206-
restore_barrier,
207193
)?);
208194
let mut has_string_column = false;
209195
for field in join.output_schema()?.fields() {
210196
has_string_column |= field.data_type().is_string_column();
211197
}
212198

213199
self.main_pipeline.add_transform(|input, output| {
214-
let probe_spill_state = if state.enable_spill {
215-
Some(Box::new(ProbeSpillState::create(
216-
self.ctx.clone(),
217-
probe_state.clone(),
218-
)?))
219-
} else {
220-
None
221-
};
222-
223200
Ok(ProcessorPtr::create(TransformHashJoinProbe::create(
224201
input,
225202
output,
226203
join.projections.clone(),
227204
probe_state.clone(),
228-
probe_spill_state,
229205
max_block_size,
230206
self.func_ctx.clone(),
231207
&join.join_type,

src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_state.rs

Lines changed: 0 additions & 117 deletions
This file was deleted.

src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/mod.rs

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)