15
15
use std:: sync:: Arc ;
16
16
17
17
use databend_common_catalog:: lock:: LockTableOption ;
18
+ use databend_common_catalog:: plan:: StageTableInfo ;
18
19
use databend_common_exception:: Result ;
19
20
use databend_common_expression:: types:: Int32Type ;
20
21
use databend_common_expression:: types:: StringType ;
@@ -24,7 +25,6 @@ use databend_common_expression::SendableDataBlockStream;
24
25
use databend_common_sql:: executor:: physical_plans:: MutationKind ;
25
26
use databend_common_sql:: executor:: PhysicalPlanBuilder ;
26
27
use databend_common_sql:: optimizer:: SExpr ;
27
- use databend_common_sql:: plans:: StageContext ;
28
28
use log:: debug;
29
29
use log:: info;
30
30
@@ -45,7 +45,7 @@ pub struct CopyIntoTableInterpreter {
45
45
ctx : Arc < QueryContext > ,
46
46
s_expr : SExpr ,
47
47
metadata : MetadataRef ,
48
- stage_context : Option < Box < StageContext > > ,
48
+ stage_table_info : Option < Box < StageTableInfo > > ,
49
49
overwrite : bool ,
50
50
}
51
51
@@ -87,13 +87,15 @@ impl Interpreter for CopyIntoTableInterpreter {
87
87
& copy_into_table. table_name ,
88
88
)
89
89
. await ?;
90
- let copied_files_meta_req = match & self . stage_context {
91
- Some ( stage_context ) => PipelineBuilder :: build_upsert_copied_files_to_meta_req (
90
+ let copied_files_meta_req = match & self . stage_table_info {
91
+ Some ( stage_table_info ) => PipelineBuilder :: build_upsert_copied_files_to_meta_req (
92
92
self . ctx . clone ( ) ,
93
93
target_table. as_ref ( ) ,
94
- stage_context. purge ,
95
- & stage_context. files_to_copy ,
96
- stage_context. force ,
94
+ stage_table_info
95
+ . files_to_copy
96
+ . as_deref ( )
97
+ . unwrap_or_default ( ) ,
98
+ & stage_table_info. copy_into_table_options ,
97
99
) ?,
98
100
None => None ,
99
101
} ;
@@ -110,30 +112,27 @@ impl Interpreter for CopyIntoTableInterpreter {
110
112
) ?;
111
113
112
114
// Purge files on pipeline finished.
113
- if let Some ( stage_context) = & self . stage_context {
114
- let StageContext {
115
- purge,
116
- force : _,
117
- files_to_copy,
118
- duplicated_files_detected,
119
- stage_info,
120
- } = stage_context. as_ref ( ) ;
115
+ if let Some ( stage_table_info) = & self . stage_table_info {
116
+ let files_to_copy = stage_table_info
117
+ . files_to_copy
118
+ . as_deref ( )
119
+ . unwrap_or_default ( ) ;
121
120
info ! (
122
121
"set files to be purged, # of copied files: {}, # of duplicated files: {}" ,
123
122
files_to_copy. len( ) ,
124
- duplicated_files_detected. len( )
123
+ stage_table_info . duplicated_files_detected. len( )
125
124
) ;
126
125
127
126
let files_to_be_deleted = files_to_copy
128
127
. iter ( )
129
- . map ( |v| v . path . clone ( ) )
130
- . chain ( duplicated_files_detected. clone ( ) )
128
+ . map ( |f| f . path . clone ( ) )
129
+ . chain ( stage_table_info . duplicated_files_detected . clone ( ) )
131
130
. collect :: < Vec < _ > > ( ) ;
132
131
PipelineBuilder :: set_purge_files_on_finished (
133
132
self . ctx . clone ( ) ,
134
133
files_to_be_deleted,
135
- * purge ,
136
- stage_info. clone ( ) ,
134
+ & stage_table_info . copy_into_table_options ,
135
+ stage_table_info . stage_info . clone ( ) ,
137
136
& mut build_res. main_pipeline ,
138
137
) ?;
139
138
}
@@ -159,10 +158,7 @@ impl Interpreter for CopyIntoTableInterpreter {
159
158
let copy_into_table: CopyIntoTablePlan = self . s_expr . plan ( ) . clone ( ) . try_into ( ) ?;
160
159
match & copy_into_table. mutation_kind {
161
160
MutationKind :: CopyInto => {
162
- let stage_context = self . stage_context . as_ref ( ) . unwrap ( ) ;
163
- let blocks = self . get_copy_into_table_result (
164
- stage_context. stage_info . copy_options . return_failed_only ,
165
- ) ?;
161
+ let blocks = self . get_copy_into_table_result ( ) ?;
166
162
Ok ( Box :: pin ( DataBlockStream :: create ( None , blocks) ) )
167
163
}
168
164
MutationKind :: Insert => Ok ( Box :: pin ( DataBlockStream :: create ( None , vec ! [ ] ) ) ) ,
@@ -177,24 +173,24 @@ impl CopyIntoTableInterpreter {
177
173
ctx : Arc < QueryContext > ,
178
174
s_expr : SExpr ,
179
175
metadata : MetadataRef ,
180
- stage_context : Option < Box < StageContext > > ,
176
+ stage_table_info : Option < Box < StageTableInfo > > ,
181
177
overwrite : bool ,
182
178
) -> Result < Self > {
183
179
Ok ( CopyIntoTableInterpreter {
184
180
ctx,
185
181
s_expr,
186
182
metadata,
187
- stage_context ,
183
+ stage_table_info ,
188
184
overwrite,
189
185
} )
190
186
}
191
187
192
188
fn get_copy_into_table_result ( & self ) -> Result < Vec < DataBlock > > {
193
189
let return_all = !self
194
- . plan
195
190
. stage_table_info
196
- . stage_info
197
- . copy_options
191
+ . as_ref ( )
192
+ . unwrap ( )
193
+ . copy_into_table_options
198
194
. return_failed_only ;
199
195
let cs = self . ctx . get_copy_status ( ) ;
200
196
@@ -233,183 +229,4 @@ impl CopyIntoTableInterpreter {
233
229
] ) ] ;
234
230
Ok ( blocks)
235
231
}
236
-
237
- /// Build commit insertion pipeline.
238
- async fn commit_insertion (
239
- & self ,
240
- main_pipeline : & mut Pipeline ,
241
- plan : & CopyIntoTablePlan ,
242
- files_to_copy : Vec < StageFileInfo > ,
243
- duplicated_files_detected : Vec < String > ,
244
- update_stream_meta : Vec < UpdateStreamMetaReq > ,
245
- deduplicated_label : Option < String > ,
246
- ) -> Result < ( ) > {
247
- let ctx = self . ctx . clone ( ) ;
248
- let to_table = ctx
249
- . get_table (
250
- plan. catalog_info . catalog_name ( ) ,
251
- & plan. database_name ,
252
- & plan. table_name ,
253
- )
254
- . await ?;
255
-
256
- // Commit.
257
- {
258
- let copied_files_meta_req = PipelineBuilder :: build_upsert_copied_files_to_meta_req (
259
- ctx. clone ( ) ,
260
- to_table. as_ref ( ) ,
261
- & plan. stage_table_info . stage_info ,
262
- & files_to_copy,
263
- plan. force ,
264
- ) ?;
265
-
266
- to_table. commit_insertion (
267
- ctx. clone ( ) ,
268
- main_pipeline,
269
- copied_files_meta_req,
270
- update_stream_meta,
271
- plan. write_mode . is_overwrite ( ) ,
272
- None ,
273
- deduplicated_label,
274
- ) ?;
275
- }
276
-
277
- // Purge files.
278
- {
279
- info ! (
280
- "set files to be purged, # of copied files: {}, # of duplicated files: {}" ,
281
- files_to_copy. len( ) ,
282
- duplicated_files_detected. len( )
283
- ) ;
284
-
285
- let files_to_be_deleted = files_to_copy
286
- . into_iter ( )
287
- . map ( |v| v. path )
288
- . chain ( duplicated_files_detected)
289
- . collect :: < Vec < _ > > ( ) ;
290
- // set on_finished callback.
291
- PipelineBuilder :: set_purge_files_on_finished (
292
- ctx. clone ( ) ,
293
- files_to_be_deleted,
294
- plan. stage_table_info . stage_info . copy_options . purge ,
295
- plan. stage_table_info . stage_info . clone ( ) ,
296
- main_pipeline,
297
- ) ?;
298
- }
299
- Ok ( ( ) )
300
- }
301
-
302
- async fn on_no_files_to_copy ( & self ) -> Result < PipelineBuildResult > {
303
- // currently, there is only one thing that we care about:
304
- //
305
- // if `purge_duplicated_files_in_copy` and `purge` are all enabled,
306
- // and there are duplicated files detected, we should clean them up immediately.
307
-
308
- // it might be better to reuse the PipelineBuilder::set_purge_files_on_finished,
309
- // unfortunately, hooking the on_finished callback of a "blank" pipeline,
310
- // e.g. `PipelineBuildResult::create` leads to runtime error (during pipeline execution).
311
-
312
- if self . plan . stage_table_info . stage_info . copy_options . purge
313
- && !self
314
- . plan
315
- . stage_table_info
316
- . duplicated_files_detected
317
- . is_empty ( )
318
- && self
319
- . ctx
320
- . get_settings ( )
321
- . get_enable_purge_duplicated_files_in_copy ( ) ?
322
- {
323
- info ! (
324
- "purge_duplicated_files_in_copy enabled, number of duplicated files: {}" ,
325
- self . plan. stage_table_info. duplicated_files_detected. len( )
326
- ) ;
327
-
328
- PipelineBuilder :: purge_files_immediately (
329
- self . ctx . clone ( ) ,
330
- self . plan . stage_table_info . duplicated_files_detected . clone ( ) ,
331
- self . plan . stage_table_info . stage_info . clone ( ) ,
332
- )
333
- . await ?;
334
- }
335
- Ok ( PipelineBuildResult :: create ( ) )
336
- }
337
- }
338
-
339
- #[ async_trait:: async_trait]
340
- impl Interpreter for CopyIntoTableInterpreter {
341
- fn name ( & self ) -> & str {
342
- "CopyIntoTableInterpreterV2"
343
- }
344
-
345
- fn is_ddl ( & self ) -> bool {
346
- false
347
- }
348
-
349
- #[ fastrace:: trace]
350
- #[ async_backtrace:: framed]
351
- async fn execute2 ( & self ) -> Result < PipelineBuildResult > {
352
- debug ! ( "ctx.id" = self . ctx. get_id( ) . as_str( ) ; "copy_into_table_interpreter_execute_v2" ) ;
353
-
354
- if check_deduplicate_label ( self . ctx . clone ( ) ) . await ? {
355
- return Ok ( PipelineBuildResult :: create ( ) ) ;
356
- }
357
-
358
- if self . plan . no_file_to_copy {
359
- info ! ( "no file to copy" ) ;
360
- return self . on_no_files_to_copy ( ) . await ;
361
- }
362
-
363
- let ( physical_plan, update_stream_meta) = self . build_physical_plan ( & self . plan ) . await ?;
364
- let mut build_res =
365
- build_query_pipeline_without_render_result_set ( & self . ctx , & physical_plan) . await ?;
366
-
367
- // Build commit insertion pipeline.
368
- {
369
- let files_to_copy = self
370
- . plan
371
- . stage_table_info
372
- . files_to_copy
373
- . clone ( )
374
- . unwrap_or_default ( ) ;
375
-
376
- let duplicated_files_detected =
377
- self . plan . stage_table_info . duplicated_files_detected . clone ( ) ;
378
-
379
- self . commit_insertion (
380
- & mut build_res. main_pipeline ,
381
- & self . plan ,
382
- files_to_copy,
383
- duplicated_files_detected,
384
- update_stream_meta,
385
- unsafe { self . ctx . get_settings ( ) . get_deduplicate_label ( ) ? } ,
386
- )
387
- . await ?;
388
- }
389
-
390
- // Execute hook.
391
- {
392
- let hook_operator = HookOperator :: create (
393
- self . ctx . clone ( ) ,
394
- self . plan . catalog_info . catalog_name ( ) . to_string ( ) ,
395
- self . plan . database_name . to_string ( ) ,
396
- self . plan . table_name . to_string ( ) ,
397
- MutationKind :: Insert ,
398
- LockTableOption :: LockNoRetry ,
399
- ) ;
400
- hook_operator. execute ( & mut build_res. main_pipeline ) . await ;
401
- }
402
-
403
- Ok ( build_res)
404
- }
405
-
406
- fn inject_result ( & self ) -> Result < SendableDataBlockStream > {
407
- let blocks = if self . plan . no_file_to_copy {
408
- vec ! [ DataBlock :: empty_with_schema( self . plan. schema( ) ) ]
409
- } else {
410
- self . get_copy_into_table_result ( ) ?
411
- } ;
412
-
413
- Ok ( Box :: pin ( DataBlockStream :: create ( None , blocks) ) )
414
- }
415
232
}
0 commit comments