Skip to content

Commit 2ab9ced

Browse files
authored
fix(query): sort will still be OOM even if spill is enabled (#17883)
* spilling_to_disk_bytes_limit * fix spill Signed-off-by: coldWater <forsaken628@gmail.com> --------- Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 7fe578d commit 2ab9ced

File tree

8 files changed

+66
-44
lines changed

8 files changed

+66
-44
lines changed

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
2727
use databend_common_storages_fuse::FuseTable;
2828
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
2929
use databend_storages_common_cache::TempDirManager;
30-
use opendal::services::Fs;
31-
use opendal::Operator;
3230

3331
use crate::pipelines::memory_settings::MemorySettingsExt;
3432
use crate::pipelines::processors::transforms::CompactStrategy;
@@ -56,21 +54,10 @@ impl PipelineBuilder {
5654
let temp_dir_manager = TempDirManager::instance();
5755

5856
let enable_dio = settings.get_enable_dio()?;
59-
let disk_spill =
60-
match temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()) {
61-
Some(temp_dir) if !enable_dio => {
62-
let builder = Fs::default().root(temp_dir.path().to_str().unwrap());
63-
Some(SpillerDiskConfig {
64-
temp_dir,
65-
local_operator: Some(Operator::new(builder)?.finish()),
66-
})
67-
}
68-
Some(temp_dir) => Some(SpillerDiskConfig {
69-
temp_dir,
70-
local_operator: None,
71-
}),
72-
None => None,
73-
};
57+
let disk_spill = temp_dir_manager
58+
.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id())
59+
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
60+
.transpose()?;
7461

7562
let window_spill_settings = MemorySettings::from_window_settings(&self.ctx)?;
7663
let processor_id = AtomicUsize::new(0);

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ use databend_common_sql::evaluator::CompoundBlockOperator;
3131
use databend_common_sql::executor::physical_plans::Sort;
3232
use databend_common_storage::DataOperator;
3333
use databend_common_storages_fuse::TableContext;
34+
use databend_storages_common_cache::TempDirManager;
3435

3536
use crate::pipelines::memory_settings::MemorySettingsExt;
3637
use crate::pipelines::processors::transforms::TransformSortBuilder;
3738
use crate::pipelines::PipelineBuilder;
3839
use crate::sessions::QueryContext;
3940
use crate::spillers::Spiller;
4041
use crate::spillers::SpillerConfig;
42+
use crate::spillers::SpillerDiskConfig;
4143
use crate::spillers::SpillerType;
4244

4345
impl PipelineBuilder {
@@ -220,11 +222,19 @@ impl SortPipelineBuilder {
220222
let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
221223

222224
let spiller = {
225+
let temp_dir_manager = TempDirManager::instance();
226+
let disk_bytes_limit = settings.get_sort_spilling_to_disk_bytes_limit()?;
227+
let enable_dio = settings.get_enable_dio()?;
228+
let disk_spill = temp_dir_manager
229+
.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id())
230+
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
231+
.transpose()?;
232+
223233
let location_prefix = self.ctx.query_id_spill_prefix();
224234
let config = SpillerConfig {
225235
spiller_type: SpillerType::OrderBy,
226236
location_prefix,
227-
disk_spill: None,
237+
disk_spill,
228238
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
229239
};
230240
let op = DataOperator::instance().spill_operator();

src/query/service/src/pipelines/builders/builder_window.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ use databend_common_pipeline_transforms::MemorySettings;
2727
use databend_common_sql::executor::physical_plans::Window;
2828
use databend_common_sql::executor::physical_plans::WindowPartition;
2929
use databend_storages_common_cache::TempDirManager;
30-
use opendal::services::Fs;
31-
use opendal::Operator;
3230

3331
use crate::pipelines::memory_settings::MemorySettingsExt;
3432
use crate::pipelines::processors::transforms::FrameBound;
@@ -192,25 +190,13 @@ impl PipelineBuilder {
192190
);
193191
}
194192

195-
let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?;
196193
let temp_dir_manager = TempDirManager::instance();
197-
194+
let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?;
198195
let enable_dio = settings.get_enable_dio()?;
199-
let disk_spill =
200-
match temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()) {
201-
Some(temp_dir) if !enable_dio => {
202-
let builder = Fs::default().root(temp_dir.path().to_str().unwrap());
203-
Some(SpillerDiskConfig {
204-
temp_dir,
205-
local_operator: Some(Operator::new(builder)?.finish()),
206-
})
207-
}
208-
Some(temp_dir) => Some(SpillerDiskConfig {
209-
temp_dir,
210-
local_operator: None,
211-
}),
212-
None => None,
213-
};
196+
let disk_spill = temp_dir_manager
197+
.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id())
198+
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
199+
.transpose()?;
214200

215201
let have_order_col = window_partition.after_exchange.unwrap_or(false);
216202
let window_spill_settings = MemorySettings::from_window_settings(&self.ctx)?;

src/query/service/src/pipelines/processors/transforms/transform_merge_sort.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ where
199199
let unit_size = self.memory_settings.spill_unit_size;
200200
let num_merge = bytes.div_ceil(unit_size).max(2);
201201
let batch_rows = rows.div_ceil(num_merge);
202+
203+
/// The memory will be doubled during merging.
204+
const MERGE_RATIO: usize = 2;
205+
let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2);
202206
log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}");
203207
SortSpillParams {
204208
batch_rows,
@@ -459,7 +463,9 @@ where
459463
.await?;
460464
}
461465
if input > max || finished && input > 0 {
462-
spill_sort.sort_input_data(std::mem::take(input_data), &self.aborting)?;
466+
spill_sort
467+
.sort_input_data(std::mem::take(input_data), &self.aborting)
468+
.await?;
463469
}
464470
if finished {
465471
self.state = State::Sort;

src/query/service/src/pipelines/processors/transforms/transform_merge_sort/sort_spill.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,17 @@ where A: SortAlgorithm
9292
Self { base, step }
9393
}
9494

95-
pub fn sort_input_data(
95+
pub async fn sort_input_data(
9696
&mut self,
9797
input_data: Vec<DataBlock>,
9898
aborting: &AtomicBool,
9999
) -> Result<()> {
100100
let Step::Collect(collect) = &mut self.step else {
101101
unreachable!()
102102
};
103-
collect.sort_input_data(&self.base, input_data, aborting)
103+
collect
104+
.sort_input_data(&self.base, input_data, aborting)
105+
.await
104106
}
105107

106108
pub async fn subsequent_spill_last(&mut self, target_rows: usize) -> Result<()> {
@@ -158,7 +160,7 @@ where A: SortAlgorithm
158160
}
159161

160162
impl<A: SortAlgorithm> StepCollect<A> {
161-
fn sort_input_data(
163+
async fn sort_input_data(
162164
&mut self,
163165
base: &Base,
164166
mut input_data: Vec<DataBlock>,
@@ -191,7 +193,11 @@ impl<A: SortAlgorithm> StepCollect<A> {
191193
));
192194
}
193195

194-
sorted.push_back(base.new_block(data));
196+
let mut block = base.new_block(data);
197+
if !sorted.is_empty() {
198+
block.spill(&base.spiller).await?;
199+
}
200+
sorted.push_back(block);
195201
}
196202
debug_assert!(merger.is_finished());
197203
sorted

src/query/service/src/spillers/spiller.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use databend_common_exception::Result;
3333
use databend_common_expression::DataBlock;
3434
use databend_storages_common_cache::TempDir;
3535
use databend_storages_common_cache::TempPath;
36+
use opendal::services::Fs;
3637
use opendal::Buffer;
3738
use opendal::Operator;
3839
use parking_lot::RwLock;
@@ -73,8 +74,23 @@ pub struct SpillerConfig {
7374

7475
#[derive(Clone)]
7576
pub struct SpillerDiskConfig {
76-
pub temp_dir: Arc<TempDir>,
77-
pub local_operator: Option<Operator>,
77+
temp_dir: Arc<TempDir>,
78+
local_operator: Option<Operator>,
79+
}
80+
81+
impl SpillerDiskConfig {
82+
pub fn new(temp_dir: Arc<TempDir>, enable_dio: bool) -> Result<SpillerDiskConfig> {
83+
let local_operator = if !enable_dio {
84+
let builder = Fs::default().root(temp_dir.path().to_str().unwrap());
85+
Some(Operator::new(builder)?.finish())
86+
} else {
87+
None
88+
};
89+
Ok(SpillerDiskConfig {
90+
temp_dir,
91+
local_operator,
92+
})
93+
}
7894
}
7995

8096
/// Spiller is a unified framework for operators which need to spill data from memory.

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,13 @@ impl DefaultSettings {
605605
scope: SettingScope::Both,
606606
range: Some(SettingRange::Numeric(0..=100)),
607607
}),
608+
("sort_spilling_to_disk_bytes_limit", DefaultSettingValue {
609+
value: UserSettingValue::UInt64(0),
610+
desc: "Sets the maximum amount of local disk in bytes that sorter can use before spilling data to storage during one query execution.",
611+
mode: SettingMode::Both,
612+
scope: SettingScope::Both,
613+
range: Some(SettingRange::Numeric(0..=u64::MAX)),
614+
}),
608615
("sort_spilling_batch_bytes", DefaultSettingValue {
609616
value: UserSettingValue::UInt64(8 * 1024 * 1024),
610617
desc: "Sets the uncompressed size that merge sorter will spill to storage",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,10 @@ impl Settings {
490490
Ok(self.try_get_u64("sort_spilling_memory_ratio")? as usize)
491491
}
492492

493+
pub fn get_sort_spilling_to_disk_bytes_limit(&self) -> Result<usize> {
494+
Ok(self.try_get_u64("sort_spilling_to_disk_bytes_limit")? as usize)
495+
}
496+
493497
pub fn get_group_by_shuffle_mode(&self) -> Result<String> {
494498
self.try_get_string("group_by_shuffle_mode")
495499
}

0 commit comments

Comments
 (0)