@@ -20,7 +20,6 @@ use common_datavalues::DataType;
20
20
use common_exception:: ErrorCode ;
21
21
use common_exception:: Result ;
22
22
use common_functions:: scalars:: CastFunction ;
23
- use common_planners:: StageKind ;
24
23
use common_streams:: DataBlockStream ;
25
24
use common_streams:: SendableDataBlockStream ;
26
25
use futures_util:: StreamExt ;
@@ -45,7 +44,6 @@ use crate::pipelines::SourcePipeBuilder;
45
44
use crate :: sessions:: QueryContext ;
46
45
use crate :: sessions:: TableContext ;
47
46
use crate :: sql:: executor:: DistributedInsertSelect ;
48
- use crate :: sql:: executor:: Exchange ;
49
47
use crate :: sql:: executor:: PhysicalPlan ;
50
48
use crate :: sql:: executor:: PhysicalPlanBuilder ;
51
49
use crate :: sql:: plans:: Insert ;
@@ -115,9 +113,13 @@ impl InsertInterpreterV2 {
115
113
InsertInputSource :: SelectPlan ( plan) => {
116
114
if !self . ctx . get_cluster ( ) . is_empty ( ) {
117
115
// distributed insert select
118
- return self
116
+ if let Some ( stream ) = self
119
117
. schedule_insert_select ( plan, self . plan . catalog . clone ( ) , table. clone ( ) )
120
- . await ;
118
+ . await ?
119
+ {
120
+ return Ok ( stream) ;
121
+ }
122
+ // else the plan cannot be executed in cluster mode, fallback to standalone mode
121
123
}
122
124
123
125
let select_interpreter = match & * * plan {
@@ -202,8 +204,9 @@ impl InsertInterpreterV2 {
202
204
plan : & Plan ,
203
205
catalog : String ,
204
206
table : Arc < dyn Table > ,
205
- ) -> Result < SendableDataBlockStream > {
206
- let ( inner_plan, select_column_bindings) = match plan {
207
+ ) -> Result < Option < SendableDataBlockStream > > {
208
+ // select_plan is already distributed optimized
209
+ let ( mut select_plan, select_column_bindings) = match plan {
207
210
Plan :: Query {
208
211
s_expr,
209
212
metadata,
@@ -216,26 +219,44 @@ impl InsertInterpreterV2 {
216
219
_ => unreachable ! ( ) ,
217
220
} ;
218
221
222
+ if !select_plan. is_distributed_plan ( ) {
223
+ return Ok ( None ) ;
224
+ }
225
+
219
226
table. get_table_info ( ) ;
220
227
221
- let insert_select_plan =
222
- PhysicalPlan :: DistributedInsertSelect ( Box :: new ( DistributedInsertSelect {
223
- input : Box :: new ( inner_plan) ,
224
- catalog,
225
- table_info : table. get_table_info ( ) . clone ( ) ,
226
- select_schema : plan. schema ( ) ,
227
- select_column_bindings,
228
- insert_schema : self . plan . schema ( ) ,
229
- cast_needed : self . check_schema_cast ( plan) ?,
230
- } ) ) ;
231
-
232
- let final_plan = PhysicalPlan :: Exchange ( Exchange {
233
- input : Box :: new ( insert_select_plan) ,
234
- kind : StageKind :: Merge ,
235
- keys : vec ! [ ] ,
236
- } ) ;
237
-
238
- let mut build_res = build_schedule_pipepline ( self . ctx . clone ( ) , & final_plan) . await ?;
228
+ let insert_select_plan = match select_plan {
229
+ PhysicalPlan :: Exchange ( ref mut exchange) => {
230
+ // insert can be dispatched to different nodes
231
+ let input = exchange. input . clone ( ) ;
232
+ exchange. input = Box :: new ( PhysicalPlan :: DistributedInsertSelect ( Box :: new (
233
+ DistributedInsertSelect {
234
+ input,
235
+ catalog,
236
+ table_info : table. get_table_info ( ) . clone ( ) ,
237
+ select_schema : plan. schema ( ) ,
238
+ select_column_bindings,
239
+ insert_schema : self . plan . schema ( ) ,
240
+ cast_needed : self . check_schema_cast ( plan) ?,
241
+ } ,
242
+ ) ) ) ;
243
+ select_plan
244
+ }
245
+ other_plan => {
246
+ // insert should wait until all nodes finished
247
+ PhysicalPlan :: DistributedInsertSelect ( Box :: new ( DistributedInsertSelect {
248
+ input : Box :: new ( other_plan) ,
249
+ catalog,
250
+ table_info : table. get_table_info ( ) . clone ( ) ,
251
+ select_schema : plan. schema ( ) ,
252
+ select_column_bindings,
253
+ insert_schema : self . plan . schema ( ) ,
254
+ cast_needed : self . check_schema_cast ( plan) ?,
255
+ } ) )
256
+ }
257
+ } ;
258
+
259
+ let mut build_res = build_schedule_pipepline ( self . ctx . clone ( ) , & insert_select_plan) . await ?;
239
260
240
261
let settings = self . ctx . get_settings ( ) ;
241
262
let query_need_abort = self . ctx . query_need_abort ( ) ;
@@ -256,11 +277,11 @@ impl InsertInterpreterV2 {
256
277
257
278
commit2table ( self . ctx . clone ( ) , table. clone ( ) , self . plan . overwrite ) . await ?;
258
279
259
- Ok ( Box :: pin ( DataBlockStream :: create (
280
+ Ok ( Some ( Box :: pin ( DataBlockStream :: create (
260
281
self . plan . schema ( ) ,
261
282
None ,
262
283
vec ! [ ] ,
263
- ) ) )
284
+ ) ) ) )
264
285
}
265
286
}
266
287
0 commit comments