@@ -20,16 +20,6 @@ use databend_common_catalog::plan::DataSourceInfo;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::row::RowConverter as CommonConverter;
-use databend_common_expression::types::AccessType;
-use databend_common_expression::types::ArgType;
-use databend_common_expression::types::DataType;
-use databend_common_expression::types::DateType;
-use databend_common_expression::types::NumberDataType;
-use databend_common_expression::types::NumberType;
-use databend_common_expression::types::StringType;
-use databend_common_expression::types::TimestampType;
-use databend_common_expression::with_number_mapped_type;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
 use databend_common_expression::SortColumnDescription;
@@ -41,28 +31,27 @@ use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_sources::EmptySource;
 use databend_common_pipeline_transforms::processors::build_compact_block_no_split_pipeline;
 use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
-use databend_common_pipeline_transforms::sort::CommonRows;
-use databend_common_pipeline_transforms::sort::RowConverter;
-use databend_common_pipeline_transforms::sort::Rows;
-use databend_common_pipeline_transforms::sort::SimpleRowConverter;
-use databend_common_pipeline_transforms::sort::SimpleRowsAsc;
+use databend_common_pipeline_transforms::sort::utils::add_order_field;
+use databend_common_pipeline_transforms::MemorySettings;
 use databend_common_sql::evaluator::CompoundBlockOperator;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::executor::physical_plans::Recluster;
 use databend_common_sql::StreamContext;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::io::StreamBlockProperties;
+use databend_common_storages_fuse::operations::TransformBlockBuilder;
 use databend_common_storages_fuse::operations::TransformBlockWriter;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
-use match_template::match_template;

 use crate::pipelines::builders::SortPipelineBuilder;
 use crate::pipelines::processors::transforms::ReclusterPartitionExchange;
+use crate::pipelines::processors::transforms::ReclusterPartitionStrategys;
 use crate::pipelines::processors::transforms::SampleState;
 use crate::pipelines::processors::transforms::TransformAddOrderColumn;
 use crate::pipelines::processors::transforms::TransformAddStreamColumns;
+use crate::pipelines::processors::transforms::TransformPartitionCollect;
 use crate::pipelines::processors::transforms::TransformRangePartitionIndexer;
 use crate::pipelines::processors::transforms::TransformReclusterCollect;
 use crate::pipelines::processors::transforms::TransformReclusterPartition;
@@ -172,9 +161,7 @@ impl PipelineBuilder {
             });
         }

-        let fields_with_cluster_key = properties.fields_with_cluster_key();
-        let schema = DataSchemaRefExt::create(fields_with_cluster_key);
-        let sort_descs: Vec<_> = properties
+        let sort_desc: Vec<_> = properties
             .cluster_key_index()
             .iter()
             .map(|&offset| SortColumnDescription {
@@ -183,6 +170,10 @@ impl PipelineBuilder {
                 nulls_first: false,
             })
             .collect();
+        let fields_with_cluster_key = properties.fields_with_cluster_key();
+        let schema = DataSchemaRefExt::create(fields_with_cluster_key);
+        let schema = add_order_field(schema, &sort_desc);
+        let order_offset = schema.fields.len() - 1;

         let num_processors = self.main_pipeline.output_len();
         let sample_size = self
@@ -196,9 +187,12 @@ impl PipelineBuilder {
             task.total_compressed,
         );
         let state = SampleState::new(num_processors, partitions);
-        let recluster_pipeline_builder =
-            ReclusterPipelineBuilder::create(schema, sort_descs.into(), sample_size)
-                .with_state(state);
+        let recluster_pipeline_builder = ReclusterPipelineBuilder::create(
+            schema.clone(),
+            sort_desc.clone(),
+            sample_size,
+        )
+        .with_state(state);
         recluster_pipeline_builder
             .build_recluster_sample_pipeline(&mut self.main_pipeline)?;

@@ -207,16 +201,46 @@ impl PipelineBuilder {
             ReclusterPartitionExchange::create(0, partitions),
         );
         let processor_id = AtomicUsize::new(0);
-        self.main_pipeline.add_transform(|input, output| {
-            TransformReclusterPartition::try_create(
-                input,
-                output,
-                properties.clone(),
-                processor_id.fetch_add(1, atomic::Ordering::AcqRel),
-                num_processors,
-                partitions,
-            )
-        })?;
+
+        let settings = self.ctx.get_settings();
+        let enable_writings = settings.get_enable_block_stream_writes()?;
+        if enable_writings {
+            let memory_settings = MemorySettings::disable_spill();
+            self.main_pipeline.add_transform(|input, output| {
+                let strategy =
+                    ReclusterPartitionStrategys::new(properties.clone(), order_offset);
+
+                Ok(ProcessorPtr::create(Box::new(
+                    TransformPartitionCollect::new(
+                        self.ctx.clone(),
+                        input,
+                        output,
+                        &settings,
+                        processor_id.fetch_add(1, atomic::Ordering::AcqRel),
+                        num_processors,
+                        partitions,
+                        memory_settings.clone(),
+                        None,
+                        strategy,
+                    )?,
+                )))
+            })?;
+
+            self.main_pipeline.add_transform(|input, output| {
+                TransformBlockBuilder::try_create(input, output, properties.clone())
+            })?;
+        } else {
+            self.main_pipeline.add_transform(|input, output| {
+                TransformReclusterPartition::try_create(
+                    input,
+                    output,
+                    properties.clone(),
+                    processor_id.fetch_add(1, atomic::Ordering::AcqRel),
+                    num_processors,
+                    partitions,
+                )
+            })?;
+        }

         self.main_pipeline.add_async_accumulating_transformer(|| {
             TransformBlockWriter::create(
@@ -249,7 +273,7 @@ impl PipelineBuilder {
         // construct output fields
         let output_fields = cluster_stats_gen.out_fields.clone();
         let schema = DataSchemaRefExt::create(output_fields);
-        let sort_descs: Vec<_> = cluster_stats_gen
+        let sort_desc: Vec<_> = cluster_stats_gen
             .cluster_key_index
             .iter()
             .map(|offset| SortColumnDescription {
@@ -267,10 +291,9 @@ impl PipelineBuilder {
         );

         let sort_pipeline_builder =
-            SortPipelineBuilder::create(self.ctx.clone(), schema, sort_descs.into())?
+            SortPipelineBuilder::create(self.ctx.clone(), schema, sort_desc.into())?
                 .with_block_size_hit(sort_block_size)
                 .remove_order_col_at_last();
-        // Todo(zhyass): Recluster will no longer perform sort in the near future.
         sort_pipeline_builder.build_full_sort_pipeline(&mut self.main_pipeline)?;

         // Compact after merge sort.
@@ -306,7 +329,7 @@ impl PipelineBuilder {

 struct ReclusterPipelineBuilder {
     schema: DataSchemaRef,
-    sort_desc: Arc<[SortColumnDescription]>,
+    sort_desc: Vec<SortColumnDescription>,
     state: Option<Arc<SampleState>>,
     sample_size: usize,
     seed: u64,
@@ -315,7 +338,7 @@ struct ReclusterPipelineBuilder {
 impl ReclusterPipelineBuilder {
     fn create(
         schema: DataSchemaRef,
-        sort_desc: Arc<[SortColumnDescription]>,
+        sort_desc: Vec<SortColumnDescription>,
         sample_size: usize,
     ) -> Self {
         Self {
@@ -339,53 +362,17 @@ impl ReclusterPipelineBuilder {
     }

     fn build_recluster_sample_pipeline(&self, pipeline: &mut Pipeline) -> Result<()> {
-        match self.sort_desc.as_ref() {
-            [desc] => {
-                let schema = self.schema.clone();
-                let sort_type = schema.field(desc.offset).data_type();
-                assert!(desc.asc);
-
-                match_template! {
-                    T = [ Date => DateType, Timestamp => TimestampType, String => StringType ],
-                    match sort_type {
-                        DataType::T => {
-                            self.visit_type::<SimpleRowsAsc<T>, SimpleRowConverter<T>>(pipeline)
-                        },
-                        DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
-                            NumberDataType::NUM_TYPE => {
-                                self.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>(pipeline)
-                            }
-                        }),
-                        _ => self.visit_type::<CommonRows, CommonConverter>(pipeline)
-                    }
-                }
-            }
-            _ => self.visit_type::<CommonRows, CommonConverter>(pipeline),
-        }
-    }
-
-    fn visit_type<R, C>(&self, pipeline: &mut Pipeline) -> Result<()>
-    where
-        R: Rows + 'static,
-        C: RowConverter<R> + Send + 'static,
-        R::Type: ArgType + Send + Sync,
-        <R::Type as AccessType>::Scalar: Ord + Send + Sync,
-    {
         pipeline.try_add_transformer(|| {
-            TransformAddOrderColumn::<R, C>::try_new(self.sort_desc.clone(), self.schema.clone())
+            TransformAddOrderColumn::try_new(self.sort_desc.clone(), self.schema.clone())
         })?;
-        let offset = self.schema.num_fields();
+        let offset = self.schema.num_fields() - 1;
         pipeline.add_accumulating_transformer(|| {
-            TransformReclusterCollect::<R::Type>::new(offset, self.sample_size, self.seed)
+            TransformReclusterCollect::new(offset, self.sample_size, self.seed)
         });
         pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(TransformRangePartitionIndexer::<
-                R::Type,
-            >::create(
-                input,
-                output,
-                self.state.clone().unwrap(),
-            )))
+            Ok(ProcessorPtr::create(
+                TransformRangePartitionIndexer::create(input, output, self.state.clone().unwrap()),
+            ))
         })
     }
 }
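
Note: the deleted match_template/visit_type machinery existed only to specialize the sampling path per sort-key type; the new path is type-erased, sampling and routing on the appended order column instead. For readers unfamiliar with the technique, here is a minimal standalone sketch of sample-based range partitioning (the idea behind TransformReclusterCollect, SampleState, and TransformRangePartitionIndexer); all names and the u64 key type below are illustrative, not databend's actual API.

// Sample-based range partitioning: sample some sort keys, derive
// (partitions - 1) bounds from the merged sample, then route every
// row to the partition whose bound range contains its key.

fn partition_bounds(mut sample: Vec<u64>, partitions: usize) -> Vec<u64> {
    // Sort the merged sample and take evenly spaced elements as bounds.
    sample.sort_unstable();
    (1..partitions)
        .filter_map(|i| sample.get(i * sample.len() / partitions).copied())
        .collect()
}

fn partition_of(bounds: &[u64], key: u64) -> usize {
    // Index of the first bound greater than the key; keys at or above
    // the last bound fall into the final partition.
    bounds.partition_point(|&b| b <= key)
}

fn main() {
    let keys: Vec<u64> = (0..100).map(|i| (i * 37) % 101).collect();
    // Pretend every fourth key is what the workers sampled.
    let sample: Vec<u64> = keys.iter().copied().step_by(4).collect();
    let bounds = partition_bounds(sample, 4);
    let mut sizes = [0usize; 4];
    for &k in &keys {
        sizes[partition_of(&bounds, k)] += 1;
    }
    // Bounds drawn from a uniform-ish sample give roughly equal partitions.
    println!("bounds: {bounds:?}, partition sizes: {sizes:?}");
}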