Skip to content

Commit 065712b

Browse files
committed
fix
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent afb47eb commit 065712b

File tree

4 files changed

+32
-16
lines changed

4 files changed

+32
-16
lines changed

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ impl SortPipelineBuilder {
216216
fn build_range_shuffle_sort_pipeline(self, pipeline: &mut Pipeline) -> Result<()> {
217217
let inputs = pipeline.output_len();
218218
let settings = self.ctx.get_settings();
219-
let max_threads = settings.get_max_threads()? as usize;
219+
let num_exec = inputs;
220220
let max_block_size = settings.get_max_block_size()? as usize;
221221

222222
// Partial sort
@@ -258,12 +258,11 @@ impl SortPipelineBuilder {
258258
Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
259259
})?;
260260

261-
let state =
262-
SortSampleState::new(inputs, max_threads, builder.inner_schema(), max_block_size);
261+
let state = SortSampleState::new(inputs, num_exec, builder.inner_schema(), max_block_size);
263262

264263
builder.add_shuffle(pipeline, state.clone())?;
265264

266-
pipeline.exchange(max_threads, Arc::new(SortRangeExchange));
265+
pipeline.exchange(num_exec, Arc::new(SortRangeExchange));
267266

268267
pipeline.add_transform(|input, output| {
269268
Ok(ProcessorPtr::create(builder.build_combine(input, output)?))

src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ where
435435

436436
if memory_rows > 0 && memory_rows + input > max {
437437
spill_sort
438-
.subsequent_spill_last(memory_rows + input - max)
438+
.collect_spill_last(memory_rows + input - max)
439439
.await?;
440440
}
441441
if input > max || finished && input > 0 {

src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub struct TransformSortCollect<A: SortAlgorithm, C> {
5151
output: Arc<OutputPort>,
5252
output_data: Option<DataBlock>,
5353

54+
max_block_size: usize,
5455
row_converter: C,
5556
sort_desc: Arc<[SortColumnDescription]>,
5657
/// If this transform is after an Exchange transform,
@@ -102,6 +103,7 @@ where
102103
inner,
103104
aborting: AtomicBool::new(false),
104105
memory_settings,
106+
max_block_size,
105107
})
106108
}
107109

@@ -117,12 +119,19 @@ where
117119
Ok((rows, block))
118120
}
119121

120-
fn limit_trans_to_spill(&mut self) -> Result<()> {
122+
fn limit_trans_to_spill(&mut self, no_spill: bool) -> Result<()> {
121123
let Inner::Limit(merger) = &self.inner else {
122124
unreachable!()
123125
};
124126
assert!(merger.num_rows() > 0);
125-
let params = self.determine_params(merger.num_bytes(), merger.num_rows());
127+
let params = if no_spill {
128+
SortSpillParams {
129+
batch_rows: self.max_block_size,
130+
num_merge: merger.num_rows().div_ceil(self.max_block_size),
131+
}
132+
} else {
133+
self.determine_params(merger.num_bytes(), merger.num_rows())
134+
};
126135
let Inner::Limit(merger) = &mut self.inner else {
127136
unreachable!()
128137
};
@@ -132,25 +141,32 @@ where
132141
Ok(())
133142
}
134143

135-
fn collect_trans_to_spill(&mut self, input_data: Vec<DataBlock>) {
144+
fn collect_trans_to_spill(&mut self, input_data: Vec<DataBlock>, no_spill: bool) {
136145
let (num_rows, num_bytes) = input_data
137146
.iter()
138147
.map(|block| (block.num_rows(), block.memory_size()))
139148
.fold((0, 0), |(acc_rows, acc_bytes), (rows, bytes)| {
140149
(acc_rows + rows, acc_bytes + bytes)
141150
});
142151
assert!(num_rows > 0);
143-
let params = self.determine_params(num_bytes, num_rows);
152+
let params = if no_spill {
153+
SortSpillParams {
154+
batch_rows: self.max_block_size,
155+
num_merge: num_rows.div_ceil(self.max_block_size),
156+
}
157+
} else {
158+
self.determine_params(num_bytes, num_rows)
159+
};
144160
let spill_sort = SortSpill::new(self.base.clone(), params);
145161
self.inner = Inner::Spill(input_data, spill_sort);
146162
}
147163

148-
fn trans_to_spill(&mut self) -> Result<()> {
164+
fn trans_to_spill(&mut self, no_spill: bool) -> Result<()> {
149165
match &mut self.inner {
150-
Inner::Limit(_) => self.limit_trans_to_spill(),
166+
Inner::Limit(_) => self.limit_trans_to_spill(no_spill),
151167
Inner::Collect(input_data) => {
152168
let input_data = std::mem::take(input_data);
153-
self.collect_trans_to_spill(input_data);
169+
self.collect_trans_to_spill(input_data, no_spill);
154170
Ok(())
155171
}
156172
Inner::Spill(_, _) => Ok(()),
@@ -322,18 +338,19 @@ where
322338
#[async_backtrace::framed]
323339
async fn async_process(&mut self) -> Result<()> {
324340
let finished = self.input.is_finished();
325-
self.trans_to_spill()?;
341+
self.trans_to_spill(finished)?;
326342

327-
let input = self.input_rows();
328343
let Inner::Spill(input_data, spill_sort) = &mut self.inner else {
329344
unreachable!()
330345
};
346+
347+
let input = input_data.in_memory_rows();
331348
let memory_rows = spill_sort.collect_memory_rows();
332349
let max = spill_sort.max_rows();
333350

334351
if memory_rows > 0 && memory_rows + input > max {
335352
spill_sort
336-
.subsequent_spill_last(memory_rows + input - max)
353+
.collect_spill_last(memory_rows + input - max)
337354
.await?;
338355
}
339356
if input > max || finished && input > 0 {

src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ where A: SortAlgorithm
136136
.await
137137
}
138138

139-
pub async fn subsequent_spill_last(&mut self, target_rows: usize) -> Result<()> {
139+
pub async fn collect_spill_last(&mut self, target_rows: usize) -> Result<()> {
140140
let Step::Collect(collect) = &mut self.step else {
141141
unreachable!()
142142
};

0 commit comments

Comments
 (0)