Skip to content

Commit f22ad96

Browse files
authored
feat: support left-related join spilling (#14828)
* feat: support left-related join spilling
* fix test
1 parent 0217923 commit f22ad96

File tree

10 files changed

+254
-9
lines changed

10 files changed

+254
-9
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_state.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_catalog::table_context::TableContext;
2121
use databend_common_exception::Result;
2222
use databend_common_expression::DataBlock;
2323
use databend_common_pipeline_core::query_spill_prefix;
24+
use databend_common_sql::plans::JoinType;
2425
use databend_common_storage::DataOperator;
2526
use log::info;
2627

@@ -53,10 +54,22 @@ impl BuildSpillState {
5354
}
5455

5556
// Get all hashes for build input data.
56-
pub fn get_hashes(&self, block: &DataBlock, hashes: &mut Vec<u64>) -> Result<()> {
57+
pub fn get_hashes(
58+
&self,
59+
block: &DataBlock,
60+
join_type: Option<&JoinType>,
61+
hashes: &mut Vec<u64>,
62+
) -> Result<()> {
5763
let func_ctx = self.build_state.ctx.get_function_context()?;
5864
let keys = &self.build_state.hash_join_state.hash_join_desc.build_keys;
59-
get_hashes(&func_ctx, block, keys, &self.build_state.method, hashes)
65+
get_hashes(
66+
&func_ctx,
67+
block,
68+
keys,
69+
&self.build_state.method,
70+
join_type,
71+
hashes,
72+
)
6073
}
6174
}
6275

src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/transform_build_spill.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::Arc;
1616

1717
use databend_common_exception::Result;
1818
use databend_common_expression::DataBlock;
19+
use databend_common_sql::plans::JoinType;
1920
use log::info;
2021

2122
use crate::pipelines::processors::transforms::BuildSpillState;
@@ -91,12 +92,12 @@ impl BuildSpillHandler {
9192
}
9293

9394
// Spill pending data block
94-
pub(crate) async fn spill(&mut self) -> Result<()> {
95+
pub(crate) async fn spill(&mut self, join_type: &JoinType) -> Result<()> {
9596
let pending_spill_data = self.pending_spill_data.clone();
9697
for block in pending_spill_data.iter() {
9798
let mut hashes = Vec::with_capacity(block.num_rows());
9899
let spill_state = self.spill_state_mut();
99-
spill_state.get_hashes(block, &mut hashes)?;
100+
spill_state.get_hashes(block, Some(join_type), &mut hashes)?;
100101
spill_state
101102
.spiller
102103
.spill_input(block.clone(), &hashes, None)

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,10 @@ impl HashJoinBuildState {
10321032
}
10331033
Ok(())
10341034
}
1035+
1036+
pub(crate) fn join_type(&self) -> JoinType {
1037+
self.hash_join_state.hash_join_desc.join_type.clone()
1038+
}
10351039
}
10361040

10371041
pub fn supported_join_type_for_runtime_filter(join_type: &JoinType) -> bool {

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use parking_lot::RwLock;
4242
use super::merge_into_hash_join_optimization::MergeIntoState;
4343
use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
4444
use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
45+
use crate::pipelines::processors::transforms::hash_join::spill_common::spilling_supported_join_type;
4546
use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
4647
use crate::pipelines::processors::HashJoinDesc;
4748
use crate::sessions::QueryContext;
@@ -145,7 +146,7 @@ impl HashJoinState {
145146
let (build_done_watcher, _build_done_dummy_receiver) = watch::channel(0);
146147
let (continue_build_watcher, _continue_build_dummy_receiver) = watch::channel(false);
147148
let mut enable_spill = false;
148-
if hash_join_desc.join_type == JoinType::Inner
149+
if spilling_supported_join_type(&hash_join_desc.join_type)
149150
&& ctx.get_settings().get_join_spilling_memory_ratio()? != 0
150151
{
151152
enable_spill = true;

src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/probe_spill_state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl ProbeSpillState {
5757
block,
5858
keys,
5959
&self.probe_state.hash_method,
60+
None,
6061
hashes,
6162
)
6263
}

src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/transform_probe_spill.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,7 @@ impl TransformHashJoinProbe {
145145
self.step = HashJoinProbeStep::Running;
146146
self.step_logs.push(HashJoinProbeStep::Running);
147147
self.probe_state.reset();
148-
if (self.join_probe_state.hash_join_state.need_outer_scan()
149-
|| self.join_probe_state.hash_join_state.need_mark_scan())
148+
if self.join_probe_state.hash_join_state.need_final_scan()
150149
&& self.join_probe_state.probe_workers.load(Ordering::Relaxed) == 0
151150
{
152151
self.join_probe_state

src/query/service/src/pipelines/processors/transforms/hash_join/spill_common.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
// Define some methods that are used by both the build and probe spilling of the hash join.
1616

17+
use databend_common_arrow::arrow::bitmap::Bitmap;
1718
use databend_common_exception::Result;
1819
use databend_common_expression::types::DataType;
1920
use databend_common_expression::Column;
@@ -23,17 +24,38 @@ use databend_common_expression::Expr;
2324
use databend_common_expression::FunctionContext;
2425
use databend_common_expression::HashMethodKind;
2526
use databend_common_functions::BUILTIN_FUNCTIONS;
27+
use databend_common_sql::plans::JoinType;
2628

29+
use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity;
2730
use crate::pipelines::processors::transforms::hash_join::util::hash_by_method;
2831

2932
pub fn get_hashes(
3033
func_ctx: &FunctionContext,
3134
block: &DataBlock,
3235
keys: &[Expr],
3336
method: &HashMethodKind,
37+
join_type: Option<&JoinType>,
3438
hashes: &mut Vec<u64>,
3539
) -> Result<()> {
36-
let evaluator = Evaluator::new(block, func_ctx, &BUILTIN_FUNCTIONS);
40+
let mut block = block.clone();
41+
let mut evaluator = Evaluator::new(&block, func_ctx, &BUILTIN_FUNCTIONS);
42+
if let Some(join_type) = join_type {
43+
if matches!(
44+
join_type,
45+
JoinType::Left | JoinType::LeftSingle | JoinType::Full
46+
) {
47+
let validity = Bitmap::new_constant(true, block.num_rows());
48+
block = DataBlock::new(
49+
block
50+
.columns()
51+
.iter()
52+
.map(|c| wrap_true_validity(c, block.num_rows(), &validity))
53+
.collect::<Vec<_>>(),
54+
block.num_rows(),
55+
);
56+
evaluator = Evaluator::new(&block, func_ctx, &BUILTIN_FUNCTIONS);
57+
}
58+
}
3759
let columns: Vec<(Column, DataType)> = keys
3860
.iter()
3961
.map(|expr| {
@@ -49,3 +71,14 @@ pub fn get_hashes(
4971
hash_by_method(method, &columns, block.num_rows(), hashes)?;
5072
Ok(())
5173
}
74+
75+
pub fn spilling_supported_join_type(join_type: &JoinType) -> bool {
76+
matches!(
77+
*join_type,
78+
JoinType::Inner
79+
| JoinType::Left
80+
| JoinType::LeftSemi
81+
| JoinType::LeftAnti
82+
| JoinType::LeftSingle
83+
)
84+
}

src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,9 @@ impl Processor for TransformHashJoinBuild {
218218
self.step = HashJoinBuildStep::Finalize;
219219
}
220220
HashJoinBuildStep::Spill => {
221-
self.spill_handler.spill().await?;
221+
self.spill_handler
222+
.spill(&self.build_state.join_type())
223+
.await?;
222224
// After spill, the processor should continue to run, and process incoming data.
223225
self.step = HashJoinBuildStep::Running;
224226
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
statement ok
2+
set sandbox_tenant = 'test_tenant';
3+
4+
statement ok
5+
use spill_test;
6+
7+
statement ok
8+
set join_spilling_memory_ratio = 10;
9+
10+
statement ok
11+
set join_spilling_bytes_threshold_per_proc = 1024;
12+
13+
statement ok
14+
set disable_join_reorder = 1;
15+
16+
query I
17+
select
18+
c_custkey, count(o_orderkey) as c_count
19+
from
20+
customer
21+
left join
22+
orders
23+
on c_custkey = o_custkey
24+
and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
25+
group by
26+
c_custkey
27+
order by c_custkey
28+
limit 20;
29+
----
30+
1 0
31+
2 0
32+
3 0
33+
4 0
34+
5 0
35+
6 0
36+
7 0
37+
8 0
38+
9 0
39+
10 0
40+
11 0
41+
12 0
42+
13 0
43+
14 0
44+
15 0
45+
16 0
46+
17 0
47+
18 0
48+
19 0
49+
20 0
50+
51+
query II
52+
select
53+
c_custkey
54+
from
55+
customer
56+
left semi join
57+
orders
58+
on c_custkey = o_custkey
59+
and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
60+
order by c_custkey
61+
limit 20;
62+
----
63+
101
64+
101
65+
103
66+
103
67+
104
68+
104
69+
106
70+
106
71+
107
72+
107
73+
109
74+
109
75+
110
76+
110
77+
112
78+
112
79+
113
80+
113
81+
115
82+
115
83+
84+
query I
85+
select
86+
c_custkey
87+
from
88+
customer
89+
left anti join
90+
orders
91+
on c_custkey = o_custkey
92+
and o_comment not like '%pending%deposits%' and c_custkey > 100 and c_custkey < 120
93+
order by c_custkey
94+
limit 20;
95+
----
96+
1
97+
1
98+
2
99+
2
100+
3
101+
3
102+
4
103+
4
104+
5
105+
5
106+
6
107+
6
108+
7
109+
7
110+
8
111+
8
112+
9
113+
9
114+
10
115+
10
116+
117+
118+
# tpch queries contain left join
119+
#Q13
120+
query I
121+
select
122+
c_count,
123+
count(*) as custdist
124+
from
125+
(
126+
select
127+
c_custkey,
128+
count(o_orderkey) as c_count
129+
from
130+
customer
131+
left outer join
132+
orders
133+
on c_custkey = o_custkey
134+
and o_comment not like '%pending%deposits%'
135+
group by
136+
c_custkey
137+
)
138+
c_orders
139+
group by
140+
c_count
141+
order by
142+
custdist desc,
143+
c_count desc;
144+
----
145+
0 5000
146+
40 676
147+
36 651
148+
44 618
149+
48 554
150+
32 548
151+
52 514
152+
28 487
153+
76 485
154+
72 461
155+
56 454
156+
80 444
157+
64 442
158+
68 438
159+
60 430
160+
84 396
161+
88 378
162+
24 355
163+
92 322
164+
96 262
165+
100 188
166+
20 184
167+
104 162
168+
108 138
169+
112 103
170+
16 92
171+
116 59
172+
12 49
173+
120 29
174+
124 26
175+
128 19
176+
8 12
177+
132 8
178+
136 7
179+
140 5
180+
4 3
181+
144 1
182+
183+
184+
statement ok
185+
set disable_join_reorder = 0;
186+
187+
statement ok
188+
set join_spilling_memory_ratio = 0;
189+
190+
statement ok
191+
set join_spilling_bytes_threshold_per_proc = 0;

0 commit comments

Comments
 (0)