Skip to content

Commit cc4d3f2

Browse files
authored
feat: copy support option COLUMN_MATCH_MODE (#16963)
* rm unused code. * remove dup code * feat: parquet support copy option `COLUMN_MATCH_MODE`
1 parent ae0cdc3 commit cc4d3f2

File tree

24 files changed

+359
-168
lines changed

24 files changed

+359
-168
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/app/src/principal/file_format.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use std::fmt::Display;
1818
use std::fmt::Formatter;
1919
use std::str::FromStr;
2020

21+
use databend_common_ast::ast::ColumnMatchMode;
22+
use databend_common_ast::ast::CopyIntoTableOptions;
2123
use databend_common_ast::ast::FileFormatOptions;
2224
use databend_common_ast::ast::FileFormatValue;
2325
use databend_common_exception::ErrorCode;
@@ -107,6 +109,26 @@ impl FileFormatParams {
107109
}
108110
}
109111

112+
pub fn check_copy_options(&self, options: &mut CopyIntoTableOptions) -> Result<()> {
113+
if let Some(m) = &options.column_match_mode {
114+
match self {
115+
FileFormatParams::Parquet(_) => {
116+
if let ColumnMatchMode::Position = m {
117+
return Err(ErrorCode::BadArguments(
118+
"COLUMN_MATCH_MODE=POSITION not supported yet.",
119+
));
120+
}
121+
}
122+
_ => {
123+
return Err(ErrorCode::BadArguments(
124+
"COLUMN_MATCH_MODE can only apply to Parquet for now.",
125+
));
126+
}
127+
}
128+
}
129+
Ok(())
130+
}
131+
110132
pub fn need_field_default(&self) -> bool {
111133
match self {
112134
FileFormatParams::Parquet(v) => v.missing_field_as == NullAs::FieldDefault,

src/meta/app/src/principal/user_stage.rs

Lines changed: 1 addition & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::str::FromStr;
2020

2121
use chrono::DateTime;
2222
use chrono::Utc;
23+
pub use databend_common_ast::ast::OnErrorMode;
2324
use databend_common_exception::ErrorCode;
2425
use databend_common_exception::Result;
2526
use databend_common_io::constants::NAN_BYTES_SNAKE;
@@ -400,95 +401,6 @@ pub struct StageParams {
400401
pub storage: StorageParams,
401402
}
402403

403-
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Copy)]
404-
pub enum OnErrorMode {
405-
Continue,
406-
SkipFileNum(u64),
407-
AbortNum(u64),
408-
}
409-
410-
impl Default for OnErrorMode {
411-
fn default() -> Self {
412-
Self::AbortNum(1)
413-
}
414-
}
415-
416-
impl FromStr for OnErrorMode {
417-
type Err = String;
418-
419-
fn from_str(s: &str) -> std::result::Result<Self, String> {
420-
match s.to_uppercase().as_str() {
421-
"" | "ABORT" => Ok(OnErrorMode::AbortNum(1)),
422-
"CONTINUE" => Ok(OnErrorMode::Continue),
423-
"SKIP_FILE" => Ok(OnErrorMode::SkipFileNum(1)),
424-
v => {
425-
if v.starts_with("ABORT_") {
426-
let num_str = v.replace("ABORT_", "");
427-
let nums = num_str.parse::<u64>();
428-
match nums {
429-
Ok(n) if n < 1 => {
430-
Err("OnError mode `ABORT_<num>` num must be greater than 0".to_string())
431-
}
432-
Ok(n) => Ok(OnErrorMode::AbortNum(n)),
433-
Err(_) => Err(format!(
434-
"Unknown OnError mode:{:?}, must one of {{ CONTINUE | SKIP_FILE | SKIP_FILE_<num> | ABORT | ABORT_<num> }}",
435-
v
436-
)),
437-
}
438-
} else {
439-
let num_str = v.replace("SKIP_FILE_", "");
440-
let nums = num_str.parse::<u64>();
441-
match nums {
442-
Ok(n) if n < 1 => {
443-
Err("OnError mode `SKIP_FILE_<num>` num must be greater than 0"
444-
.to_string())
445-
}
446-
Ok(n) => Ok(OnErrorMode::SkipFileNum(n)),
447-
Err(_) => Err(format!(
448-
"Unknown OnError mode:{:?}, must one of {{ CONTINUE | SKIP_FILE | SKIP_FILE_<num> | ABORT | ABORT_<num> }}",
449-
v
450-
)),
451-
}
452-
}
453-
}
454-
}
455-
}
456-
}
457-
458-
impl Display for OnErrorMode {
459-
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
460-
match self {
461-
OnErrorMode::Continue => {
462-
write!(f, "continue")
463-
}
464-
OnErrorMode::SkipFileNum(n) => {
465-
if *n <= 1 {
466-
write!(f, "skipfile")
467-
} else {
468-
write!(f, "skipfile_{}", n)
469-
}
470-
}
471-
OnErrorMode::AbortNum(n) => {
472-
if *n <= 1 {
473-
write!(f, "abort")
474-
} else {
475-
write!(f, "abort_{}", n)
476-
}
477-
}
478-
}
479-
}
480-
}
481-
482-
impl From<databend_common_ast::ast::OnErrorMode> for OnErrorMode {
483-
fn from(opt: databend_common_ast::ast::OnErrorMode) -> Self {
484-
match opt {
485-
databend_common_ast::ast::OnErrorMode::Continue => OnErrorMode::Continue,
486-
databend_common_ast::ast::OnErrorMode::SkipFileNum(n) => OnErrorMode::SkipFileNum(n),
487-
databend_common_ast::ast::OnErrorMode::AbortNum(n) => OnErrorMode::AbortNum(n),
488-
}
489-
}
490-
}
491-
492404
#[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq)]
493405
#[serde(default)]
494406
pub struct CopyOptions {

src/query/ast/src/ast/statements/copy.rs

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ impl CopyIntoTableStmt {
7878
CopyIntoTableOption::DisableVariantCheck(v) => self.options.disable_variant_check = v,
7979
CopyIntoTableOption::ReturnFailedOnly(v) => self.options.return_failed_only = v,
8080
CopyIntoTableOption::OnError(v) => self.options.on_error = OnErrorMode::from_str(&v)?,
81+
CopyIntoTableOption::ColumnMatchMode(v) => {
82+
self.options.column_match_mode = Some(ColumnMatchMode::from_str(&v)?)
83+
}
8184
}
8285
Ok(())
8386
}
@@ -111,37 +114,7 @@ impl Display for CopyIntoTableStmt {
111114
if !self.file_format.is_empty() {
112115
write!(f, " FILE_FORMAT = ({})", self.file_format)?;
113116
}
114-
115-
if !self.options.validation_mode.is_empty() {
116-
write!(f, "VALIDATION_MODE = {}", self.options.validation_mode)?;
117-
}
118-
119-
if self.options.size_limit != 0 {
120-
write!(f, " SIZE_LIMIT = {}", self.options.size_limit)?;
121-
}
122-
123-
if self.options.max_files != 0 {
124-
write!(f, " MAX_FILES = {}", self.options.max_files)?;
125-
}
126-
127-
if self.options.split_size != 0 {
128-
write!(f, " SPLIT_SIZE = {}", self.options.split_size)?;
129-
}
130-
131-
write!(f, " PURGE = {}", self.options.purge)?;
132-
write!(f, " FORCE = {}", self.options.force)?;
133-
write!(
134-
f,
135-
" DISABLE_VARIANT_CHECK = {}",
136-
self.options.disable_variant_check
137-
)?;
138-
write!(f, " ON_ERROR = {}", self.options.on_error)?;
139-
write!(
140-
f,
141-
" RETURN_FAILED_ONLY = {}",
142-
self.options.return_failed_only
143-
)?;
144-
117+
write!(f, " {}", self.options)?;
145118
Ok(())
146119
}
147120
}
@@ -159,6 +132,7 @@ pub struct CopyIntoTableOptions {
159132
pub disable_variant_check: bool,
160133
pub return_failed_only: bool,
161134
pub validation_mode: String,
135+
pub column_match_mode: Option<ColumnMatchMode>,
162136
}
163137

164138
impl CopyIntoTableOptions {
@@ -169,6 +143,10 @@ impl CopyIntoTableOptions {
169143
bool::from_str(v).map_err(|e| format!("can not parse {}={} as bool: {}", k, v, e))
170144
}
171145

146+
pub fn set_column_match_mode(&mut self, mode: ColumnMatchMode) {
147+
self.column_match_mode = Some(mode);
148+
}
149+
172150
pub fn apply(
173151
&mut self,
174152
opts: &BTreeMap<String, String>,
@@ -183,6 +161,10 @@ impl CopyIntoTableOptions {
183161
let on_error = OnErrorMode::from_str(v)?;
184162
self.on_error = on_error;
185163
}
164+
"column_match_mode" => {
165+
let column_match_mode = ColumnMatchMode::from_str(v)?;
166+
self.column_match_mode = Some(column_match_mode);
167+
}
186168
"size_limit" => {
187169
self.size_limit = Self::parse_uint(k, v)?;
188170
}
@@ -214,13 +196,30 @@ impl CopyIntoTableOptions {
214196

215197
impl Display for CopyIntoTableOptions {
216198
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
217-
write!(f, "OnErrorMode {}", self.on_error)?;
218-
write!(f, "SizeLimit {}", self.size_limit)?;
219-
write!(f, "MaxFiles {}", self.max_files)?;
220-
write!(f, "SplitSize {}", self.split_size)?;
221-
write!(f, "Purge {}", self.purge)?;
222-
write!(f, "DisableVariantCheck {}", self.disable_variant_check)?;
223-
write!(f, "ReturnFailedOnly {}", self.return_failed_only)?;
199+
if !self.validation_mode.is_empty() {
200+
write!(f, "VALIDATION_MODE = {}", self.validation_mode)?;
201+
}
202+
203+
if self.size_limit != 0 {
204+
write!(f, " SIZE_LIMIT = {}", self.size_limit)?;
205+
}
206+
207+
if self.max_files != 0 {
208+
write!(f, " MAX_FILES = {}", self.max_files)?;
209+
}
210+
211+
if self.split_size != 0 {
212+
write!(f, " SPLIT_SIZE = {}", self.split_size)?;
213+
}
214+
215+
write!(f, " PURGE = {}", self.purge)?;
216+
write!(f, " FORCE = {}", self.force)?;
217+
write!(f, " DISABLE_VARIANT_CHECK = {}", self.disable_variant_check)?;
218+
write!(f, " ON_ERROR = {}", self.on_error)?;
219+
write!(f, " RETURN_FAILED_ONLY = {}", self.return_failed_only)?;
220+
if let Some(mode) = &self.column_match_mode {
221+
write!(f, " COLUMN_MATCH_MODE = {}", mode)?;
222+
}
224223
Ok(())
225224
}
226225
}
@@ -572,6 +571,7 @@ pub enum CopyIntoTableOption {
572571
DisableVariantCheck(bool),
573572
ReturnFailedOnly(bool),
574573
OnError(String),
574+
ColumnMatchMode(String),
575575
}
576576

577577
pub enum CopyIntoLocationOption {
@@ -714,3 +714,35 @@ impl FromStr for OnErrorMode {
714714
}
715715
}
716716
}
717+
718+
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)]
719+
pub enum ColumnMatchMode {
720+
CaseSensitive,
721+
CaseInsensitive,
722+
Position,
723+
}
724+
725+
impl Display for ColumnMatchMode {
726+
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
727+
match self {
728+
ColumnMatchMode::CaseSensitive => write!(f, "CASE_SENSITIVE"),
729+
ColumnMatchMode::CaseInsensitive => write!(f, "CASE_INSENSITIVE"),
730+
ColumnMatchMode::Position => write!(f, "POSITION"),
731+
}
732+
}
733+
}
734+
735+
const COLUMN_MATCH_MODE_MSG: &str =
736+
"ColumnMatchMode must be one of {{ CASE_SENSITIVE | CASE_INSENSITIVE | POSITION }}";
737+
impl FromStr for ColumnMatchMode {
738+
type Err = &'static str;
739+
740+
fn from_str(s: &str) -> std::result::Result<Self, &'static str> {
741+
match s.to_uppercase().as_str() {
742+
"CASE_SENSITIVE" => Ok(Self::CaseSensitive),
743+
"CASE_INSENSITIVE" => Ok(Self::CaseInsensitive),
744+
"POSITION" => Ok(Self::Position),
745+
_ => Err(COLUMN_MATCH_MODE_MSG),
746+
}
747+
}
748+
}

src/query/ast/src/ast/statements/stage.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub enum SelectStageOption {
7272
Pattern(LiteralStringOrVariable),
7373
FileFormat(String),
7474
Connection(BTreeMap<String, String>),
75+
CaseSensitive(bool),
7576
}
7677

7778
impl SelectStageOptions {
@@ -83,6 +84,7 @@ impl SelectStageOptions {
8384
SelectStageOption::Pattern(v) => options.pattern = Some(v),
8485
SelectStageOption::FileFormat(v) => options.file_format = Some(v),
8586
SelectStageOption::Connection(v) => options.connection = v,
87+
SelectStageOption::CaseSensitive(v) => options.case_sensitive = Some(v),
8688
}
8789
}
8890
options
@@ -95,6 +97,7 @@ pub struct SelectStageOptions {
9597
pub pattern: Option<LiteralStringOrVariable>,
9698
pub file_format: Option<String>,
9799
pub connection: BTreeMap<String, String>,
100+
pub case_sensitive: Option<bool>,
98101
}
99102

100103
impl SelectStageOptions {
@@ -103,6 +106,7 @@ impl SelectStageOptions {
103106
&& self.pattern.is_none()
104107
&& self.file_format.is_none()
105108
&& self.connection.is_empty()
109+
&& self.case_sensitive.is_none()
106110
}
107111
}
108112

@@ -139,6 +143,10 @@ impl Display for SelectStageOptions {
139143
write!(f, " PATTERN => {},", pattern)?;
140144
}
141145

146+
if let Some(case_sensitive) = self.case_sensitive {
147+
write!(f, " CASE_SENSITIVE => {},", case_sensitive)?;
148+
}
149+
142150
if !self.connection.is_empty() {
143151
write!(f, " CONNECTION => (")?;
144152
write_comma_separated_string_map(f, &self.connection)?;

src/query/ast/src/parser/copy.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ fn copy_into_table_option(i: Input) -> IResult<CopyIntoTableOption> {
174174
map(rule! { ON_ERROR ~ "=" ~ #ident }, |(_, _, on_error)| {
175175
CopyIntoTableOption::OnError(on_error.to_string())
176176
}),
177+
map(
178+
rule! { COLUMN_MATCH_MODE ~ "=" ~ #ident },
179+
|(_, _, mode)| CopyIntoTableOption::ColumnMatchMode(mode.to_string()),
180+
),
177181
map(
178182
rule! { DISABLE_VARIANT_CHECK ~ "=" ~ #literal_bool },
179183
|(_, _, disable_variant_check)| {

src/query/ast/src/parser/stage.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,5 +262,9 @@ pub fn select_stage_option(i: Input) -> IResult<SelectStageOption> {
262262
rule! { CONNECTION ~ ^"=>" ~ ^#connection_options },
263263
|(_, _, file_format)| SelectStageOption::Connection(file_format),
264264
),
265+
map(
266+
rule! { CASE_SENSITIVE ~ ^"=>" ~ ^#literal_bool },
267+
|(_, _, case_sensitive)| SelectStageOption::CaseSensitive(case_sensitive),
268+
),
265269
))(i)
266270
}

src/query/ast/src/parser/token.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,8 @@ pub enum TokenKind {
423423
CALL,
424424
#[token("CASE", ignore(ascii_case))]
425425
CASE,
426+
#[token("CASE_SENSITIVE", ignore(ascii_case))]
427+
CASE_SENSITIVE,
426428
#[token("CAST", ignore(ascii_case))]
427429
CAST,
428430
#[token("CATALOG", ignore(ascii_case))]
@@ -455,6 +457,8 @@ pub enum TokenKind {
455457
CHAR,
456458
#[token("COLUMN", ignore(ascii_case))]
457459
COLUMN,
460+
#[token("COLUMN_MATCH_MODE", ignore(ascii_case))]
461+
COLUMN_MATCH_MODE,
458462
#[token("COLUMNS", ignore(ascii_case))]
459463
COLUMNS,
460464
#[token("CHARACTER", ignore(ascii_case))]

0 commit comments

Comments
 (0)