Skip to content

Commit 383f506

Browse files
committed
builder
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent bfa0ed6 commit 383f506

File tree

5 files changed

+170
-115
lines changed

5 files changed

+170
-115
lines changed

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,6 @@ impl SortPipelineBuilder {
300300

301301
pipeline.add_transform(|input, output| {
302302
let builder = TransformSortBuilder::create(
303-
input,
304-
output,
305303
sort_merge_output_schema.clone(),
306304
self.sort_desc.clone(),
307305
self.block_size,
@@ -313,7 +311,7 @@ impl SortPipelineBuilder {
313311
.with_memory_settings(memory_settings.clone())
314312
.with_enable_loser_tree(enable_loser_tree);
315313

316-
Ok(ProcessorPtr::create(builder.build()?))
314+
Ok(ProcessorPtr::create(builder.build(input, output)?))
317315
})
318316
}
319317

src/query/service/src/pipelines/processors/transforms/sort/builder.rs

Lines changed: 162 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,18 @@ use databend_common_pipeline_transforms::MemorySettings;
3434
use super::collect::TransformSortCollect;
3535
use super::execute::TransformSortExecute;
3636
use super::merge_sort::TransformSort;
37+
use super::shuffle::SortSampleState;
38+
use super::shuffle::TransformSortShuffle;
3739
use crate::spillers::Spiller;
3840

3941
enum SortType {
4042
Sort,
4143
Collect,
4244
Execute,
45+
Shuffle,
4346
}
4447

4548
pub struct TransformSortBuilder {
46-
input: Arc<InputPort>,
47-
output: Arc<OutputPort>,
4849
schema: DataSchemaRef,
4950
block_size: usize,
5051
sort_desc: Arc<[SortColumnDescription]>,
@@ -54,22 +55,16 @@ pub struct TransformSortBuilder {
5455
spiller: Arc<Spiller>,
5556
enable_loser_tree: bool,
5657
limit: Option<usize>,
57-
processor: Option<Result<Box<dyn Processor>>>,
58-
typ: SortType,
5958
}
6059

6160
impl TransformSortBuilder {
6261
pub fn create(
63-
input: Arc<InputPort>,
64-
output: Arc<OutputPort>,
6562
schema: DataSchemaRef,
6663
sort_desc: Arc<[SortColumnDescription]>,
6764
block_size: usize,
6865
spiller: Arc<Spiller>,
6966
) -> Self {
70-
Self {
71-
input,
72-
output,
67+
TransformSortBuilder {
7368
block_size,
7469
schema,
7570
sort_desc,
@@ -79,8 +74,6 @@ impl TransformSortBuilder {
7974
enable_loser_tree: false,
8075
limit: None,
8176
memory_settings: MemorySettings::disable_spill(),
82-
processor: None,
83-
typ: SortType::Sort,
8477
}
8578
}
8679

@@ -109,34 +102,133 @@ impl TransformSortBuilder {
109102
self
110103
}
111104

112-
pub fn build(mut self) -> Result<Box<dyn Processor>> {
113-
debug_assert!(if self.output_order_col {
105+
pub fn build(
106+
&self,
107+
input: Arc<InputPort>,
108+
output: Arc<OutputPort>,
109+
) -> Result<Box<dyn Processor>> {
110+
self.check();
111+
112+
let mut build = Build {
113+
params: self,
114+
input,
115+
output,
116+
processor: None,
117+
typ: SortType::Sort,
118+
id: 0,
119+
state: None,
120+
};
121+
122+
select_row_type(&mut build);
123+
build.processor.unwrap()
124+
}
125+
126+
pub fn build_collect(
127+
&self,
128+
input: Arc<InputPort>,
129+
output: Arc<OutputPort>,
130+
) -> Result<Box<dyn Processor>> {
131+
self.check();
132+
133+
let mut build = Build {
134+
params: self,
135+
input,
136+
output,
137+
processor: None,
138+
typ: SortType::Collect,
139+
id: 0,
140+
state: None,
141+
};
142+
143+
select_row_type(&mut build);
144+
build.processor.unwrap()
145+
}
146+
147+
pub fn build_exec(
148+
&self,
149+
input: Arc<InputPort>,
150+
output: Arc<OutputPort>,
151+
) -> Result<Box<dyn Processor>> {
152+
self.check();
153+
154+
let mut build = Build {
155+
params: self,
156+
input,
157+
output,
158+
processor: None,
159+
typ: SortType::Execute,
160+
id: 0,
161+
state: None,
162+
};
163+
164+
select_row_type(&mut build);
165+
build.processor.unwrap()
166+
}
167+
168+
pub fn build_shuffle(
169+
&self,
170+
input: Arc<InputPort>,
171+
output: Arc<OutputPort>,
172+
id: usize,
173+
state: Arc<SortSampleState>,
174+
) -> Result<Box<dyn Processor>> {
175+
self.check();
176+
177+
let mut build = Build {
178+
params: self,
179+
input,
180+
output,
181+
processor: None,
182+
typ: SortType::Shuffle,
183+
id,
184+
state: Some(state),
185+
};
186+
187+
select_row_type(&mut build);
188+
build.processor.unwrap()
189+
}
190+
191+
fn should_use_sort_limit(&self) -> bool {
192+
self.limit.map(|limit| limit < 10000).unwrap_or_default()
193+
}
194+
195+
fn check(&self) {
196+
assert!(if self.output_order_col {
114197
self.schema.has_field(ORDER_COL_NAME)
115198
} else {
116199
!self.schema.has_field(ORDER_COL_NAME)
117200
});
118-
119-
select_row_type(&mut self);
120-
self.processor.unwrap()
121201
}
202+
}
122203

204+
pub struct Build<'a> {
205+
params: &'a TransformSortBuilder,
206+
typ: SortType,
207+
input: Arc<InputPort>,
208+
output: Arc<OutputPort>,
209+
processor: Option<Result<Box<dyn Processor>>>,
210+
id: usize,
211+
state: Option<Arc<SortSampleState>>,
212+
}
213+
214+
impl Build<'_> {
123215
fn build_sort<A, C>(&mut self) -> Result<Box<dyn Processor>>
124216
where
125217
A: SortAlgorithm + 'static,
126218
C: RowConverter<A::Rows> + Send + 'static,
127219
{
128-
let schema = add_order_field(self.schema.clone(), &self.sort_desc);
220+
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
129221
Ok(Box::new(TransformSort::<A, C>::new(
130222
self.input.clone(),
131223
self.output.clone(),
132224
schema,
133-
self.sort_desc.clone(),
134-
self.block_size,
135-
self.limit.map(|limit| (limit, false)),
136-
self.spiller.clone(),
137-
self.output_order_col,
138-
self.order_col_generated,
139-
self.memory_settings.clone(),
225+
self.params.sort_desc.clone(),
226+
self.params.block_size,
227+
self.params.limit.map(|limit| (limit, false)),
228+
self.params.spiller.clone(),
229+
self.params.output_order_col,
230+
self.params.order_col_generated,
231+
self.params.memory_settings.clone(),
140232
)?))
141233
}
142234

@@ -145,50 +237,38 @@ impl TransformSortBuilder {
145237
A: SortAlgorithm + 'static,
146238
C: RowConverter<A::Rows> + Send + 'static,
147239
{
148-
let schema = add_order_field(self.schema.clone(), &self.sort_desc);
240+
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
149241
Ok(Box::new(TransformSort::<A, C>::new(
150242
self.input.clone(),
151243
self.output.clone(),
152244
schema,
153-
self.sort_desc.clone(),
154-
self.block_size,
155-
Some((self.limit.unwrap(), true)),
156-
self.spiller.clone(),
157-
self.output_order_col,
158-
self.order_col_generated,
159-
self.memory_settings.clone(),
245+
self.params.sort_desc.clone(),
246+
self.params.block_size,
247+
Some((self.params.limit.unwrap(), true)),
248+
self.params.spiller.clone(),
249+
self.params.output_order_col,
250+
self.params.order_col_generated,
251+
self.params.memory_settings.clone(),
160252
)?))
161253
}
162254

163-
pub fn build_collect(mut self) -> Result<Box<dyn Processor>> {
164-
debug_assert!(if self.output_order_col {
165-
self.schema.has_field(ORDER_COL_NAME)
166-
} else {
167-
!self.schema.has_field(ORDER_COL_NAME)
168-
});
169-
self.typ = SortType::Collect;
170-
171-
select_row_type(&mut self);
172-
self.processor.unwrap()
173-
}
174-
175255
fn build_sort_collect<A, C>(&mut self) -> Result<Box<dyn Processor>>
176256
where
177257
A: SortAlgorithm + 'static,
178258
C: RowConverter<A::Rows> + Send + 'static,
179259
{
180-
let schema = add_order_field(self.schema.clone(), &self.sort_desc);
260+
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
181261

182262
Ok(Box::new(TransformSortCollect::<A, C>::new(
183263
self.input.clone(),
184264
self.output.clone(),
185265
schema,
186-
self.sort_desc.clone(),
187-
self.block_size,
188-
self.limit.map(|limit| (limit, false)),
189-
self.spiller.clone(),
190-
self.order_col_generated,
191-
self.memory_settings.clone(),
266+
self.params.sort_desc.clone(),
267+
self.params.block_size,
268+
self.params.limit.map(|limit| (limit, false)),
269+
self.params.spiller.clone(),
270+
self.params.order_col_generated,
271+
self.params.memory_settings.clone(),
192272
)?))
193273
}
194274

@@ -197,54 +277,53 @@ impl TransformSortBuilder {
197277
A: SortAlgorithm + 'static,
198278
C: RowConverter<A::Rows> + Send + 'static,
199279
{
200-
let schema = add_order_field(self.schema.clone(), &self.sort_desc);
280+
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
201281
Ok(Box::new(TransformSortCollect::<A, C>::new(
202282
self.input.clone(),
203283
self.output.clone(),
204284
schema,
205-
self.sort_desc.clone(),
206-
self.block_size,
207-
Some((self.limit.unwrap(), true)),
208-
self.spiller.clone(),
209-
self.order_col_generated,
210-
self.memory_settings.clone(),
285+
self.params.sort_desc.clone(),
286+
self.params.block_size,
287+
Some((self.params.limit.unwrap(), true)),
288+
self.params.spiller.clone(),
289+
self.params.order_col_generated,
290+
self.params.memory_settings.clone(),
211291
)?))
212292
}
213293

214-
pub fn build_exec(mut self) -> Result<Box<dyn Processor>> {
215-
debug_assert!(if self.output_order_col {
216-
self.schema.has_field(ORDER_COL_NAME)
217-
} else {
218-
!self.schema.has_field(ORDER_COL_NAME)
219-
});
220-
self.typ = SortType::Execute;
221-
222-
select_row_type(&mut self);
223-
self.processor.unwrap()
224-
}
225-
226294
fn build_sort_exec<A>(&mut self) -> Result<Box<dyn Processor>>
227295
where A: SortAlgorithm + 'static {
228-
let schema = add_order_field(self.schema.clone(), &self.sort_desc);
296+
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
229297

230298
Ok(Box::new(TransformSortExecute::<A>::new(
231299
self.input.clone(),
232300
self.output.clone(),
233301
schema,
234-
self.limit,
235-
self.spiller.clone(),
236-
self.output_order_col,
302+
self.params.limit,
303+
self.params.spiller.clone(),
304+
self.params.output_order_col,
237305
)?))
238306
}
307+
308+
fn build_sort_shuffle<R>(&mut self) -> Result<Box<dyn Processor>>
309+
where R: Rows + 'static {
310+
Ok(Box::new(TransformSortShuffle::<R>::new(
311+
self.input.clone(),
312+
self.output.clone(),
313+
self.id,
314+
self.state.clone().unwrap(),
315+
self.params.spiller.clone(),
316+
)))
317+
}
239318
}
240319

241-
impl RowsTypeVisitor for TransformSortBuilder {
320+
impl RowsTypeVisitor for Build<'_> {
242321
fn schema(&self) -> DataSchemaRef {
243-
self.schema.clone()
322+
self.params.schema.clone()
244323
}
245324

246325
fn sort_desc(&self) -> &[SortColumnDescription] {
247-
&self.sort_desc
326+
&self.params.sort_desc
248327
}
249328

250329
fn visit_type<R, C>(&mut self)
@@ -254,27 +333,28 @@ impl RowsTypeVisitor for TransformSortBuilder {
254333
{
255334
let processor = match self.typ {
256335
SortType::Sort => match (
257-
self.limit.map(|limit| limit < 10000).unwrap_or_default(),
258-
self.enable_loser_tree,
336+
self.params.should_use_sort_limit(),
337+
self.params.enable_loser_tree,
259338
) {
260339
(true, true) => self.build_sort_limit::<LoserTreeSort<R>, C>(),
261340
(true, false) => self.build_sort_limit::<HeapSort<R>, C>(),
262341
(false, true) => self.build_sort::<LoserTreeSort<R>, C>(),
263342
(false, false) => self.build_sort::<HeapSort<R>, C>(),
264343
},
265344
SortType::Collect => match (
266-
self.limit.map(|limit| limit < 10000).unwrap_or_default(),
267-
self.enable_loser_tree,
345+
self.params.should_use_sort_limit(),
346+
self.params.enable_loser_tree,
268347
) {
269348
(true, true) => self.build_sort_limit_collect::<LoserTreeSort<R>, C>(),
270349
(true, false) => self.build_sort_limit_collect::<HeapSort<R>, C>(),
271350
(false, true) => self.build_sort_collect::<LoserTreeSort<R>, C>(),
272351
(false, false) => self.build_sort_collect::<HeapSort<R>, C>(),
273352
},
274-
SortType::Execute => match self.enable_loser_tree {
353+
SortType::Execute => match self.params.enable_loser_tree {
275354
true => self.build_sort_exec::<LoserTreeSort<R>>(),
276355
false => self.build_sort_exec::<HeapSort<R>>(),
277356
},
357+
SortType::Shuffle => self.build_sort_shuffle::<R>(),
278358
};
279359
self.processor = Some(processor)
280360
}

0 commit comments

Comments
 (0)