Skip to content

Commit 0d8217c

Browse files
authored
Merge pull request #9063 from ariesdevil/stream-load-refactor
feat(streaming_load): Support file_format syntax in streaming load insert sql
2 parents 6c270ba + b182a49 commit 0d8217c

File tree

13 files changed

+251
-13
lines changed

13 files changed

+251
-13
lines changed

src/meta/types/src/user_stage.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,24 @@ impl FromStr for StageFileCompression {
127127
}
128128
}
129129

130+
impl ToString for StageFileCompression {
131+
fn to_string(&self) -> String {
132+
match *self {
133+
StageFileCompression::Auto => "auto".to_string(),
134+
StageFileCompression::Gzip => "gzip".to_string(),
135+
StageFileCompression::Bz2 => "bz2".to_string(),
136+
StageFileCompression::Brotli => "brotli".to_string(),
137+
StageFileCompression::Zstd => "zstd".to_string(),
138+
StageFileCompression::Deflate => "deflate".to_string(),
139+
StageFileCompression::RawDeflate => "raw_deflate".to_string(),
140+
StageFileCompression::Lzo => "lzo".to_string(),
141+
StageFileCompression::Snappy => "snappy".to_string(),
142+
StageFileCompression::Xz => "xz".to_string(),
143+
StageFileCompression::None => "none".to_string(),
144+
}
145+
}
146+
}
147+
130148
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
131149
pub enum StageFileFormatType {
132150
Csv,
@@ -164,6 +182,12 @@ impl FromStr for StageFileFormatType {
164182
}
165183
}
166184

185+
impl ToString for StageFileFormatType {
186+
fn to_string(&self) -> String {
187+
format!("{:?}", *self)
188+
}
189+
}
190+
167191
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
168192
#[serde(default)]
169193
pub struct FileFormatOptions {

src/query/ast/src/ast/format/ast_format.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,24 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
963963
let streaming_node = FormatTreeNode::new(streaming_format_ctx);
964964
self.children.push(streaming_node);
965965
}
966+
InsertSource::StreamingV2 { settings, .. } => {
967+
let mut file_formats_children = Vec::with_capacity(settings.len());
968+
for (k, v) in settings.iter() {
969+
let file_format_name = format!("FileFormat {} = {:?}", k, v);
970+
let file_format_format_ctx = AstFormatContext::new(file_format_name);
971+
let file_format_node = FormatTreeNode::new(file_format_format_ctx);
972+
file_formats_children.push(file_format_node);
973+
}
974+
let file_formats_format_name = "StreamSourceFileFormats".to_string();
975+
let files_formats_format_ctx = AstFormatContext::with_children(
976+
file_formats_format_name,
977+
file_formats_children.len(),
978+
);
979+
let files_formats_node =
980+
FormatTreeNode::with_children(files_formats_format_ctx, file_formats_children);
981+
982+
self.children.push(files_formats_node);
983+
}
966984
InsertSource::Values { .. } => {
967985
let values_name = "ValueSource".to_string();
968986
let values_format_ctx = AstFormatContext::new(values_name);

src/query/ast/src/ast/format/syntax/dml.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,25 @@ fn pretty_source(source: InsertSource) -> RcDoc {
8484
.append(RcDoc::text(rest_str.to_string()))
8585
.append(RcDoc::text(start.to_string())),
8686
),
87+
InsertSource::StreamingV2 { settings, start } => RcDoc::text("FILE_FORMAT").append(
88+
RcDoc::line()
89+
.append(RcDoc::text("FILE_FORMAT_SETTINGS = "))
90+
.append(parenthenized(
91+
interweave_comma(settings.iter().map(|(k, v)| {
92+
RcDoc::text(k.to_string())
93+
.append(RcDoc::space())
94+
.append(RcDoc::text("="))
95+
.append(RcDoc::space())
96+
.append(RcDoc::text(format!("{:?}", v)))
97+
}))
98+
.group(),
99+
))
100+
.append(
101+
RcDoc::text("start:")
102+
.append(RcDoc::space())
103+
.append(RcDoc::text(start.to_string())),
104+
),
105+
),
87106
InsertSource::Values { rest_str } => RcDoc::text("VALUES").append(
88107
RcDoc::line()
89108
.nest(NEST_FACTOR)

src/query/ast/src/ast/statements/insert.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::fmt::Display;
1617
use std::fmt::Formatter;
1718

@@ -61,6 +62,10 @@ pub enum InsertSource<'a> {
6162
rest_str: &'a str,
6263
start: usize,
6364
},
65+
StreamingV2 {
66+
settings: BTreeMap<String, String>,
67+
start: usize,
68+
},
6469
Values {
6570
rest_str: &'a str,
6671
},
@@ -76,8 +81,13 @@ impl Display for InsertSource<'_> {
7681
format,
7782
rest_str,
7883
start: _,
79-
} => {
80-
write!(f, "FORMAT {format} {rest_str}")
84+
} => write!(f, "FORMAT {format} {rest_str}"),
85+
InsertSource::StreamingV2 { settings, start: _ } => {
86+
write!(f, " FILE_FORMAT = (")?;
87+
for (k, v) in settings.iter() {
88+
write!(f, " {} = '{}'", k, v)?;
89+
}
90+
write!(f, " )")
8191
}
8292
InsertSource::Values { rest_str } => write!(f, "VALUES {rest_str}"),
8393
InsertSource::Select { query } => write!(f, "{query}"),

src/query/ast/src/parser/statement.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,14 +1091,23 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
10911091
pub fn insert_source(i: Input) -> IResult<InsertSource> {
10921092
let streaming = map(
10931093
rule! {
1094-
FORMAT ~ #ident ~ #rest_str
1094+
FORMAT ~ #ident ~ #rest_str
10951095
},
10961096
|(_, format, (rest_str, start))| InsertSource::Streaming {
10971097
format: format.name,
10981098
rest_str,
10991099
start,
11001100
},
11011101
);
1102+
let streaming_v2 = map(
1103+
rule! {
1104+
FILE_FORMAT ~ "=" ~ #options ~ #rest_str
1105+
},
1106+
|(_, _, options, (_, start))| InsertSource::StreamingV2 {
1107+
settings: options,
1108+
start,
1109+
},
1110+
);
11021111
let values = map(
11031112
rule! {
11041113
VALUES ~ #rest_str
@@ -1111,6 +1120,7 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
11111120

11121121
rule!(
11131122
#streaming
1123+
| #streaming_v2
11141124
| #values
11151125
| #query
11161126
)(i)

src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use common_formats::ClickhouseFormatType;
2929
use common_formats::FileFormatOptionsExt;
3030
use common_formats::FileFormatTypeExt;
3131
use common_formats::RecordDelimiter;
32+
use common_meta_types::FileFormatOptions;
3233
use common_meta_types::StageFileCompression;
3334
use common_meta_types::StageFileFormatType;
3435
use common_meta_types::UserStageInfo;
@@ -253,6 +254,48 @@ impl InputContext {
253254
})
254255
}
255256

/// Build an `InputContext` for a streaming load whose format is described by
/// typed `FileFormatOptions` (the new `INSERT ... FILE_FORMAT = (...)` syntax)
/// instead of a format-name string as in `try_create_from_insert`.
///
/// * `stream_receiver` — channel delivering the raw request body in batches.
/// * `settings` — session settings (supplies the read buffer size and,
///   presumably, format defaults — confirm against `get_ext_from_stage`).
/// * `file_format_options` — the parsed FILE_FORMAT clause.
/// * `schema` — schema of the target table.
/// * `scan_progress` — progress counter updated while reading the stream.
/// * `is_multi_part` — whether the body arrives as multipart form data.
/// * `block_compact_thresholds` — thresholds for compacting output blocks.
pub async fn try_create_from_insert_v2(
    stream_receiver: Receiver<Result<StreamingReadBatch>>,
    settings: Arc<Settings>,
    file_format_options: FileFormatOptions,
    schema: DataSchemaRef,
    scan_progress: Arc<Progress>,
    is_multi_part: bool,
    block_compact_thresholds: BlockCompactThresholds,
) -> Result<Self> {
    let read_batch_size = settings.get_input_read_buffer_size()? as usize;
    // Keep the format type around separately: `file_format_options` is
    // consumed by `get_ext_from_stage` below.
    let format_typ = file_format_options.format.clone();
    // Two-step resolution: first extend the stage options with session
    // settings, then let the format type finalize its own options.
    let file_format_options =
        StageFileFormatType::get_ext_from_stage(file_format_options, &settings)?;
    let file_format_options = format_typ.final_file_format_options(&file_format_options)?;
    let format = Self::get_input_format(&format_typ)?;
    let field_delimiter = file_format_options.get_field_delimiter();
    let record_delimiter = file_format_options.get_record_delimiter()?;
    let rows_to_skip = file_format_options.stage.skip_header as usize;
    let compression = file_format_options.stage.compression;

    let plan = StreamPlan {
        is_multi_part,
        compression,
    };

    Ok(InputContext {
        format,
        schema,
        settings,
        record_delimiter,
        read_batch_size,
        rows_to_skip,
        field_delimiter,
        scan_progress,
        // The receiver is wrapped in Mutex<Option<..>> so a consumer can
        // take exclusive ownership of the stream later.
        source: InputSource::Stream(Mutex::new(Some(stream_receiver))),
        plan: InputPlan::StreamingLoad(plan),
        // Streaming loads have no pre-listed file splits.
        splits: vec![],
        block_compact_thresholds,
        format_options: file_format_options,
    })
}
256299
/// Number of splits to prefetch concurrently — bounded by the session's
/// `max_threads` setting so prefetching never outpaces processing capacity.
pub fn num_prefetch_splits(&self) -> Result<usize> {
    Ok(self.settings.get_max_threads()? as usize)
}

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ impl Interpreter for InsertInterpreterV2 {
149149
.format
150150
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
151151
}
152+
InsertInputSource::StreamingWithFileFormat(_, _, input_context) => {
153+
let input_context = input_context.as_ref().expect("must success").clone();
154+
input_context
155+
.format
156+
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
157+
}
152158
InsertInputSource::SelectPlan(plan) => {
153159
let table1 = table.clone();
154160
let (mut select_plan, select_column_bindings) = match plan.as_ref() {
@@ -228,7 +234,8 @@ impl Interpreter for InsertInterpreterV2 {
228234
}
229235

230236
let append_mode = match &self.plan.source {
231-
InsertInputSource::StreamingWithFormat(_, _, _) => AppendMode::Copy,
237+
InsertInputSource::StreamingWithFormat(..)
238+
| InsertInputSource::StreamingWithFileFormat(..) => AppendMode::Copy,
232239
_ => AppendMode::Normal,
233240
};
234241

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,47 @@ pub async fn clickhouse_handler_post(
290290
"clickhouse insert with format {:?}, value {}",
291291
input_context, *start
292292
);
293+
let compression_alg = input_context.get_compression_alg("").map_err(BadRequest)?;
294+
let start = *start;
295+
handle = Some(ctx.spawn(async move {
296+
gen_batches(
297+
sql,
298+
start,
299+
input_context.read_batch_size,
300+
tx,
301+
compression_alg,
302+
)
303+
.await
304+
}));
305+
} else if let InsertInputSource::StreamingWithFileFormat(
306+
option_settings,
307+
start,
308+
input_context_ref,
309+
) = &mut insert.source
310+
{
311+
let (tx, rx) = tokio::sync::mpsc::channel(2);
312+
let to_table = ctx
313+
.get_table(&insert.catalog, &insert.database, &insert.table)
314+
.await
315+
.map_err(InternalServerError)?;
316+
317+
let input_context = Arc::new(
318+
InputContext::try_create_from_insert_v2(
319+
rx,
320+
ctx.get_settings(),
321+
option_settings.clone(),
322+
schema,
323+
ctx.get_scan_progress(),
324+
false,
325+
to_table.get_block_compact_thresholds(),
326+
)
327+
.await
328+
.map_err(InternalServerError)?,
329+
);
330+
331+
*input_context_ref = Some(input_context.clone());
332+
info!("clickhouse insert with file_format {:?}", input_context);
333+
293334
let compression_alg = input_context.get_compression_alg("").map_err(BadRequest)?;
294335
let start = *start;
295336
handle = Some(ctx.spawn(async move {

src/query/service/src/servers/http/v1/load.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub async fn streaming_load(
100100
.unwrap_or("");
101101

102102
let settings = context.get_settings();
103+
103104
for (key, value) in req.headers().iter() {
104105
if settings.has_setting(key.as_str()) {
105106
let value = value.to_str().map_err(InternalServerError)?;
@@ -135,6 +136,7 @@ pub async fn streaming_load(
135136
.await
136137
.map_err(InternalServerError)?;
137138
let (tx, rx) = tokio::sync::mpsc::channel(2);
139+
138140
let input_context = Arc::new(
139141
InputContext::try_create_from_insert(
140142
format.as_str(),
@@ -172,8 +174,63 @@ pub async fn streaming_load(
172174
)),
173175
}
174176
}
177+
InsertInputSource::StreamingWithFileFormat(
178+
option_settings,
179+
start,
180+
input_context_ref,
181+
) => {
182+
let sql_rest = &insert_sql[*start..].trim();
183+
if !sql_rest.is_empty() {
184+
return Err(poem::Error::from_string(
185+
"should NOT have data after `FILE_FORMAT` in streaming load.",
186+
StatusCode::BAD_REQUEST,
187+
));
188+
};
189+
let to_table = context
190+
.get_table(&insert.catalog, &insert.database, &insert.table)
191+
.await
192+
.map_err(InternalServerError)?;
193+
let (tx, rx) = tokio::sync::mpsc::channel(2);
194+
195+
let input_context = Arc::new(
196+
InputContext::try_create_from_insert_v2(
197+
rx,
198+
context.get_settings(),
199+
option_settings.clone(),
200+
schema,
201+
context.get_scan_progress(),
202+
false,
203+
to_table.get_block_compact_thresholds(),
204+
)
205+
.await
206+
.map_err(InternalServerError)?,
207+
);
208+
*input_context_ref = Some(input_context.clone());
209+
tracing::info!("streaming load with file_format {:?}", input_context);
210+
211+
let handler = context.spawn(execute_query(context.clone(), plan));
212+
let files = read_multi_part(multipart, tx, &input_context).await?;
213+
214+
match handler.await {
215+
Ok(Ok(_)) => Ok(Json(LoadResponse {
216+
error: None,
217+
state: "SUCCESS".to_string(),
218+
id: uuid::Uuid::new_v4().to_string(),
219+
stats: context.get_scan_progress_value(),
220+
files,
221+
})),
222+
Ok(Err(cause)) => Err(poem::Error::from_string(
223+
format!("execute fail: {}", cause.message()),
224+
StatusCode::BAD_REQUEST,
225+
)),
226+
Err(_) => Err(poem::Error::from_string(
227+
"Maybe panic.",
228+
StatusCode::INTERNAL_SERVER_ERROR,
229+
)),
230+
}
231+
}
175232
_non_supported_source => Err(poem::Error::from_string(
176-
"Only supports streaming upload. e.g. INSERT INTO $table FORMAT CSV, got insert ... select.",
233+
"Only supports streaming upload. e.g. INSERT INTO $table FILE_FORMAT = (type = 'CSV'), got insert ... select.",
177234
StatusCode::BAD_REQUEST,
178235
)),
179236
},

src/query/sql/src/planner/binder/insert.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::normalize_identifier;
2525
use crate::optimizer::optimize;
2626
use crate::optimizer::OptimizerConfig;
2727
use crate::optimizer::OptimizerContext;
28+
use crate::planner::binder::copy::parse_copy_file_format_options;
2829
use crate::plans::Insert;
2930
use crate::plans::InsertInputSource;
3031
use crate::plans::Plan;
@@ -89,6 +90,12 @@ impl<'a> Binder {
8990
Ok(InsertInputSource::StreamingWithFormat(format, start, None))
9091
}
9192
}
93+
InsertSource::StreamingV2 { settings, start } => {
94+
let opts = parse_copy_file_format_options(&settings)?;
95+
Ok(InsertInputSource::StreamingWithFileFormat(
96+
opts, start, None,
97+
))
98+
}
9299
InsertSource::Values { rest_str } => {
93100
let data = rest_str.trim_end_matches(';').trim_start().to_owned();
94101
Ok(InsertInputSource::Values(data))

0 commit comments

Comments
 (0)