Skip to content

Commit 31a63b1

Browse files
authored
feat: add buffer for spiller (#15021)
1 parent 4c852e2 commit 31a63b1

File tree

6 files changed

+169
-18
lines changed

6 files changed

+169
-18
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/transform_build_spill.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ impl BuildSpillHandler {
9999
if join_type == &JoinType::Cross {
100100
return self.spill_cross_join().await;
101101
}
102+
if self.pending_spill_data.is_empty() && !self.spill_state().spiller.empty_buffer() {
103+
// Spill data in spiller buffer
104+
return self.spill_state_mut().spiller.spill_buffer().await;
105+
}
102106
// Concat the data blocks that pending to spill to reduce the spill file number.
103107
let pending_spill_data = DataBlock::concat(&self.pending_spill_data)?;
104108
let mut hashes = Vec::with_capacity(pending_spill_data.num_rows());
@@ -130,15 +134,7 @@ impl BuildSpillHandler {
130134
build_state: &Arc<HashJoinBuildState>,
131135
processor_id: usize,
132136
) -> Result<HashJoinBuildStep> {
133-
// Add spilled partition ids to `spill_partitions` of `HashJoinBuildState`
134137
let spilled_partition_set = self.spill_state().spiller.spilled_partitions();
135-
if build_state.join_type() != JoinType::Cross {
136-
info!(
137-
"build processor-{:?}: spill finished with spilled partitions {:?}",
138-
processor_id, spilled_partition_set
139-
);
140-
}
141-
142138
// For left-related join, will spill all build input blocks which means there isn't first-round hash table.
143139
// Because first-round hash table will make left join generate wrong results.
144140
// Todo: make left-related join leverage first-round hash table to reduce I/O.
@@ -151,18 +147,28 @@ impl BuildSpillHandler {
151147
return Ok(HashJoinBuildStep::Spill);
152148
}
153149

154-
if !spilled_partition_set.is_empty() {
155-
build_state
156-
.spilled_partition_set
157-
.write()
158-
.extend(spilled_partition_set);
159-
}
160150
// The processor has accepted all data from downstream
161151
// If there is still pending spill data, add to row space.
162152
for data in self.pending_spill_data.iter() {
163153
build_state.build(data.clone())?;
164154
}
165155
self.pending_spill_data.clear();
156+
// Check if there is data in spiller buffer
157+
if !self.spill_state().spiller.empty_buffer() {
158+
return Ok(HashJoinBuildStep::Spill);
159+
}
160+
if build_state.join_type() != JoinType::Cross {
161+
info!(
162+
"build processor-{:?}: spill finished with spilled partitions {:?}",
163+
processor_id, spilled_partition_set
164+
);
165+
}
166+
if !spilled_partition_set.is_empty() {
167+
build_state
168+
.spilled_partition_set
169+
.write()
170+
.extend(spilled_partition_set);
171+
}
166172
Ok(HashJoinBuildStep::Running)
167173
}
168174

src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/transform_probe_spill.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub struct ProbeSpillHandler {
3838
next_restore_file: usize,
3939
// Save input block from the processor if the input block has zero columns.
4040
input_blocks: Vec<DataBlock>,
41+
// The flag indicates whether spill the buffer data
42+
spill_buffer: bool,
4143
}
4244

4345
impl ProbeSpillHandler {
@@ -48,6 +50,7 @@ impl ProbeSpillHandler {
4850
probe_first_round_hashtable: true,
4951
next_restore_file: 0,
5052
input_blocks: vec![],
53+
spill_buffer: false,
5154
}
5255
}
5356

@@ -85,6 +88,14 @@ impl ProbeSpillHandler {
8588
self.spill_done = true;
8689
}
8790

91+
pub fn need_spill_buffer(&self) -> bool {
92+
self.spill_buffer
93+
}
94+
95+
pub fn set_need_spill_buffer(&mut self) {
96+
self.spill_buffer = true;
97+
}
98+
8899
pub fn add_partition_loc(&mut self, id: u8, loc: Vec<String>) {
89100
self.spill_state_mut()
90101
.spiller
@@ -138,6 +149,16 @@ impl ProbeSpillHandler {
138149
.spill_input(data_block, hashes, left_related_join, spilled_partitions)
139150
.await
140151
}
152+
153+
// Check if spiller buffer is empty
154+
pub fn empty_buffer(&self) -> bool {
155+
self.spill_state().spiller.empty_buffer()
156+
}
157+
158+
// Spill buffer data
159+
pub async fn spill_buffer(&mut self) -> Result<()> {
160+
self.spill_state_mut().spiller.spill_buffer().await
161+
}
141162
}
142163

143164
/// The following methods only used for cross join
@@ -226,6 +247,12 @@ impl TransformHashJoinProbe {
226247
// then add spill_partitions to `spill_partition_set` and set `spill_done` to true.
227248
// change current step to `WaitBuild`
228249
pub(crate) fn spill_finished(&mut self, processor_id: usize) -> Result<Event> {
250+
if !self.spill_handler.empty_buffer() {
251+
self.step = HashJoinProbeStep::Spill;
252+
self.step_logs.push(HashJoinProbeStep::Spill);
253+
self.spill_handler.set_need_spill_buffer();
254+
return Ok(Event::Async);
255+
}
229256
self.spill_handler.set_spill_done();
230257
// Add spilled partition ids to `spill_partitions` of `HashJoinProbeState`
231258
let spilled_partition_set = &self.spill_handler.spilled_partitions();
@@ -293,6 +320,11 @@ impl TransformHashJoinProbe {
293320

294321
// Async spill action
295322
pub(crate) async fn spill_action(&mut self) -> Result<()> {
323+
if self.spill_handler.need_spill_buffer() {
324+
self.spill_handler.spill_buffer().await?;
325+
self.spill_finished(self.processor_id)?;
326+
return Ok(());
327+
}
296328
// Before spilling, if there is a hash table, probe the hash table first
297329
self.try_probe_first_round_hashtable(self.input_data.clone())?;
298330
let left_related_join_type = matches!(

src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,10 @@ impl TransformHashJoinProbe {
235235
Ok(Event::Async)
236236
}
237237

238+
fn spill(&self) -> Result<Event> {
239+
Ok(Event::Async)
240+
}
241+
238242
// Running
239243
// When spilling is enabled, the method contains two running paths
240244
// 1. Before spilling, it will pull data from input port and go to spill
@@ -391,7 +395,7 @@ impl Processor for TransformHashJoinProbe {
391395
HashJoinProbeStep::Running => self.run(),
392396
HashJoinProbeStep::Restore => self.restore(),
393397
HashJoinProbeStep::FinalScan => self.final_scan(),
394-
HashJoinProbeStep::Spill => unreachable!("{:?}", self.step),
398+
HashJoinProbeStep::Spill => self.spill(),
395399
}
396400
}
397401

src/query/service/src/spillers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
mod spiller;
16+
mod spiller_buffer;
1617

1718
pub use spiller::Spiller;
1819
pub use spiller::SpillerConfig;

src/query/service/src/spillers/spiller.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_expression::DataBlock;
2828
use opendal::Operator;
2929

3030
use crate::sessions::QueryContext;
31+
use crate::spillers::spiller_buffer::SpillerBuffer;
3132

3233
/// Spiller type, currently only supports HashJoin
3334
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -72,6 +73,7 @@ pub struct Spiller {
7273
operator: Operator,
7374
config: SpillerConfig,
7475
_spiller_type: SpillerType,
76+
spiller_buffer: SpillerBuffer,
7577
pub join_spilling_partition_bits: usize,
7678
/// 1 partition -> N partition files
7779
pub partition_location: HashMap<u8, Vec<String>>,
@@ -93,6 +95,7 @@ impl Spiller {
9395
operator,
9496
config,
9597
_spiller_type: spiller_type,
98+
spiller_buffer: SpillerBuffer::create(),
9699
join_spilling_partition_bits,
97100
partition_location: Default::default(),
98101
columns_layout: Default::default(),
@@ -191,7 +194,6 @@ impl Spiller {
191194
}
192195

193196
#[async_backtrace::framed]
194-
// Directly spill input data without buffering.
195197
// Need to compute hashes for data block advanced.
196198
// For probe, only need to spill rows in build spilled partitions.
197199
// For left-related join, need to record rows not in build spilled partitions.
@@ -234,8 +236,12 @@ impl Spiller {
234236
&block_row_indexes,
235237
row_indexes.len(),
236238
);
237-
// Spill block with partition id
238-
self.spill_with_partition(*p_id, block).await?;
239+
if let Some((p_id, block)) = self
240+
.spiller_buffer
241+
.add_partition_unspilled_data(*p_id, block)?
242+
{
243+
self.spill_with_partition(p_id, block).await?;
244+
}
239245
}
240246
if !left_related_join {
241247
return Ok(None);
@@ -251,6 +257,24 @@ impl Spiller {
251257
)))
252258
}
253259

260+
// Spill the data in the buffer at the end of spilling
261+
pub(crate) async fn spill_buffer(&mut self) -> Result<()> {
262+
let partition_unspilled_data = self.spiller_buffer.partition_unspilled_data();
263+
for (partition_id, blocks) in partition_unspilled_data.iter() {
264+
if !blocks.is_empty() {
265+
let merged_block = DataBlock::concat(blocks)?;
266+
self.spill_with_partition(*partition_id, merged_block)
267+
.await?;
268+
}
269+
}
270+
self.spiller_buffer.reset();
271+
Ok(())
272+
}
273+
274+
pub(crate) fn empty_buffer(&self) -> bool {
275+
self.spiller_buffer.empty()
276+
}
277+
254278
pub(crate) fn spilled_files(&self) -> Vec<String> {
255279
self.columns_layout.keys().cloned().collect()
256280
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
17+
use databend_common_exception::ErrorCode;
18+
use databend_common_exception::Result;
19+
use databend_common_expression::DataBlock;
20+
21+
// The spiller buffer will record each partition's unspilled data.
22+
// When the buffer is full(>=8MB), it will pick the partition with the most unspilled data to spill.
23+
#[derive(Clone)]
24+
pub(crate) struct SpillerBuffer {
25+
partition_unspilled_data: HashMap<u8, Vec<DataBlock>>,
26+
buffer_size: usize,
27+
}
28+
29+
impl SpillerBuffer {
30+
pub(crate) fn create() -> Self {
31+
SpillerBuffer {
32+
partition_unspilled_data: HashMap::new(),
33+
buffer_size: 0,
34+
}
35+
}
36+
37+
// Add a partition's unspilled data to the buffer
38+
// The method will check if the buffer is full, if so, it will spill the partition with the most unspilled data
39+
// After spilling, the buffer will be clear the spilled partition's unspilled data
40+
// The return value is the partition id and the spilled data
41+
pub(crate) fn add_partition_unspilled_data(
42+
&mut self,
43+
partition_id: u8,
44+
data: DataBlock,
45+
) -> Result<Option<(u8, DataBlock)>> {
46+
fn rows(data: &[DataBlock]) -> usize {
47+
data.iter().map(|block| block.num_rows()).sum()
48+
}
49+
let data_size = data.memory_size();
50+
self.buffer_size += data_size;
51+
self.partition_unspilled_data
52+
.entry(partition_id)
53+
.or_default()
54+
.push(data);
55+
if self.buffer_size >= 8 * 1024 * 1024 {
56+
// Pick the partition with the most unspilled data in `partition_unspilled_data`
57+
let (partition_id, blocks) = self
58+
.partition_unspilled_data
59+
.iter()
60+
.max_by_key(|(_, v)| rows(v))
61+
.map(|(k, v)| (*k, v.clone()))
62+
.ok_or_else(|| ErrorCode::Internal("No unspilled data in the buffer"))?;
63+
debug_assert!(!blocks.is_empty());
64+
self.partition_unspilled_data.remove(&partition_id);
65+
let merged_block = DataBlock::concat(&blocks)?;
66+
self.buffer_size -= merged_block.memory_size();
67+
return Ok(Some((partition_id, merged_block)));
68+
}
69+
Ok(None)
70+
}
71+
72+
pub(crate) fn empty(&self) -> bool {
73+
self.buffer_size == 0
74+
}
75+
76+
pub(crate) fn partition_unspilled_data(&self) -> HashMap<u8, Vec<DataBlock>> {
77+
self.partition_unspilled_data.clone()
78+
}
79+
80+
pub(crate) fn reset(&mut self) {
81+
self.partition_unspilled_data.clear();
82+
self.buffer_size = 0;
83+
}
84+
}

0 commit comments

Comments
 (0)