Skip to content

Commit 29407ce

Browse files
authored
fix: insert Plan should not have schema. (#15330)
1 parent 05e702f commit 29407ce

File tree

5 files changed

+92
-90
lines changed

5 files changed

+92
-90
lines changed

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl Interpreter for InsertInterpreter {
114114
InsertInputSource::Values(InsertValue::Values { rows }) => {
115115
build_res.main_pipeline.add_source(
116116
|output| {
117-
let inner = ValueSource::new(rows.clone(), self.plan.schema());
117+
let inner = ValueSource::new(rows.clone(), self.plan.dest_schema());
118118
AsyncSourcer::create(self.ctx.clone(), output, inner)
119119
},
120120
1,
@@ -131,7 +131,7 @@ impl Interpreter for InsertInterpreter {
131131
data.to_string(),
132132
self.ctx.clone(),
133133
name_resolution_ctx,
134-
self.plan.schema(),
134+
self.plan.dest_schema(),
135135
*start,
136136
);
137137
AsyncSourcer::create(self.ctx.clone(), output, inner)
@@ -147,7 +147,7 @@ impl Interpreter for InsertInterpreter {
147147

148148
match StageFileFormatType::from_str(format) {
149149
Ok(f) if f.has_inner_schema() => {
150-
let dest_schema = self.plan.schema();
150+
let dest_schema = self.plan.dest_schema();
151151
let func_ctx = self.ctx.get_function_context()?;
152152

153153
build_res.main_pipeline.add_transform(
@@ -175,7 +175,7 @@ impl Interpreter for InsertInterpreter {
175175
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
176176

177177
if format.get_type().has_inner_schema() {
178-
let dest_schema = self.plan.schema();
178+
let dest_schema = self.plan.dest_schema();
179179
let func_ctx = self.ctx.get_function_context()?;
180180

181181
build_res.main_pipeline.add_transform(
@@ -230,7 +230,7 @@ impl Interpreter for InsertInterpreter {
230230
table_info: table1.get_table_info().clone(),
231231
select_schema: plan.schema(),
232232
select_column_bindings,
233-
insert_schema: self.plan.schema(),
233+
insert_schema: self.plan.dest_schema(),
234234
cast_needed: self.check_schema_cast(plan)?,
235235
},
236236
)));
@@ -247,7 +247,7 @@ impl Interpreter for InsertInterpreter {
247247
table_info: table1.get_table_info().clone(),
248248
select_schema: plan.schema(),
249249
select_column_bindings,
250-
insert_schema: self.plan.schema(),
250+
insert_schema: self.plan.dest_schema(),
251251
cast_needed: self.check_schema_cast(plan)?,
252252
}))
253253
}
@@ -294,7 +294,7 @@ impl Interpreter for InsertInterpreter {
294294
self.ctx.clone(),
295295
&mut build_res.main_pipeline,
296296
table.clone(),
297-
self.plan.schema(),
297+
self.plan.dest_schema(),
298298
None,
299299
vec![],
300300
self.plan.overwrite,

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,10 @@ pub async fn clickhouse_handler_post(
356356
.await
357357
.map_err(BadRequest)?;
358358

359-
let schema = plan.schema();
360359
let mut handle = None;
360+
let output_schema = plan.schema();
361361
if let Plan::Insert(insert) = &mut plan {
362+
let dest_schema = insert.dest_schema();
362363
if let InsertInputSource::StreamingWithFormat(format, start, input_context_ref) =
363364
&mut insert.source
364365
{
@@ -368,7 +369,7 @@ pub async fn clickhouse_handler_post(
368369
.await
369370
.map_err(InternalServerError)?;
370371

371-
let table_schema = infer_table_schema(&schema)
372+
let table_schema = infer_table_schema(&dest_schema)
372373
.map_err(|err| err.display_with_sql(&sql))
373374
.map_err(InternalServerError)?;
374375
let input_context = Arc::new(
@@ -419,7 +420,7 @@ pub async fn clickhouse_handler_post(
419420
.await
420421
.map_err(InternalServerError)?;
421422

422-
let table_schema = infer_table_schema(&schema)
423+
let table_schema = infer_table_schema(&dest_schema)
423424
.map_err(|err| err.display_with_sql(&sql))
424425
.map_err(InternalServerError)?;
425426
let input_context = Arc::new(
@@ -468,7 +469,7 @@ pub async fn clickhouse_handler_post(
468469
.map_err(|err| err.display_with_sql(&sql))
469470
.map_err(BadRequest)?;
470471

471-
execute(ctx, interpreter, schema, format, params, handle)
472+
execute(ctx, interpreter, output_schema, format, params, handle)
472473
.await
473474
.map_err(|err| err.display_with_sql(&sql))
474475
.map_err(InternalServerError)

src/query/service/src/servers/http/v1/load.rs

Lines changed: 79 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -141,89 +141,91 @@ pub async fn streaming_load(
141141
.await
142142
.map_err(InternalServerError)?;
143143

144-
let schema = plan.schema();
145144
match &mut plan {
146-
Plan::Insert(insert) => match &mut insert.source {
147-
InsertInputSource::StreamingWithFileFormat {
148-
format,
149-
on_error_mode,
150-
start,
151-
input_context_option,
152-
} => {
153-
let sql_rest = &insert_sql[*start..].trim();
154-
if !sql_rest.is_empty() {
155-
return Err(poem::Error::from_string(
156-
"should NOT have data after `FILE_FORMAT` in streaming load.",
157-
StatusCode::BAD_REQUEST,
158-
));
159-
};
160-
let to_table = context
161-
.get_table(&insert.catalog, &insert.database, &insert.table)
162-
.await
163-
.map_err(|err| err.display_with_sql(insert_sql))
164-
.map_err(InternalServerError)?;
165-
let (tx, rx) = tokio::sync::mpsc::channel(2);
145+
Plan::Insert(insert) => {
146+
let schema = insert.dest_schema();
147+
match &mut insert.source {
148+
InsertInputSource::StreamingWithFileFormat {
149+
format,
150+
on_error_mode,
151+
start,
152+
input_context_option,
153+
} => {
154+
let sql_rest = &insert_sql[*start..].trim();
155+
if !sql_rest.is_empty() {
156+
return Err(poem::Error::from_string(
157+
"should NOT have data after `FILE_FORMAT` in streaming load.",
158+
StatusCode::BAD_REQUEST,
159+
));
160+
};
161+
let to_table = context
162+
.get_table(&insert.catalog, &insert.database, &insert.table)
163+
.await
164+
.map_err(|err| err.display_with_sql(insert_sql))
165+
.map_err(InternalServerError)?;
166+
let (tx, rx) = tokio::sync::mpsc::channel(2);
166167

167-
let table_schema = infer_table_schema(&schema)
168-
.map_err(|err| err.display_with_sql(insert_sql))
169-
.map_err(InternalServerError)?;
170-
let input_context = Arc::new(
171-
InputContext::try_create_from_insert_file_format(
172-
context.clone(),
173-
rx,
174-
context.get_settings(),
175-
format.clone(),
176-
table_schema,
177-
context.get_scan_progress(),
178-
false,
179-
to_table.get_block_thresholds(),
180-
on_error_mode.clone(),
181-
)
182-
.await
183-
.map_err(|err| err.display_with_sql(insert_sql))
184-
.map_err(InternalServerError)?,
185-
);
186-
*input_context_option = Some(input_context.clone());
187-
info!("streaming load with file_format {:?}", input_context);
168+
let table_schema = infer_table_schema(&schema)
169+
.map_err(|err| err.display_with_sql(insert_sql))
170+
.map_err(InternalServerError)?;
171+
let input_context = Arc::new(
172+
InputContext::try_create_from_insert_file_format(
173+
context.clone(),
174+
rx,
175+
context.get_settings(),
176+
format.clone(),
177+
table_schema,
178+
context.get_scan_progress(),
179+
false,
180+
to_table.get_block_thresholds(),
181+
on_error_mode.clone(),
182+
)
183+
.await
184+
.map_err(|err| err.display_with_sql(insert_sql))
185+
.map_err(InternalServerError)?,
186+
);
187+
*input_context_option = Some(input_context.clone());
188+
info!("streaming load with file_format {:?}", input_context);
188189

189-
let query_id = context.get_id();
190-
let handler = context.spawn(query_id, execute_query(context.clone(), plan));
191-
let files = read_multi_part(multipart, tx, &input_context).await?;
190+
let query_id = context.get_id();
191+
let handler = context.spawn(query_id, execute_query(context.clone(), plan));
192+
let files = read_multi_part(multipart, tx, &input_context).await?;
192193

193-
match handler.await {
194-
Ok(Ok(_)) => Ok(Json(LoadResponse {
195-
error: None,
196-
state: "SUCCESS".to_string(),
197-
id: uuid::Uuid::new_v4().to_string(),
198-
stats: context.get_scan_progress_value(),
199-
files,
200-
})),
201-
Ok(Err(cause)) => Err(poem::Error::from_string(
202-
format!(
203-
"execute fail: {}",
204-
cause.display_with_sql(insert_sql).message()
205-
),
206-
StatusCode::BAD_REQUEST,
207-
)),
208-
Err(_) => Err(poem::Error::from_string(
209-
"Maybe panic.",
210-
StatusCode::INTERNAL_SERVER_ERROR,
211-
)),
194+
match handler.await {
195+
Ok(Ok(_)) => Ok(Json(LoadResponse {
196+
error: None,
197+
state: "SUCCESS".to_string(),
198+
id: uuid::Uuid::new_v4().to_string(),
199+
stats: context.get_scan_progress_value(),
200+
files,
201+
})),
202+
Ok(Err(cause)) => Err(poem::Error::from_string(
203+
format!(
204+
"execute fail: {}",
205+
cause.display_with_sql(insert_sql).message()
206+
),
207+
StatusCode::BAD_REQUEST,
208+
)),
209+
Err(_) => Err(poem::Error::from_string(
210+
"Maybe panic.",
211+
StatusCode::INTERNAL_SERVER_ERROR,
212+
)),
213+
}
212214
}
213-
}
214-
InsertInputSource::StreamingWithFormat(_, _, _) => Err(poem::Error::from_string(
215-
"'INSERT INTO $table FORMAT <type> is now only supported in clickhouse handler,\
215+
InsertInputSource::StreamingWithFormat(_, _, _) => Err(poem::Error::from_string(
216+
"'INSERT INTO $table FORMAT <type> is now only supported in clickhouse handler,\
216217
please use 'FILE_FORMAT = (type = <type> ...)' instead.",
217-
StatusCode::BAD_REQUEST,
218-
)),
219-
_non_supported_source => Err(poem::Error::from_string(
220-
format!(
221-
"streaming upload only support 'INSERT INTO $table FILE_FORMAT = (type = <type> ...)' got {}.",
222-
plan
223-
),
224-
StatusCode::BAD_REQUEST,
225-
)),
226-
},
218+
StatusCode::BAD_REQUEST,
219+
)),
220+
_non_supported_source => Err(poem::Error::from_string(
221+
format!(
222+
"streaming upload only support 'INSERT INTO $table FILE_FORMAT = (type = <type> ...)' got {}.",
223+
plan
224+
),
225+
StatusCode::BAD_REQUEST,
226+
)),
227+
}
228+
}
227229
non_insert_plan => Err(poem::Error::from_string(
228230
format!(
229231
"Only supports INSERT statement in streaming load, but got {}",

src/query/sql/src/planner/plans/insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl PartialEq for Insert {
7575
}
7676

7777
impl Insert {
78-
pub fn schema(&self) -> DataSchemaRef {
78+
pub fn dest_schema(&self) -> DataSchemaRef {
7979
Arc::new(self.schema.clone().into())
8080
}
8181

src/query/sql/src/planner/plans/plan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,6 @@ impl Plan {
454454
Plan::ShowRoles(plan) => plan.schema(),
455455
Plan::ShowGrants(plan) => plan.schema(),
456456
Plan::ShowFileFormats(plan) => plan.schema(),
457-
Plan::Insert(plan) => plan.schema(),
458457
Plan::Replace(plan) => plan.schema(),
459458
Plan::Presign(plan) => plan.schema(),
460459
Plan::ShowShareEndpoint(plan) => plan.schema(),

0 commit comments

Comments
 (0)