Skip to content

Commit 410fe6f

Browse files
committed
fix schema
1 parent 7c30c8d commit 410fe6f

File tree

9 files changed

+23
-19
lines changed

9 files changed

+23
-19
lines changed

src/query/service/src/interpreters/interpreter_append.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ impl Interpreter for AppendInterpreter {
6565
#[fastrace::trace]
6666
#[async_backtrace::framed]
6767
async fn execute2(&self) -> Result<PipelineBuildResult> {
68-
debug!("ctx.id" = self.ctx.get_id().as_str(); "copy_into_table_interpreter_execute_v2");
68+
debug!("ctx.id" = self.ctx.get_id().as_str(); "append_interpreter_execute");
6969
if check_deduplicate_label(self.ctx.clone()).await? {
7070
return Ok(PipelineBuildResult::create());
7171
}
7272

73-
let copy_into_table: Append = self.s_expr.plan().clone().try_into()?;
73+
let append: Append = self.s_expr.plan().clone().try_into()?;
7474
let (target_table, catalog, database, table) = {
7575
let metadata = self.metadata.read();
76-
let t = metadata.table(copy_into_table.table_index);
76+
let t = metadata.table(append.table_index);
7777
(
7878
t.table(),
7979
t.catalog().to_string(),
@@ -162,8 +162,8 @@ impl Interpreter for AppendInterpreter {
162162
}
163163

164164
fn inject_result(&self) -> Result<SendableDataBlockStream> {
165-
let copy_into_table: Append = self.s_expr.plan().clone().try_into()?;
166-
match &copy_into_table.append_type {
165+
let append: Append = self.s_expr.plan().clone().try_into()?;
166+
match &append.append_type {
167167
AppendType::CopyInto => {
168168
let blocks = self.get_copy_into_table_result()?;
169169
Ok(Box::pin(DataBlockStream::create(None, blocks)))

src/query/service/src/pipelines/builders/builder_append.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::pipelines::PipelineBuilder;
4141
use crate::sessions::QueryContext;
4242

4343
impl PipelineBuilder {
44-
pub(crate) fn build_copy_into_table(&mut self, copy: &PhysicalAppend) -> Result<()> {
44+
pub(crate) fn build_append(&mut self, copy: &PhysicalAppend) -> Result<()> {
4545
let to_table = self.ctx.build_table_by_table_info(&copy.table_info, None)?;
4646
self.ctx
4747
.set_read_block_thresholds(to_table.get_block_thresholds());

src/query/service/src/pipelines/pipeline_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ impl PipelineBuilder {
195195
}
196196

197197
// Copy into.
198-
PhysicalPlan::Append(copy) => self.build_copy_into_table(copy),
198+
PhysicalPlan::Append(append) => self.build_append(append),
199199
PhysicalPlan::CopyIntoLocation(copy) => self.build_copy_into_location(copy),
200200

201201
// Replace.

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ impl PhysicalPlanReplacer for Fragmenter {
178178
})))
179179
}
180180

181-
fn replace_copy_into_table(&mut self, plan: &PhysicalAppend) -> Result<PhysicalPlan> {
181+
fn replace_append(&mut self, plan: &PhysicalAppend) -> Result<PhysicalPlan> {
182182
let input = self.replace(&plan.input)?;
183183
Ok(PhysicalPlan::Append(Box::new(PhysicalAppend {
184184
plan_id: plan.plan_id,

src/query/service/src/schedulers/fragments/plan_fragment.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ impl PhysicalPlanReplacer for ReplaceReadSource {
535535
}))
536536
}
537537

538-
fn replace_copy_into_table(&mut self, plan: &PhysicalAppend) -> Result<PhysicalPlan> {
538+
fn replace_append(&mut self, plan: &PhysicalAppend) -> Result<PhysicalPlan> {
539539
let input = self.replace(&plan.input)?;
540540
Ok(PhysicalPlan::Append(Box::new(PhysicalAppend {
541541
plan_id: plan.plan_id,

src/query/sql/src/executor/physical_plan_builder.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,7 @@ impl PhysicalPlanBuilder {
142142
}
143143
RelOperator::Recluster(recluster) => self.build_recluster(recluster).await,
144144
RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await,
145-
RelOperator::Append(copy_into_table) => {
146-
self.build_copy_into_table(s_expr, copy_into_table).await
147-
}
145+
RelOperator::Append(append) => self.build_append(s_expr, append).await,
148146
RelOperator::ValueScan(value_scan) => self.build_value_scan(value_scan).await,
149147
}
150148
}

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub trait PhysicalPlanReplacer {
9292
PhysicalPlan::CompactSource(plan) => self.replace_compact_source(plan),
9393
PhysicalPlan::CommitSink(plan) => self.replace_commit_sink(plan),
9494
PhysicalPlan::RangeJoin(plan) => self.replace_range_join(plan),
95-
PhysicalPlan::Append(plan) => self.replace_copy_into_table(plan),
95+
PhysicalPlan::Append(plan) => self.replace_append(plan),
9696
PhysicalPlan::CopyIntoLocation(plan) => self.replace_copy_into_location(plan),
9797
PhysicalPlan::ReplaceAsyncSourcer(plan) => self.replace_async_sourcer(plan),
9898
PhysicalPlan::ReplaceDeduplicate(plan) => self.replace_deduplicate(plan),
@@ -402,7 +402,7 @@ pub trait PhysicalPlanReplacer {
402402
}))
403403
}
404404

405-
fn replace_copy_into_table(&mut self, plan: &PhysicalAppend) -> Result<PhysicalPlan> {
405+
fn replace_append(&mut self, plan: &PhysicalAppend) -> Result<PhysicalPlan> {
406406
let input = self.replace(&plan.input)?;
407407

408408
Ok(PhysicalPlan::Append(Box::new(PhysicalAppend {

src/query/sql/src/planner/binder/copy_into_table.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ impl<'a> Binder {
257257
async fn bind_copy_into_table_from_location(
258258
&mut self,
259259
bind_ctx: &BindContext,
260-
copy_into_table_plan: Append,
260+
mut copy_into_table_plan: Append,
261261
stage_table_info: StageTableInfo,
262262
) -> Result<Plan> {
263263
let use_query = matches!(&stage_table_info.stage_info.file_format_params,
@@ -304,7 +304,7 @@ impl<'a> Binder {
304304
)
305305
.await
306306
} else {
307-
let (scan, _) = self
307+
let (scan, bind_context) = self
308308
.bind_stage_table(
309309
self.ctx.clone(),
310310
bind_ctx,
@@ -314,6 +314,7 @@ impl<'a> Binder {
314314
stage_table_info.files_to_copy.clone(),
315315
)
316316
.await?;
317+
copy_into_table_plan.project_columns = Some(bind_context.columns.clone());
317318

318319
let copy_into =
319320
SExpr::create_unary(Arc::new(copy_into_table_plan.into()), Arc::new(scan));

src/query/sql/src/planner/plans/append.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,14 +301,19 @@ impl Operator for Append {
301301
}
302302

303303
impl PhysicalPlanBuilder {
304-
pub async fn build_copy_into_table(
304+
pub async fn build_append(
305305
&mut self,
306306
s_expr: &SExpr,
307307
plan: &crate::plans::Append,
308308
) -> Result<PhysicalPlan> {
309309
let target_table = self.metadata.read().table(plan.table_index).table();
310310

311-
let source = self.build(s_expr.child(0)?, Default::default()).await?;
311+
let column_set = plan
312+
.project_columns
313+
.as_ref()
314+
.map(|project_columns| project_columns.iter().map(|c| c.index).collect())
315+
.unwrap_or_default();
316+
let source = self.build(s_expr.child(0)?, column_set).await?;
312317

313318
Ok(PhysicalPlan::Append(Box::new(PhysicalAppend {
314319
plan_id: 0,
@@ -317,7 +322,7 @@ impl PhysicalPlanBuilder {
317322
values_consts: plan.values_consts.clone(),
318323
required_source_schema: plan.required_source_schema.clone(),
319324
table_info: target_table.get_table_info().clone(),
320-
project_columns: None,
325+
project_columns: plan.project_columns.clone(),
321326
})))
322327
}
323328
}

0 commit comments

Comments
 (0)