Skip to content

Commit 3a38218

Browse files
authored
Merge pull request #7756 from youngsofun/load
refactor(query): streaming load use planner v2.
2 parents a74637c + f80c709 commit 3a38218

File tree

3 files changed

+67
-133
lines changed

3 files changed

+67
-133
lines changed

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,8 @@ impl Interpreter for InsertInterpreterV2 {
171171
}
172172
build_res.main_pipeline.add_pipe(builder.finalize());
173173
}
174-
InsertInputSource::StreamingWithFormat(_) => {
175-
build_res.main_pipeline.add_pipe(
176-
((*self.source_pipe_builder.lock()).clone())
177-
.ok_or_else(|| ErrorCode::EmptyData("empty source pipe builder"))?
178-
.finalize(),
179-
);
174+
InsertInputSource::StreamingWithFormat(_, pipe) => {
175+
build_res.main_pipeline.add_pipe(pipe.clone());
180176
}
181177
InsertInputSource::SelectPlan(plan) => {
182178
let table1 = table.clone();

src/query/service/src/servers/http/v1/load.rs

Lines changed: 61 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,10 @@ use std::sync::Arc;
1717

1818
use common_base::base::ProgressValues;
1919
use common_base::base::TrySpawn;
20-
use common_datavalues::DataSchemaRef;
2120
use common_exception::ErrorCode;
2221
use common_exception::Result;
2322
use common_formats::FormatFactory;
2423
use common_io::prelude::parse_escape_string;
25-
use common_io::prelude::FormatSettings;
26-
use common_legacy_planners::InsertInputSource;
27-
use common_legacy_planners::PlanNode;
2824
use futures::StreamExt;
2925
use poem::error::InternalServerError;
3026
use poem::error::Result as PoemResult;
@@ -37,14 +33,14 @@ use serde::Serialize;
3733
use tracing::error;
3834

3935
use super::HttpQueryContext;
40-
use crate::interpreters::InterpreterFactory;
41-
use crate::pipelines::SourcePipeBuilder;
36+
use crate::interpreters::InterpreterFactoryV2;
4237
use crate::servers::http::v1::multipart_format::MultipartFormat;
43-
use crate::servers::http::v1::multipart_format::MultipartWorker;
4438
use crate::sessions::QueryContext;
4539
use crate::sessions::SessionType;
4640
use crate::sessions::TableContext;
47-
use crate::sql::PlanParser;
41+
use crate::sql::plans::InsertInputSource;
42+
use crate::sql::plans::Plan;
43+
use crate::sql::Planner;
4844

4945
#[derive(Serialize, Deserialize, Debug)]
5046
pub struct LoadResponse {
@@ -55,32 +51,15 @@ pub struct LoadResponse {
5551
pub files: Vec<String>,
5652
}
5753

58-
fn get_input_format(node: &PlanNode) -> Result<&str> {
59-
match node {
60-
PlanNode::Insert(insert) => match &insert.source {
61-
InsertInputSource::StreamingWithFormat(format) => Ok(format),
62-
_ => Err(ErrorCode::UnknownFormat("Not found format name in plan")),
63-
},
64-
_ => Err(ErrorCode::UnknownFormat("Not found format name in plan")),
65-
}
66-
}
67-
6854
#[allow(clippy::manual_async_fn)]
69-
fn execute_query(
70-
context: Arc<QueryContext>,
71-
node: PlanNode,
72-
source_builder: SourcePipeBuilder,
73-
) -> impl Future<Output = Result<()>> {
55+
fn execute_query(context: Arc<QueryContext>, plan: Plan) -> impl Future<Output = Result<()>> {
7456
async move {
75-
let interpreter = InterpreterFactory::get(context.clone(), node).await?;
57+
let interpreter = InterpreterFactoryV2::get(context.clone(), &plan).await?;
7658

7759
if let Err(cause) = interpreter.start().await {
7860
error!("interpreter.start error: {:?}", cause);
7961
}
8062

81-
// TODO(Winter): very hack code. need remove it.
82-
interpreter.set_source_pipe_builder(Option::from(source_builder))?;
83-
8463
let mut data_stream = interpreter.execute(context).await?;
8564

8665
while let Some(_block) = data_stream.next().await {}
@@ -94,37 +73,6 @@ fn execute_query(
9473
}
9574
}
9675

97-
async fn new_processor_format(
98-
ctx: &Arc<QueryContext>,
99-
node: &PlanNode,
100-
multipart: Multipart,
101-
) -> Result<Json<LoadResponse>> {
102-
let format = get_input_format(node)?;
103-
let format_settings = ctx.get_format_settings()?;
104-
105-
let (mut worker, builder) =
106-
format_source_pipe_builder(format, ctx, node.schema(), multipart, &format_settings)?;
107-
108-
let handler = ctx.spawn(execute_query(ctx.clone(), node.clone(), builder));
109-
110-
worker.work().await;
111-
let files = worker.get_files();
112-
113-
match handler.await {
114-
Ok(Ok(_)) => Ok(()),
115-
Ok(Err(cause)) => Err(cause),
116-
Err(_) => Err(ErrorCode::TokioError("Maybe panic.")),
117-
}?;
118-
119-
Ok(Json(LoadResponse {
120-
error: None,
121-
state: "SUCCESS".to_string(),
122-
id: uuid::Uuid::new_v4().to_string(),
123-
stats: ctx.get_scan_progress_value(),
124-
files,
125-
}))
126-
}
127-
12876
#[poem::handler]
12977
pub async fn streaming_load(
13078
ctx: &HttpQueryContext,
@@ -155,95 +103,83 @@ pub async fn streaming_load(
155103
}
156104
}
157105

158-
let mut plan = PlanParser::parse(context.clone(), insert_sql)
106+
let mut planner = Planner::new(context.clone());
107+
let (mut plan, _, _) = planner
108+
.plan_sql(insert_sql)
159109
.await
160110
.map_err(InternalServerError)?;
161111
context.attach_query_str(insert_sql);
162112

163-
// Block size.
164-
let _max_block_size = settings.get_max_block_size().map_err(InternalServerError)? as usize;
165-
166113
let format_settings = context.get_format_settings().map_err(InternalServerError)?;
167-
let source_pipe_builder: SourcePipeBuilder = match &mut plan {
168-
PlanNode::Insert(insert) => match &mut insert.source {
169-
InsertInputSource::StreamingWithFormat(format) => {
170-
if FormatFactory::instance().has_input(format.as_str()) {
114+
let schema = plan.schema();
115+
match &mut plan {
116+
Plan::Insert(insert) => match &mut insert.source {
117+
InsertInputSource::StrWithFormat((sql_rest, format)) => {
118+
if !sql_rest.is_empty() {
119+
Err(poem::Error::from_string(
120+
"should NOT have data after `Format` in streaming load.",
121+
StatusCode::BAD_REQUEST,
122+
))
123+
} else if FormatFactory::instance().has_input(format.as_str()) {
171124
let new_format = format!("{}WithNames", format);
172125
if format_settings.skip_header > 0
173126
&& FormatFactory::instance().has_input(new_format.as_str())
174127
{
175128
*format = new_format;
176129
}
177130

178-
return match new_processor_format(&context, &plan, multipart).await {
179-
Ok(res) => Ok(res),
180-
Err(cause) => Err(InternalServerError(cause)),
181-
};
131+
let format_settings =
132+
context.get_format_settings().map_err(InternalServerError)?;
133+
134+
let (mut worker, builder) = MultipartFormat::input_sources(
135+
format,
136+
context.clone(),
137+
multipart,
138+
schema,
139+
format_settings.clone(),
140+
)
141+
.map_err(InternalServerError)?;
142+
143+
insert.source = InsertInputSource::StreamingWithFormat(
144+
format.to_string(),
145+
builder.finalize(),
146+
);
147+
148+
let handler = context.spawn(execute_query(context.clone(), plan));
149+
150+
worker.work().await;
151+
let files = worker.get_files();
152+
153+
match handler.await {
154+
Ok(Ok(_)) => Ok(Json(LoadResponse {
155+
error: None,
156+
state: "SUCCESS".to_string(),
157+
id: uuid::Uuid::new_v4().to_string(),
158+
stats: context.get_scan_progress_value(),
159+
files,
160+
})),
161+
Ok(Err(cause)) => Err(cause),
162+
Err(_) => Err(ErrorCode::TokioError("Maybe panic.")),
163+
}
164+
.map_err(InternalServerError)
165+
} else {
166+
Err(poem::Error::from_string(
167+
format!("format not supported for streaming load {}", format),
168+
StatusCode::BAD_REQUEST,
169+
))
182170
}
183-
184-
Err(poem::Error::from_string(
185-
format!(
186-
"Streaming load only supports csv format, but got {}",
187-
format
188-
),
189-
StatusCode::BAD_REQUEST,
190-
))
191171
}
192172
_non_supported_source => Err(poem::Error::from_string(
193-
"Only supports streaming upload. e.g. INSERT INTO $table FORMAT CSV",
173+
"Only supports streaming upload. e.g. INSERT INTO $table FORMAT CSV, got insert ... select.",
194174
StatusCode::BAD_REQUEST,
195175
)),
196176
},
197177
non_insert_plan => Err(poem::Error::from_string(
198178
format!(
199179
"Only supports INSERT statement in streaming load, but got {}",
200-
non_insert_plan.name()
180+
non_insert_plan
201181
),
202182
StatusCode::BAD_REQUEST,
203183
)),
204-
}?;
205-
let interpreter = InterpreterFactory::get(context.clone(), plan.clone())
206-
.await
207-
.map_err(InternalServerError)?;
208-
let _ = interpreter
209-
.set_source_pipe_builder(Some(source_pipe_builder))
210-
.map_err(|e| error!("interpreter.set_source_pipe_builder.error: {:?}", e));
211-
interpreter.start().await.map_err(InternalServerError)?;
212-
let mut data_stream = interpreter
213-
.execute(context.clone())
214-
.await
215-
.map_err(InternalServerError)?;
216-
while let Some(_block) = data_stream.next().await {}
217-
// Write Finish to query log table.
218-
let _ = interpreter
219-
.finish()
220-
.await
221-
.map_err(|e| error!("interpreter.finish error: {:?}", e));
222-
223-
// TODO generate id
224-
// TODO duplicate by insert_label
225-
let mut id = uuid::Uuid::new_v4().to_string();
226-
Ok(Json(LoadResponse {
227-
id,
228-
state: "SUCCESS".to_string(),
229-
stats: context.get_scan_progress_value(),
230-
error: None,
231-
files: vec![],
232-
}))
233-
}
234-
235-
fn format_source_pipe_builder(
236-
format: &str,
237-
context: &Arc<QueryContext>,
238-
schema: DataSchemaRef,
239-
multipart: Multipart,
240-
format_settings: &FormatSettings,
241-
) -> Result<(Box<dyn MultipartWorker>, SourcePipeBuilder)> {
242-
MultipartFormat::input_sources(
243-
format,
244-
context.clone(),
245-
multipart,
246-
schema,
247-
format_settings.clone(),
248-
)
184+
}
249185
}

src/query/service/src/sql/planner/plans/insert.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use common_datablocks::DataBlock;
1616
use common_datavalues::DataSchemaRef;
1717
use common_meta_types::MetaId;
18+
use common_pipeline_core::Pipe;
1819

1920
use super::Plan;
2021

@@ -23,7 +24,8 @@ pub enum InsertInputSource {
2324
#[serde(skip)]
2425
SelectPlan(Box<Plan>),
2526
// From outside streaming source
26-
StreamingWithFormat(String),
27+
#[serde(skip)]
28+
StreamingWithFormat(String, Pipe),
2729
// From cloned String and format
2830
StrWithFormat((String, String)),
2931
}
@@ -66,7 +68,7 @@ impl Insert {
6668
pub fn format(&self) -> Option<&str> {
6769
match &self.source {
6870
InsertInputSource::SelectPlan(_) => None,
69-
InsertInputSource::StreamingWithFormat(v) => Some(v.as_str()),
71+
InsertInputSource::StreamingWithFormat(v, _) => Some(v.as_str()),
7072
InsertInputSource::StrWithFormat((_, v)) => Some(v.as_str()),
7173
}
7274
}

0 commit comments

Comments
 (0)