Skip to content

Commit 5c408b8

Browse files
authored
chore: recluster disable sort spill (#15490)
recluster disable sort spill
1 parent 185a47b commit 5c408b8

File tree

10 files changed

+37
-3
lines changed

10 files changed

+37
-3
lines changed

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ pub trait TableContext: Send + Sync {
157157
fn set_cacheable(&self, cacheable: bool);
158158
fn get_can_scan_from_agg_index(&self) -> bool;
159159
fn set_can_scan_from_agg_index(&self, enable: bool);
160+
fn get_enable_sort_spill(&self) -> bool;
161+
fn set_enable_sort_spill(&self, enable: bool);
160162
fn set_compaction_num_block_hint(&self, hint: u64);
161163
fn get_compaction_num_block_hint(&self) -> u64;
162164

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ impl PipelineBuilder {
170170
})
171171
.collect();
172172

173+
self.ctx.set_enable_sort_spill(false);
173174
let sort_pipeline_builder =
174175
SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs))
175176
.with_partial_block_size(partial_block_size)

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ impl SortPipelineBuilder {
210210
}
211211

212212
fn get_memory_settings(&self, num_threads: usize) -> Result<(usize, usize)> {
213+
let enable_sort_spill = self.ctx.get_enable_sort_spill();
214+
if !enable_sort_spill {
215+
return Ok((0, 0));
216+
}
217+
213218
let settings = self.ctx.get_settings();
214219
let memory_ratio = settings.get_sort_spilling_memory_ratio()?;
215220
let bytes_limit_per_proc = settings.get_sort_spilling_bytes_threshold_per_proc()?;

src/query/service/src/sessions/query_ctx.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,16 @@ impl TableContext for QueryContext {
490490
.store(enable, Ordering::Release);
491491
}
492492

493+
fn get_enable_sort_spill(&self) -> bool {
494+
self.shared.enable_sort_spill.load(Ordering::Acquire)
495+
}
496+
497+
fn set_enable_sort_spill(&self, enable: bool) {
498+
self.shared
499+
.enable_sort_spill
500+
.store(enable, Ordering::Release);
501+
}
502+
493503
// get a hint at the number of blocks that need to be compacted.
494504
fn get_compaction_num_block_hint(&self) -> u64 {
495505
self.shared

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ pub struct QueryContextShared {
111111
pub(in crate::sessions) cacheable: Arc<AtomicBool>,
112112
pub(in crate::sessions) can_scan_from_agg_index: Arc<AtomicBool>,
113113
pub(in crate::sessions) num_fragmented_block_hint: Arc<AtomicU64>,
114+
pub(in crate::sessions) enable_sort_spill: Arc<AtomicBool>,
114115
// Status info.
115116
pub(in crate::sessions) status: Arc<RwLock<String>>,
116117

@@ -166,6 +167,7 @@ impl QueryContextShared {
166167
cacheable: Arc::new(AtomicBool::new(true)),
167168
can_scan_from_agg_index: Arc::new(AtomicBool::new(true)),
168169
num_fragmented_block_hint: Arc::new(AtomicU64::new(0)),
170+
enable_sort_spill: Arc::new(AtomicBool::new(true)),
169171
status: Arc::new(RwLock::new("null".to_string())),
170172
user_agent: Arc::new(RwLock::new("null".to_string())),
171173
materialized_cte_tables: Arc::new(Default::default()),

src/query/service/tests/it/sql/exec/get_table_bind_test.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,13 @@ impl TableContext for CtxDelegation {
548548
todo!()
549549
}
550550

551+
fn get_enable_sort_spill(&self) -> bool {
552+
todo!()
553+
}
554+
fn set_enable_sort_spill(&self, _enable: bool) {
555+
todo!()
556+
}
557+
551558
fn attach_query_str(&self, _kind: QueryKind, _query: String) {}
552559

553560
fn get_query_str(&self) -> String {

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,13 @@ impl TableContext for CtxDelegation {
489489
todo!()
490490
}
491491

492+
fn get_enable_sort_spill(&self) -> bool {
493+
todo!()
494+
}
495+
fn set_enable_sort_spill(&self, _enable: bool) {
496+
todo!()
497+
}
498+
492499
fn attach_query_str(&self, _kind: QueryKind, _query: String) {}
493500

494501
fn get_query_str(&self) -> String {

src/query/settings/src/settings_default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ impl DefaultSettings {
808808
let max_memory_usage = Self::max_memory_usage()?;
809809
// The sort merge consumes more than twice as much memory,
810810
// so the block size is set relatively conservatively here.
811-
let recluster_block_size = max_memory_usage * 35 / 100;
811+
let recluster_block_size = max_memory_usage * 32 / 100;
812812
Ok(recluster_block_size)
813813
}
814814

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl ReclusterMutator {
108108

109109
let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?;
110110
let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize;
111-
let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 40 / 100);
111+
let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 35 / 100);
112112

113113
let max_blocks_num = std::cmp::max(
114114
memory_threshold / self.block_thresholds.max_bytes_per_block,

tests/suites/1_stateful/02_query/02_0006_set_priority.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
)
2020

2121
with NativeClient(name="client1>") as client1:
22-
# TODO: Enable this test after enable new queries executor
22+
# TODO: Enable this test after enable new queries executor
2323
client1.expect(prompt)
2424
# client1.expect("")
2525
#

0 commit comments

Comments
 (0)