Commit 476ffde

fix
1 parent 3670ee1 commit 476ffde

File tree

8 files changed: +123 -108 lines changed

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 85 additions & 77 deletions
@@ -51,12 +51,12 @@ use crate::spillers::SpillerType;
 
 impl PipelineBuilder {
     pub(crate) fn build_sort(&mut self, sort: &Sort) -> Result<()> {
-        let plan_schema = sort.output_schema()?;
+        let output_schema = sort.output_schema()?;
         let sort_desc = sort
             .order_by
             .iter()
             .map(|desc| {
-                let offset = plan_schema.index_of(&desc.order_by.to_string())?;
+                let offset = output_schema.index_of(&desc.order_by.to_string())?;
                 Ok(SortColumnDescription {
                     offset,
                     asc: desc.asc,
@@ -103,7 +103,7 @@ impl PipelineBuilder {
 
         let builder = SortPipelineBuilder::create(
             self.ctx.clone(),
-            plan_schema,
+            output_schema,
             sort_desc,
             sort.broadcast_id,
         )?
@@ -155,7 +155,9 @@ impl PipelineBuilder {
                     self.build_pipeline(&sort.input)?;
                 }
 
-                builder.build_bounded_merge_sort(&mut self.main_pipeline)
+                builder
+                    .remove_order_col_at_last()
+                    .build_bounded_merge_sort(&mut self.main_pipeline)
             }
             SortStep::Route => TransformSortBuilder::add_route(&mut self.main_pipeline),
         }
@@ -164,7 +166,7 @@ impl PipelineBuilder {
 
 pub struct SortPipelineBuilder {
     ctx: Arc<QueryContext>,
-    schema: DataSchemaRef,
+    output_schema: DataSchemaRef,
     sort_desc: Arc<[SortColumnDescription]>,
     limit: Option<usize>,
     block_size: usize,
@@ -176,7 +178,7 @@ pub struct SortPipelineBuilder {
 impl SortPipelineBuilder {
     pub fn create(
         ctx: Arc<QueryContext>,
-        schema: DataSchemaRef,
+        output_schema: DataSchemaRef,
         sort_desc: Arc<[SortColumnDescription]>,
         broadcast_id: Option<u32>,
     ) -> Result<Self> {
@@ -185,7 +187,7 @@ impl SortPipelineBuilder {
         let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
         Ok(Self {
             ctx,
-            schema,
+            output_schema,
             sort_desc,
             limit: None,
             block_size,
@@ -223,65 +225,6 @@ impl SortPipelineBuilder {
         self.build_merge_sort_pipeline(pipeline, false)
     }
 
-    fn build_sample(self, pipeline: &mut Pipeline) -> Result<()> {
-        let settings = self.ctx.get_settings();
-        let max_block_size = settings.get_max_block_size()? as usize;
-
-        // Partial sort
-        pipeline.add_transformer(|| {
-            TransformSortPartial::new(
-                LimitType::from_limit_rows(self.limit),
-                self.sort_desc.clone(),
-            )
-        });
-
-        let spiller = {
-            let location_prefix = self.ctx.query_id_spill_prefix();
-            let config = SpillerConfig {
-                spiller_type: SpillerType::OrderBy,
-                location_prefix,
-                disk_spill: None,
-                use_parquet: settings.get_spilling_file_format()?.is_parquet(),
-            };
-            let op = DataOperator::instance().spill_operator();
-            Arc::new(Spiller::create(self.ctx.clone(), op, config)?)
-        };
-
-        let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;
-        let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
-
-        let builder =
-            TransformSortBuilder::new(self.schema.clone(), self.sort_desc.clone(), max_block_size)
-                .with_spiller(spiller)
-                .with_limit(self.limit)
-                .with_order_column(false, true)
-                .with_memory_settings(memory_settings)
-                .with_enable_loser_tree(enable_loser_tree);
-
-        pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
-        })?;
-
-        builder.add_bound_broadcast(
-            pipeline,
-            max_block_size,
-            self.ctx.clone(),
-            self.broadcast_id.unwrap(),
-        )?;
-
-        pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(builder.build_restore(input, output)?))
-        })?;
-
-        pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(
-                builder.build_bound_edge(input, output)?,
-            ))
-        })?;
-
-        Ok(())
-    }
-
     fn build_merge_sort(&self, pipeline: &mut Pipeline, order_col_generated: bool) -> Result<()> {
         // Merge sort
         let need_multi_merge = pipeline.output_len() > 1;
@@ -293,8 +236,8 @@ impl SortPipelineBuilder {
 
         let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;
         let sort_merge_output_schema = match output_order_col {
-            true => add_order_field(self.schema.clone(), &self.sort_desc),
-            false => self.schema.clone(),
+            true => add_order_field(self.output_schema.clone(), &self.sort_desc),
+            false => self.output_schema.clone(),
         };
 
         let settings = self.ctx.get_settings();
@@ -336,7 +279,7 @@ impl SortPipelineBuilder {
         })
     }
 
-    pub fn build_merge_sort_pipeline(
+    fn build_merge_sort_pipeline(
        self,
        pipeline: &mut Pipeline,
        order_col_generated: bool,
@@ -358,7 +301,7 @@ impl SortPipelineBuilder {
             let max_threads = settings.get_max_threads()? as usize;
             add_k_way_merge_sort(
                 pipeline,
-                self.schema.clone(),
+                self.output_schema.clone(),
                 max_threads,
                 self.block_size,
                 self.limit,
@@ -369,7 +312,7 @@ impl SortPipelineBuilder {
         } else {
             try_add_multi_sort_merge(
                 pipeline,
-                self.schema.clone(),
+                self.output_schema.clone(),
                 self.block_size,
                 self.limit,
                 self.sort_desc,
@@ -379,12 +322,77 @@ impl SortPipelineBuilder {
         }
     }
 
-    pub fn build_bounded_merge_sort(self, pipeline: &mut Pipeline) -> Result<()> {
-        let builder =
-            TransformSortBuilder::new(self.schema.clone(), self.sort_desc.clone(), self.block_size)
-                .with_limit(self.limit)
-                .with_order_column(true, !self.remove_order_col_at_last)
-                .with_enable_loser_tree(self.enable_loser_tree);
+    fn build_sample(self, pipeline: &mut Pipeline) -> Result<()> {
+        let settings = self.ctx.get_settings();
+        let max_block_size = settings.get_max_block_size()? as usize;
+
+        // Partial sort
+        pipeline.add_transformer(|| {
+            TransformSortPartial::new(
+                LimitType::from_limit_rows(self.limit),
+                self.sort_desc.clone(),
+            )
+        });
+
+        let spiller = {
+            let location_prefix = self.ctx.query_id_spill_prefix();
+            let config = SpillerConfig {
+                spiller_type: SpillerType::OrderBy,
+                location_prefix,
+                disk_spill: None,
+                use_parquet: settings.get_spilling_file_format()?.is_parquet(),
+            };
+            let op = DataOperator::instance().spill_operator();
+            Arc::new(Spiller::create(self.ctx.clone(), op, config)?)
+        };
+
+        let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;
+        let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
+
+        let builder = TransformSortBuilder::new(
+            self.output_schema.clone(),
+            self.sort_desc.clone(),
+            max_block_size,
+        )
+        .with_spiller(spiller)
+        .with_limit(self.limit)
+        .with_order_column(false, true)
+        .with_memory_settings(memory_settings)
+        .with_enable_loser_tree(enable_loser_tree);
+
+        pipeline.add_transform(|input, output| {
+            Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
+        })?;
+
+        builder.add_bound_broadcast(
+            pipeline,
+            max_block_size,
+            self.ctx.clone(),
+            self.broadcast_id.unwrap(),
+        )?;
+
+        pipeline.add_transform(|input, output| {
+            Ok(ProcessorPtr::create(builder.build_restore(input, output)?))
+        })?;
+
+        pipeline.add_transform(|input, output| {
+            Ok(ProcessorPtr::create(
+                builder.build_bound_edge(input, output)?,
+            ))
+        })?;
+
+        Ok(())
+    }
+
+    fn build_bounded_merge_sort(self, pipeline: &mut Pipeline) -> Result<()> {
+        let builder = TransformSortBuilder::new(
+            self.output_schema.clone(),
+            self.sort_desc.clone(),
+            self.block_size,
+        )
+        .with_limit(self.limit)
+        .with_order_column(true, !self.remove_order_col_at_last)
+        .with_enable_loser_tree(self.enable_loser_tree);
 
         let inputs_port: Vec<_> = (0..pipeline.output_len())
             .map(|_| InputPort::create())
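
For orientation, the net effect of these hunks on the final-merge path is sketched below. The sketch is assembled only from lines visible in this diff (the surrounding SortStep match arm, error handling, and the rest of build_sort are elided), so read it as an illustration rather than the exact file contents:

    // `plan_schema` is renamed to `output_schema` and threaded through unchanged.
    let builder = SortPipelineBuilder::create(
        self.ctx.clone(),
        output_schema,
        sort_desc,
        sort.broadcast_id,
    )?;

    // The final stage now asks the builder to strip the synthetic order column
    // itself, via the pre-existing remove_order_col_at_last() method, before
    // building the bounded merge sort.
    builder
        .remove_order_col_at_last()
        .build_bounded_merge_sort(&mut self.main_pipeline)

Two smaller points from the same hunks: build_merge_sort_pipeline and build_bounded_merge_sort lose their pub qualifier, and build_sample is moved further down the file, unchanged apart from the renamed output_schema field and the reflowed TransformSortBuilder::new call.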

src/query/service/src/pipelines/processors/transforms/sort/bounds.rs

Lines changed: 7 additions & 7 deletions
@@ -31,6 +31,9 @@ pub struct Bounds(
 
 impl Bounds {
     pub fn new_unchecked(column: Column) -> Bounds {
+        if column.len() == 0 {
+            return Self::default();
+        }
         Bounds(vec![column])
     }
 
@@ -49,7 +52,6 @@
     }
 
     pub fn merge<R: Rows>(mut vector: Vec<Bounds>, batch_rows: usize) -> Result<Self> {
-        debug_assert!(vector.iter().all(|bounds| !bounds.is_empty()));
         match vector.len() {
             0 => Ok(Bounds(vec![])),
             1 => Ok(vector.pop().unwrap()),
@@ -68,7 +70,7 @@
                     blocks
                         .iter()
                         .rev()
-                        .map(|b| b.get_last_column().clone())
+                        .map(|b| b.get_by_offset(0).to_column())
                         .collect(),
                 ))
             }
@@ -136,6 +138,7 @@
         )]))
     }
 
+    #[allow(dead_code)]
     pub fn dedup_reduce<R: Rows>(&self, n: usize) -> Self {
         if n == 0 {
             return Self::default();
@@ -178,11 +181,8 @@
             last = Some((cur_rows, r_idx));
         }
 
-        Bounds(vec![Column::take_column_indices(
-            &self.0,
-            &indices,
-            indices.len(),
-        )])
+        let col = Column::take_column_indices(&self.0, &indices, indices.len());
+        Bounds::new_unchecked(col)
     }
 
     pub fn dedup<R: Rows>(&self) -> Self {
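
Read together, the bounds.rs changes centralize empty-bounds handling in new_unchecked: a zero-length column now collapses to Bounds::default(), dedup_reduce routes its result through new_unchecked (so an empty indices set is fine), and merge no longer needs its debug_assert that every input is non-empty. A minimal sketch of the resulting invariant; zero_len_column and non_empty_column are hypothetical stand-ins, and the assertion assumes Bounds::is_empty reports the default value as empty, as its uses elsewhere in this diff suggest:

    // Illustrative sketch only, not code from the commit.
    let empty = Bounds::new_unchecked(zero_len_column); // any Column with len() == 0
    assert!(empty.is_empty()); // the new guard returns Bounds::default()

    // Non-empty columns are still wrapped without further checks, as before.
    let bounds = Bounds::new_unchecked(non_empty_column);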

src/query/service/src/pipelines/processors/transforms/sort/sort_broadcast.rs

Lines changed: 16 additions & 17 deletions
@@ -138,9 +138,7 @@ impl<R: Rows + 'static> HookTransform for TransformSortBoundBroadcast<R> {
         let bounds = Bounds::merge::<R>(
             self.input_data
                 .iter_mut()
-                .filter_map(|meta| {
-                    (!meta.bounds.is_empty()).then(|| std::mem::take(&mut meta.bounds))
-                })
+                .map(|meta| std::mem::take(&mut meta.bounds))
                 .collect(),
             self.state.batch_rows,
         )?;
@@ -163,42 +161,43 @@ impl<R: Rows + 'static> HookTransform for TransformSortBoundBroadcast<R> {
             sequences,
         };
 
+        let bounds = local.normalize_bounds();
         let global = self
             .state
-            .commit_sample(Some(SortExchangeMeta {
-                params,
-                bounds: local.normalize_bounds::<R>(),
-            }))
+            .commit_sample(Some(SortExchangeMeta { params, bounds }))
             .await?;
 
         let bounds_vec = global
             .into_iter()
-            .filter_map(|meta| (!meta.bounds.is_empty()).then_some(meta.bounds))
+            .map(|meta| {
+                assert!(!meta.bounds.is_empty());
+                meta.bounds.clone()
+            })
             .collect();
-        self.output_data = Some(SortCollectedMeta {
-            bounds: Bounds::merge::<R>(bounds_vec, self.state.batch_rows)?.dedup::<R>(),
-            ..local
-        });
+
+        let bounds = Bounds::merge::<R>(bounds_vec, self.state.batch_rows)?.dedup::<R>();
+        log::debug!("global_bounds.len: {}", bounds.len());
+        self.output_data = Some(SortCollectedMeta { bounds, ..local });
         Ok(())
     }
 }
 
 impl SortCollectedMeta {
-    fn normalize_bounds<R: Rows>(&self) -> Bounds {
+    fn normalize_bounds(&self) -> Bounds {
         if self.bounds.len() > 1 {
-            return self.bounds.dedup::<R>();
+            return self.bounds.clone();
         }
 
         let Some(seq) = self.sequences.get(self.sequences.len() / 2) else {
-            return Bounds::default();
+            unreachable!()
         };
 
         seq.get(seq.len() / 2)
             .map(|block| match block.domain.len() {
                 0 => Bounds::default(),
                 1 => Bounds::new_unchecked(block.domain.clone()),
-                _ => Bounds::new_unchecked(block.domain.slice(0..1)).dedup::<R>(),
+                _ => Bounds::new_unchecked(block.domain.slice(0..1)),
             })
-            .unwrap_or_default()
+            .unwrap()
     }
 }
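
On the broadcast side, normalize_bounds is hoisted out of the commit_sample call, drops its R type parameter along with its internal dedup::<R>() calls, and now treats missing sequences as unreachable rather than falling back to empty bounds. The metas returned from the exchange are asserted to carry non-empty bounds instead of being filtered out (the earlier filter_map over input_data can likewise become a plain map, since Bounds::merge no longer asserts non-empty inputs). A condensed sketch of the new flow, taken from the hunk above with the rest of the function body elided:

    // Condensed from the hunk above; not the complete function body.
    let bounds = local.normalize_bounds(); // no longer generic over R
    let global = self
        .state
        .commit_sample(Some(SortExchangeMeta { params, bounds }))
        .await?;

    // Every peer is now expected to contribute non-empty bounds.
    let bounds_vec = global
        .into_iter()
        .map(|meta| {
            assert!(!meta.bounds.is_empty());
            meta.bounds.clone()
        })
        .collect();

    let bounds = Bounds::merge::<R>(bounds_vec, self.state.batch_rows)?.dedup::<R>();
    log::debug!("global_bounds.len: {}", bounds.len());
    self.output_data = Some(SortCollectedMeta { bounds, ..local });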

src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs

Lines changed: 1 addition & 2 deletions
@@ -355,9 +355,8 @@ impl Build<'_> {
         Ok(Box::new(BoundedMultiSortMergeProcessor::<A>::new(
             inputs,
             self.output.clone(),
-            self.schema().clone(),
+            self.params.schema.clone(),
             self.params.block_size,
-            !self.params.output_order_col,
         )?))
     }
 }
