Skip to content

Commit df5c265

Browse files
committed
fix: separate format & copy options for stage
1 parent ef514e3 commit df5c265

File tree

4 files changed

+96
-75
lines changed

4 files changed

+96
-75
lines changed

src/meta/types/src/user_stage.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::fmt;
1617
use std::str::FromStr;
1718

1819
use chrono::DateTime;
1920
use chrono::Utc;
21+
use common_exception::ErrorCode;
22+
use common_exception::Result;
2023
use common_io::consts::NAN_BYTES_SNAKE;
2124
use common_storage::StorageParams;
2225

@@ -338,6 +341,88 @@ impl UserStageInfo {
338341
StageType::User => format!("/stage/user/{}/", self.stage_name),
339342
}
340343
}
344+
345+
/// Apply the file format options.
346+
pub fn apply_format_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
347+
if opts.is_empty() {
348+
return Ok(());
349+
}
350+
for (k, v) in opts.iter() {
351+
match k.as_str() {
352+
"format" => {
353+
let format = StageFileFormatType::from_str(v)?;
354+
self.file_format_options.format = format;
355+
}
356+
"skip_header" => {
357+
let skip_header = u64::from_str(v)?;
358+
self.file_format_options.skip_header = skip_header;
359+
}
360+
"field_delimiter" => self.file_format_options.field_delimiter = v.clone(),
361+
"record_delimiter" => self.file_format_options.record_delimiter = v.clone(),
362+
"nan_display" => self.file_format_options.nan_display = v.clone(),
363+
"escape" => self.file_format_options.escape = v.clone(),
364+
"compression" => {
365+
let compression = StageFileCompression::from_str(v)?;
366+
self.file_format_options.compression = compression;
367+
}
368+
"row_tag" => self.file_format_options.row_tag = v.clone(),
369+
"quote" => self.file_format_options.quote = v.clone(),
370+
_ => {
371+
return Err(ErrorCode::BadArguments(format!(
372+
"Unknown stage file format option {}",
373+
k
374+
)));
375+
}
376+
}
377+
}
378+
Ok(())
379+
}
380+
381+
/// Apply the copy options.
382+
pub fn apply_copy_options(&mut self, opts: &BTreeMap<String, String>) -> Result<()> {
383+
if opts.is_empty() {
384+
return Ok(());
385+
}
386+
for (k, v) in opts.iter() {
387+
match k.as_str() {
388+
"on_error" => {
389+
let on_error = OnErrorMode::from_str(v)?;
390+
self.copy_options.on_error = on_error;
391+
}
392+
"size_limit" => {
393+
let size_limit = usize::from_str(v)?;
394+
self.copy_options.size_limit = size_limit;
395+
}
396+
"split_size" => {
397+
let split_size = usize::from_str(v)?;
398+
self.copy_options.split_size = split_size;
399+
}
400+
"purge" => {
401+
let purge = bool::from_str(v).map_err(|_| {
402+
ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
403+
})?;
404+
self.copy_options.purge = purge;
405+
}
406+
"single" => {
407+
let single = bool::from_str(v).map_err(|_| {
408+
ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
409+
})?;
410+
self.copy_options.single = single;
411+
}
412+
"max_file_size" => {
413+
let max_file_size = usize::from_str(v)?;
414+
self.copy_options.max_file_size = max_file_size;
415+
}
416+
_ => {
417+
return Err(ErrorCode::BadArguments(format!(
418+
"Unknown stage copy option {}",
419+
k
420+
)));
421+
}
422+
}
423+
}
424+
Ok(())
425+
}
341426
}
342427

343428
#[derive(Default, Debug, Clone, PartialEq, Eq)]

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ pub struct ProcessInfo {
5858
#[derive(Debug, Clone)]
5959
pub struct StageAttachment {
6060
pub location: String,
61-
pub params: BTreeMap<String, String>,
61+
pub format_options: BTreeMap<String, String>,
62+
pub copy_options: BTreeMap<String, String>,
6263
}
6364

6465
#[async_trait::async_trait]

src/query/service/src/interpreters/interpreter_insert_v2.rs

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::BTreeMap;
1615
use std::collections::VecDeque;
1716
use std::io::BufRead;
1817
use std::io::Cursor;
1918
use std::ops::Not;
20-
use std::str::FromStr;
2119
use std::sync::Arc;
2220
use std::time::Instant;
2321

@@ -38,9 +36,6 @@ use common_formats::parse_timezone;
3836
use common_formats::FastFieldDecoderValues;
3937
use common_io::cursor_ext::ReadBytesExt;
4038
use common_io::cursor_ext::ReadCheckPointExt;
41-
use common_meta_types::OnErrorMode;
42-
use common_meta_types::StageFileCompression;
43-
use common_meta_types::StageFileFormatType;
4439
use common_meta_types::UserStageInfo;
4540
use common_pipeline_core::Pipeline;
4641
use common_pipeline_sources::processors::sources::AsyncSource;
@@ -117,70 +112,6 @@ impl InsertInterpreterV2 {
117112
Ok(cast_needed)
118113
}
119114

120-
fn apply_stage_options(
121-
&self,
122-
stage: &mut UserStageInfo,
123-
params: &BTreeMap<String, String>,
124-
) -> Result<()> {
125-
for (k, v) in params.iter() {
126-
match k.as_str() {
127-
// file format options
128-
"format" => {
129-
let format = StageFileFormatType::from_str(v)?;
130-
stage.file_format_options.format = format;
131-
}
132-
"skip_header" => {
133-
let skip_header = u64::from_str(v)?;
134-
stage.file_format_options.skip_header = skip_header;
135-
}
136-
"field_delimiter" => stage.file_format_options.field_delimiter = v.clone(),
137-
"record_delimiter" => stage.file_format_options.record_delimiter = v.clone(),
138-
"nan_display" => stage.file_format_options.nan_display = v.clone(),
139-
"escape" => stage.file_format_options.escape = v.clone(),
140-
"compression" => {
141-
let compression = StageFileCompression::from_str(v)?;
142-
stage.file_format_options.compression = compression;
143-
}
144-
"row_tag" => stage.file_format_options.row_tag = v.clone(),
145-
"quote" => stage.file_format_options.quote = v.clone(),
146-
147-
// copy options
148-
"on_error" => {
149-
let on_error = OnErrorMode::from_str(v)?;
150-
stage.copy_options.on_error = on_error;
151-
}
152-
"size_limit" => {
153-
let size_limit = usize::from_str(v)?;
154-
stage.copy_options.size_limit = size_limit;
155-
}
156-
"split_size" => {
157-
let split_size = usize::from_str(v)?;
158-
stage.copy_options.split_size = split_size;
159-
}
160-
"purge" => {
161-
let purge = bool::from_str(v).map_err(|_| {
162-
ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
163-
})?;
164-
stage.copy_options.purge = purge;
165-
}
166-
"single" => {
167-
let single = bool::from_str(v).map_err(|_| {
168-
ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
169-
})?;
170-
stage.copy_options.single = single;
171-
}
172-
"max_file_size" => {
173-
let max_file_size = usize::from_str(v)?;
174-
stage.copy_options.max_file_size = max_file_size;
175-
}
176-
177-
_ => {}
178-
}
179-
}
180-
181-
Ok(())
182-
}
183-
184115
async fn build_insert_from_stage_pipeline(
185116
&self,
186117
table: Arc<dyn Table>,
@@ -196,7 +127,8 @@ impl InsertInterpreterV2 {
196127
let overwrite = self.plan.overwrite;
197128

198129
let (mut stage_info, path) = parse_stage_location(&self.ctx, &attachment.location).await?;
199-
self.apply_stage_options(&mut stage_info, &attachment.params)?;
130+
stage_info.apply_format_options(&attachment.format_options)?;
131+
stage_info.apply_copy_options(&attachment.copy_options)?;
200132

201133
let mut stage_table_info = StageTableInfo {
202134
schema: source_schema.clone(),
@@ -209,8 +141,6 @@ impl InsertInterpreterV2 {
209141

210142
let all_source_file_infos = StageTable::list_files(&table_ctx, &stage_table_info).await?;
211143

212-
// TODO:(everpcpc) color_copied_files
213-
214144
tracing::info!(
215145
"insert: read all stage attachment files finished: {}, elapsed:{}",
216146
all_source_file_infos.len(),

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ impl HttpSessionConf {
146146
#[derive(Deserialize, Debug, Clone)]
147147
pub struct StageAttachmentConf {
148148
pub(crate) location: String,
149-
pub(crate) params: Option<BTreeMap<String, String>>,
149+
pub(crate) format_options: Option<BTreeMap<String, String>>,
150+
pub(crate) copy_options: Option<BTreeMap<String, String>>,
150151
}
151152

152153
#[derive(Debug, Clone)]
@@ -240,7 +241,11 @@ impl HttpQuery {
240241
match &request.stage_attachment {
241242
Some(attachment) => ctx.attach_stage(StageAttachment {
242243
location: attachment.location.clone(),
243-
params: match attachment.params {
244+
format_options: match attachment.format_options {
245+
Some(ref params) => params.clone(),
246+
None => BTreeMap::new(),
247+
},
248+
copy_options: match attachment.copy_options {
244249
Some(ref params) => params.clone(),
245250
None => BTreeMap::new(),
246251
},

0 commit comments

Comments
 (0)