@@ -18,6 +18,8 @@ use std::fmt::Display;
18
18
use std:: fmt:: Formatter ;
19
19
use std:: str:: FromStr ;
20
20
21
+ use databend_common_ast:: ast:: FileFormatOptions ;
22
+ use databend_common_ast:: ast:: FileFormatValue ;
21
23
use databend_common_exception:: ErrorCode ;
22
24
use databend_common_exception:: Result ;
23
25
use databend_common_io:: constants:: NULL_BYTES_ESCAPE ;
@@ -46,63 +48,6 @@ const NULL_IF: &str = "null_if";
46
48
const OPT_EMPTY_FIELD_AS : & str = "empty_field_as" ;
47
49
const OPT_BINARY_FORMAT : & str = "binary_format" ;
48
50
49
- #[ derive( Clone , Debug , PartialEq , Eq , Serialize , Deserialize ) ]
50
- pub struct FileFormatOptionsAst {
51
- pub options : BTreeMap < String , String > ,
52
- }
53
-
54
- impl FileFormatOptionsAst {
55
- pub fn new ( options : BTreeMap < String , String > ) -> Self {
56
- FileFormatOptionsAst { options }
57
- }
58
-
59
- fn take_string ( & mut self , key : & str , default : String ) -> String {
60
- self . options . remove ( key) . unwrap_or ( default)
61
- }
62
-
63
- fn take_type ( & mut self ) -> Result < StageFileFormatType > {
64
- match ( self . options . remove ( "type" ) , self . options . remove ( "format" ) ) {
65
- ( Some ( t) , None ) | ( None , Some ( t) ) => {
66
- StageFileFormatType :: from_str ( & t) . map_err ( ErrorCode :: IllegalFileFormat )
67
- }
68
- ( Some ( _) , Some ( _) ) => Err ( ErrorCode :: IllegalFileFormat (
69
- "Invalid FILE_FORMAT options: both TYPE and FORMAT option are present. \
70
- Please only use the TYPE to specify the file format type. The FORMAT option is deprecated.",
71
- ) ) ,
72
- ( None , None ) => Err ( ErrorCode :: IllegalFileFormat (
73
- "Invalid FILE_FORMAT options: FILE_FORMAT must include at least one of the TYPE or NAME option. \
74
- Currently, neither is specified.",
75
- ) ) ,
76
- }
77
- }
78
-
79
- fn take_compression ( & mut self ) -> Result < StageFileCompression > {
80
- match self . options . remove ( "compression" ) {
81
- Some ( c) => StageFileCompression :: from_str ( & c) . map_err ( ErrorCode :: IllegalFileFormat ) ,
82
- None => Ok ( StageFileCompression :: None ) ,
83
- }
84
- }
85
-
86
- fn take_u64 ( & mut self , key : & str , default : u64 ) -> Result < u64 > {
87
- match self . options . remove ( key) {
88
- Some ( v) => Ok ( u64:: from_str ( & v) ?) ,
89
- None => Ok ( default) ,
90
- }
91
- }
92
-
93
- fn take_bool ( & mut self , key : & str , default : bool ) -> Result < bool > {
94
- match self . options . remove ( key) {
95
- Some ( v) => Ok ( bool:: from_str ( & v. to_lowercase ( ) ) . map_err ( |_| {
96
- ErrorCode :: IllegalFileFormat ( format ! (
97
- "Invalid boolean value {} for option {}." ,
98
- v, key
99
- ) )
100
- } ) ?) ,
101
- None => Ok ( default) ,
102
- }
103
- }
104
- }
105
-
106
51
/// File format parameters after checking and parsing.
107
52
#[ derive( Clone , Debug , PartialEq , Eq , Serialize , Deserialize ) ]
108
53
#[ serde( tag = "type" ) ]
@@ -159,28 +104,27 @@ impl FileFormatParams {
159
104
}
160
105
}
161
106
162
- pub fn try_from_ast ( ast : FileFormatOptionsAst , old : bool ) -> Result < Self > {
163
- let mut ast = ast;
164
- let typ = ast. take_type ( ) ?;
107
+ pub fn try_from_reader ( mut reader : FileFormatOptionsReader , old : bool ) -> Result < Self > {
108
+ let typ = reader. take_type ( ) ?;
165
109
let params = match typ {
166
110
StageFileFormatType :: Xml => {
167
111
let default = XmlFileFormatParams :: default ( ) ;
168
- let row_tag = ast . take_string ( OPT_ROW_TAG , default. row_tag ) ;
169
- let compression = ast . take_compression ( ) ?;
112
+ let row_tag = reader . take_string ( OPT_ROW_TAG , default. row_tag ) ;
113
+ let compression = reader . take_compression ( ) ?;
170
114
FileFormatParams :: Xml ( XmlFileFormatParams {
171
115
compression,
172
116
row_tag,
173
117
} )
174
118
}
175
119
StageFileFormatType :: Json => {
176
- let compression = ast . take_compression ( ) ?;
120
+ let compression = reader . take_compression ( ) ?;
177
121
FileFormatParams :: Json ( JsonFileFormatParams { compression } )
178
122
}
179
123
StageFileFormatType :: NdJson => {
180
- let compression = ast . take_compression ( ) ?;
181
- let missing_field_as = ast . options . remove ( MISSING_FIELD_AS ) ;
182
- let null_field_as = ast . options . remove ( NULL_FIELD_AS ) ;
183
- let null_if = ast . options . remove ( NULL_IF ) ;
124
+ let compression = reader . take_compression ( ) ?;
125
+ let missing_field_as = reader . options . remove ( MISSING_FIELD_AS ) ;
126
+ let null_field_as = reader . options . remove ( NULL_FIELD_AS ) ;
127
+ let null_if = reader . options . remove ( NULL_IF ) ;
184
128
let null_if = match null_if {
185
129
None => {
186
130
vec ! [ ]
@@ -201,39 +145,40 @@ impl FileFormatParams {
201
145
) ?)
202
146
}
203
147
StageFileFormatType :: Parquet => {
204
- let missing_field_as = ast . options . remove ( MISSING_FIELD_AS ) ;
148
+ let missing_field_as = reader . options . remove ( MISSING_FIELD_AS ) ;
205
149
FileFormatParams :: Parquet ( ParquetFileFormatParams :: try_create (
206
150
missing_field_as. as_deref ( ) ,
207
151
) ?)
208
152
}
209
153
StageFileFormatType :: Csv => {
210
154
let default = CsvFileFormatParams :: default ( ) ;
211
- let compression = ast. take_compression ( ) ?;
212
- let headers = ast. take_u64 ( OPT_SKIP_HEADER , default. headers ) ?;
213
- let field_delimiter = ast. take_string ( OPT_FIELD_DELIMITER , default. field_delimiter ) ;
155
+ let compression = reader. take_compression ( ) ?;
156
+ let headers = reader. take_u64 ( OPT_SKIP_HEADER , default. headers ) ?;
157
+ let field_delimiter =
158
+ reader. take_string ( OPT_FIELD_DELIMITER , default. field_delimiter ) ;
214
159
let record_delimiter =
215
- ast . take_string ( OPT_RECORDE_DELIMITER , default. record_delimiter ) ;
216
- let nan_display = ast . take_string ( OPT_NAN_DISPLAY , default. nan_display ) ;
217
- let escape = ast . take_string ( OPT_ESCAPE , default. escape ) ;
218
- let quote = ast . take_string ( OPT_QUOTE , default. quote ) ;
219
- let null_display = ast . take_string ( OPT_NULL_DISPLAY , default. null_display ) ;
220
- let empty_field_as = ast
160
+ reader . take_string ( OPT_RECORDE_DELIMITER , default. record_delimiter ) ;
161
+ let nan_display = reader . take_string ( OPT_NAN_DISPLAY , default. nan_display ) ;
162
+ let escape = reader . take_string ( OPT_ESCAPE , default. escape ) ;
163
+ let quote = reader . take_string ( OPT_QUOTE , default. quote ) ;
164
+ let null_display = reader . take_string ( OPT_NULL_DISPLAY , default. null_display ) ;
165
+ let empty_field_as = reader
221
166
. options
222
167
. remove ( OPT_EMPTY_FIELD_AS )
223
168
. map ( |s| EmptyFieldAs :: from_str ( & s) )
224
169
. transpose ( ) ?
225
170
. unwrap_or_default ( ) ;
226
- let binary_format = ast
171
+ let binary_format = reader
227
172
. options
228
173
. remove ( OPT_BINARY_FORMAT )
229
174
. map ( |s| BinaryFormat :: from_str ( & s) )
230
175
. transpose ( ) ?
231
176
. unwrap_or_default ( ) ;
232
- let error_on_column_count_mismatch = ast . take_bool (
177
+ let error_on_column_count_mismatch = reader . take_bool (
233
178
OPT_ERROR_ON_COLUMN_COUNT_MISMATCH ,
234
179
default. error_on_column_count_mismatch ,
235
180
) ?;
236
- let output_header = ast . take_bool ( OPT_OUTPUT_HEADER , default. output_header ) ?;
181
+ let output_header = reader . take_bool ( OPT_OUTPUT_HEADER , default. output_header ) ?;
237
182
FileFormatParams :: Csv ( CsvFileFormatParams {
238
183
compression,
239
184
headers,
@@ -252,14 +197,15 @@ impl FileFormatParams {
252
197
}
253
198
StageFileFormatType :: Tsv => {
254
199
let default = TsvFileFormatParams :: default ( ) ;
255
- let compression = ast. take_compression ( ) ?;
256
- let headers = ast. take_u64 ( OPT_SKIP_HEADER , default. headers ) ?;
257
- let field_delimiter = ast. take_string ( OPT_FIELD_DELIMITER , default. field_delimiter ) ;
200
+ let compression = reader. take_compression ( ) ?;
201
+ let headers = reader. take_u64 ( OPT_SKIP_HEADER , default. headers ) ?;
202
+ let field_delimiter =
203
+ reader. take_string ( OPT_FIELD_DELIMITER , default. field_delimiter ) ;
258
204
let record_delimiter =
259
- ast . take_string ( OPT_RECORDE_DELIMITER , default. record_delimiter ) ;
260
- let nan_display = ast . take_string ( OPT_NAN_DISPLAY , default. nan_display ) ;
261
- let escape = ast . take_string ( OPT_ESCAPE , default. escape ) ;
262
- let quote = ast . take_string ( OPT_QUOTE , default. quote ) ;
205
+ reader . take_string ( OPT_RECORDE_DELIMITER , default. record_delimiter ) ;
206
+ let nan_display = reader . take_string ( OPT_NAN_DISPLAY , default. nan_display ) ;
207
+ let escape = reader . take_string ( OPT_ESCAPE , default. escape ) ;
208
+ let quote = reader . take_string ( OPT_QUOTE , default. quote ) ;
263
209
FileFormatParams :: Tsv ( TsvFileFormatParams {
264
210
compression,
265
211
headers,
@@ -285,12 +231,12 @@ impl FileFormatParams {
285
231
params. get_type( ) . to_string( )
286
232
) )
287
233
} ) ?;
288
- if ast . options . is_empty ( ) {
234
+ if reader . options . is_empty ( ) {
289
235
Ok ( params)
290
236
} else {
291
237
Err ( ErrorCode :: IllegalFileFormat ( format ! (
292
238
"Unsupported options for {:?}: {:?}" ,
293
- typ, ast . options
239
+ typ, reader . options
294
240
) ) )
295
241
}
296
242
}
@@ -342,11 +288,79 @@ impl Default for FileFormatParams {
342
288
}
343
289
}
344
290
345
- impl TryFrom < FileFormatOptionsAst > for FileFormatParams {
346
- type Error = ErrorCode ;
291
+ pub struct FileFormatOptionsReader {
292
+ pub options : BTreeMap < String , String > ,
293
+ }
294
+
295
+ impl FileFormatOptionsReader {
296
+ pub fn from_ast ( options : & FileFormatOptions ) -> Self {
297
+ let options = options
298
+ . options
299
+ . iter ( )
300
+ . map ( |( k, v) | {
301
+ let v = match v {
302
+ FileFormatValue :: Keyword ( v) => v. clone ( ) ,
303
+ FileFormatValue :: Bool ( v) => v. to_string ( ) ,
304
+ FileFormatValue :: U64 ( v) => v. to_string ( ) ,
305
+ FileFormatValue :: String ( v) => v. clone ( ) ,
306
+ FileFormatValue :: StringList ( v) => serde_json:: to_string ( & v) . unwrap ( ) ,
307
+ } ;
308
+
309
+ ( k. clone ( ) , v)
310
+ } )
311
+ . collect ( ) ;
312
+
313
+ FileFormatOptionsReader { options }
314
+ }
347
315
348
- fn try_from ( ast : FileFormatOptionsAst ) -> Result < Self > {
349
- FileFormatParams :: try_from_ast ( ast, false )
316
+ pub fn from_map ( options : BTreeMap < String , String > ) -> Self {
317
+ FileFormatOptionsReader { options }
318
+ }
319
+
320
+ fn take_string ( & mut self , key : & str , default : String ) -> String {
321
+ self . options . remove ( key) . unwrap_or ( default)
322
+ }
323
+
324
+ fn take_type ( & mut self ) -> Result < StageFileFormatType > {
325
+ match ( self . options . remove ( "type" ) , self . options . remove ( "format" ) ) {
326
+ ( Some ( t) , None ) | ( None , Some ( t) ) => {
327
+ StageFileFormatType :: from_str ( & t) . map_err ( ErrorCode :: IllegalFileFormat )
328
+ }
329
+ ( Some ( _) , Some ( _) ) => Err ( ErrorCode :: IllegalFileFormat (
330
+ "Invalid FILE_FORMAT options: both TYPE and FORMAT option are present. \
331
+ Please only use the TYPE to specify the file format type. The FORMAT option is deprecated.",
332
+ ) ) ,
333
+ ( None , None ) => Err ( ErrorCode :: IllegalFileFormat (
334
+ "Invalid FILE_FORMAT options: FILE_FORMAT must include at least one of the TYPE or NAME option. \
335
+ Currently, neither is specified.",
336
+ ) ) ,
337
+ }
338
+ }
339
+
340
+ fn take_compression ( & mut self ) -> Result < StageFileCompression > {
341
+ match self . options . remove ( "compression" ) {
342
+ Some ( c) => StageFileCompression :: from_str ( & c) . map_err ( ErrorCode :: IllegalFileFormat ) ,
343
+ None => Ok ( StageFileCompression :: None ) ,
344
+ }
345
+ }
346
+
347
+ fn take_u64 ( & mut self , key : & str , default : u64 ) -> Result < u64 > {
348
+ match self . options . remove ( key) {
349
+ Some ( v) => Ok ( u64:: from_str ( & v) ?) ,
350
+ None => Ok ( default) ,
351
+ }
352
+ }
353
+
354
+ fn take_bool ( & mut self , key : & str , default : bool ) -> Result < bool > {
355
+ match self . options . remove ( key) {
356
+ Some ( v) => Ok ( bool:: from_str ( & v. to_lowercase ( ) ) . map_err ( |_| {
357
+ ErrorCode :: IllegalFileFormat ( format ! (
358
+ "Invalid boolean value {} for option {}." ,
359
+ v, key
360
+ ) )
361
+ } ) ?) ,
362
+ None => Ok ( default) ,
363
+ }
350
364
}
351
365
}
352
366
0 commit comments