@@ -23,12 +23,14 @@ use common_functions::scalars::CastFunction;
23
23
use common_planners:: StageKind ;
24
24
use common_streams:: DataBlockStream ;
25
25
use common_streams:: SendableDataBlockStream ;
26
+ use futures_util:: StreamExt ;
26
27
use parking_lot:: Mutex ;
27
28
28
29
use super :: commit2table;
29
30
use super :: commit2table_with_append_entries;
30
31
use super :: interpreter_common:: append2table;
31
32
use super :: plan_schedulers:: build_schedule_pipepline;
33
+ use super :: ProcessorExecutorStream ;
32
34
use crate :: clusters:: ClusterHelper ;
33
35
use crate :: interpreters:: Interpreter ;
34
36
use crate :: interpreters:: InterpreterPtr ;
@@ -198,11 +200,11 @@ impl InsertInterpreterV2 {
198
200
199
201
async fn schedule_insert_select (
200
202
& self ,
201
- plan : & Box < Plan > ,
203
+ plan : & Plan ,
202
204
catalog : String ,
203
205
table : Arc < dyn Table > ,
204
206
) -> Result < SendableDataBlockStream > {
205
- let inner_plan = match & * * plan {
207
+ let inner_plan = match plan {
206
208
Plan :: Query {
207
209
s_expr, metadata, ..
208
210
} => {
@@ -214,14 +216,15 @@ impl InsertInterpreterV2 {
214
216
215
217
table. get_table_info ( ) ;
216
218
217
- let insert_select_plan = PhysicalPlan :: DistributedInsertSelect ( DistributedInsertSelect {
218
- input : Box :: new ( inner_plan) ,
219
- catalog,
220
- table_info : table. get_table_info ( ) . clone ( ) ,
221
- select_schema : plan. schema ( ) ,
222
- insert_schema : self . plan . schema ( ) ,
223
- cast_needed : self . check_schema_cast ( plan) ?,
224
- } ) ;
219
+ let insert_select_plan =
220
+ PhysicalPlan :: DistributedInsertSelect ( Box :: new ( DistributedInsertSelect {
221
+ input : Box :: new ( inner_plan) ,
222
+ catalog,
223
+ table_info : table. get_table_info ( ) . clone ( ) ,
224
+ select_schema : plan. schema ( ) ,
225
+ insert_schema : self . plan . schema ( ) ,
226
+ cast_needed : self . check_schema_cast ( plan) ?,
227
+ } ) ) ;
225
228
226
229
let final_plan = PhysicalPlan :: Exchange ( Exchange {
227
230
input : Box :: new ( insert_select_plan) ,
@@ -242,8 +245,20 @@ impl InsertInterpreterV2 {
242
245
executor_settings,
243
246
) ?;
244
247
245
- commit2table_with_append_entries ( self . ctx . clone ( ) , table, self . plan . overwrite , vec ! [ ] )
246
- . await ?;
248
+ let mut append_entries = vec ! [ ] ;
249
+ let mut stream: SendableDataBlockStream =
250
+ Box :: pin ( ProcessorExecutorStream :: create ( executor) ?) ;
251
+ while let Some ( block) = stream. next ( ) . await {
252
+ append_entries. push ( block?) ;
253
+ }
254
+
255
+ commit2table_with_append_entries (
256
+ self . ctx . clone ( ) ,
257
+ table,
258
+ self . plan . overwrite ,
259
+ append_entries,
260
+ )
261
+ . await ?;
247
262
248
263
Ok ( Box :: pin ( DataBlockStream :: create (
249
264
self . plan . schema ( ) ,
0 commit comments