Skip to content

Commit 569fabe

Browse files
committed
improve with reviewers suggestion
1 parent 615f3f6 commit 569fabe

File tree

3 files changed

+46
-12
lines changed

3 files changed

+46
-12
lines changed

src/meta/types/src/user_stage.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -184,16 +184,7 @@ impl FromStr for StageFileFormatType {
184184

185185
impl ToString for StageFileFormatType {
186186
fn to_string(&self) -> String {
187-
match *self {
188-
StageFileFormatType::Csv => "CSV".to_string(),
189-
StageFileFormatType::Tsv => "Tsv".to_string(),
190-
StageFileFormatType::Json => "Json".to_string(),
191-
StageFileFormatType::NdJson => "NdJson".to_string(),
192-
StageFileFormatType::Avro => "Avro".to_string(),
193-
StageFileFormatType::Orc => "Orc".to_string(),
194-
StageFileFormatType::Parquet => "Parquet".to_string(),
195-
StageFileFormatType::Xml => "Xml".to_string(),
196-
}
187+
format!("{:?}", *self)
197188
}
198189
}
199190

src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use common_formats::ClickhouseFormatType;
2929
use common_formats::FileFormatOptionsExt;
3030
use common_formats::FileFormatTypeExt;
3131
use common_formats::RecordDelimiter;
32+
use common_meta_types::FileFormatOptions;
3233
use common_meta_types::StageFileCompression;
3334
use common_meta_types::StageFileFormatType;
3435
use common_meta_types::UserStageInfo;
@@ -253,6 +254,48 @@ impl InputContext {
253254
})
254255
}
255256

257+
pub async fn try_create_from_insert_v2(
258+
stream_receiver: Receiver<Result<StreamingReadBatch>>,
259+
settings: Arc<Settings>,
260+
file_format_options: FileFormatOptions,
261+
schema: DataSchemaRef,
262+
scan_progress: Arc<Progress>,
263+
is_multi_part: bool,
264+
block_compact_thresholds: BlockCompactThresholds,
265+
) -> Result<Self> {
266+
let read_batch_size = settings.get_input_read_buffer_size()? as usize;
267+
let format_typ = file_format_options.format.clone();
268+
let file_format_options =
269+
StageFileFormatType::get_ext_from_stage(file_format_options, &settings)?;
270+
let file_format_options = format_typ.final_file_format_options(&file_format_options)?;
271+
let format = Self::get_input_format(&format_typ)?;
272+
let field_delimiter = file_format_options.get_field_delimiter();
273+
let record_delimiter = file_format_options.get_record_delimiter()?;
274+
let rows_to_skip = file_format_options.stage.skip_header as usize;
275+
let compression = file_format_options.stage.compression;
276+
277+
let plan = StreamPlan {
278+
is_multi_part,
279+
compression,
280+
};
281+
282+
Ok(InputContext {
283+
format,
284+
schema,
285+
settings,
286+
record_delimiter,
287+
read_batch_size,
288+
rows_to_skip,
289+
field_delimiter,
290+
scan_progress,
291+
source: InputSource::Stream(Mutex::new(Some(stream_receiver))),
292+
plan: InputPlan::StreamingLoad(plan),
293+
splits: vec![],
294+
block_compact_thresholds,
295+
format_options: file_format_options,
296+
})
297+
}
298+
256299
pub fn num_prefetch_splits(&self) -> Result<usize> {
257300
Ok(self.settings.get_max_threads()? as usize)
258301
}

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,10 +320,10 @@ pub async fn clickhouse_handler_post(
320320
.map_err(InternalServerError)?;
321321

322322
let input_context = Arc::new(
323-
InputContext::try_create_from_insert(
324-
option_settings.format.to_string().as_str(),
323+
InputContext::try_create_from_insert_v2(
325324
rx,
326325
ctx.get_settings(),
326+
option_settings.clone(),
327327
schema,
328328
ctx.get_scan_progress(),
329329
false,

0 commit comments

Comments
 (0)