12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ use std:: collections:: BTreeMap ;
15
16
use std:: collections:: VecDeque ;
16
17
use std:: io:: BufRead ;
17
18
use std:: io:: Cursor ;
18
19
use std:: ops:: Not ;
20
+ use std:: str:: FromStr ;
19
21
use std:: sync:: Arc ;
20
22
use std:: time:: Instant ;
21
23
@@ -28,6 +30,7 @@ use common_base::runtime::GlobalIORuntime;
28
30
use common_catalog:: plan:: StageFileStatus ;
29
31
use common_catalog:: plan:: StageTableInfo ;
30
32
use common_catalog:: table:: AppendMode ;
33
+ use common_catalog:: table_context:: StageAttachment ;
31
34
use common_datablocks:: DataBlock ;
32
35
use common_datavalues:: prelude:: * ;
33
36
use common_exception:: ErrorCode ;
@@ -36,6 +39,9 @@ use common_formats::parse_timezone;
36
39
use common_formats:: FastFieldDecoderValues ;
37
40
use common_io:: cursor_ext:: ReadBytesExt ;
38
41
use common_io:: cursor_ext:: ReadCheckPointExt ;
42
+ use common_meta_types:: OnErrorMode ;
43
+ use common_meta_types:: StageFileCompression ;
44
+ use common_meta_types:: StageFileFormatType ;
39
45
use common_meta_types:: UserStageInfo ;
40
46
use common_pipeline_core:: Pipeline ;
41
47
use common_pipeline_sources:: processors:: sources:: AsyncSource ;
@@ -112,11 +118,74 @@ impl InsertInterpreterV2 {
112
118
Ok ( cast_needed)
113
119
}
114
120
115
- // TODO:(everpcpc)
121
+ fn apply_stage_options (
122
+ & self ,
123
+ stage : & mut UserStageInfo ,
124
+ params : & BTreeMap < String , String > ,
125
+ ) -> Result < ( ) > {
126
+ for ( k, v) in params. iter ( ) {
127
+ match k. as_str ( ) {
128
+ // file format options
129
+ "format" => {
130
+ let format = StageFileFormatType :: from_str ( v) ?;
131
+ stage. file_format_options . format = format;
132
+ }
133
+ "skip_header" => {
134
+ let skip_header = u64:: from_str ( v) ?;
135
+ stage. file_format_options . skip_header = skip_header;
136
+ }
137
+ "field_delimiter" => stage. file_format_options . field_delimiter = v. clone ( ) ,
138
+ "record_delimiter" => stage. file_format_options . record_delimiter = v. clone ( ) ,
139
+ "nan_display" => stage. file_format_options . nan_display = v. clone ( ) ,
140
+ "escape" => stage. file_format_options . escape = v. clone ( ) ,
141
+ "compression" => {
142
+ let compression = StageFileCompression :: from_str ( v) ?;
143
+ stage. file_format_options . compression = compression;
144
+ }
145
+ "row_tag" => stage. file_format_options . row_tag = v. clone ( ) ,
146
+ "quote" => stage. file_format_options . quote = v. clone ( ) ,
147
+
148
+ // copy options
149
+ "on_error" => {
150
+ let on_error = OnErrorMode :: from_str ( v) ?;
151
+ stage. copy_options . on_error = on_error;
152
+ }
153
+ "size_limit" => {
154
+ let size_limit = usize:: from_str ( v) ?;
155
+ stage. copy_options . size_limit = size_limit;
156
+ }
157
+ "split_size" => {
158
+ let split_size = usize:: from_str ( v) ?;
159
+ stage. copy_options . split_size = split_size;
160
+ }
161
+ "purge" => {
162
+ let purge = bool:: from_str ( v) . map_err ( |_| {
163
+ ErrorCode :: StrParseError ( format ! ( "Cannot parse purge: {} as bool" , v) )
164
+ } ) ?;
165
+ stage. copy_options . purge = purge;
166
+ }
167
+ "single" => {
168
+ let single = bool:: from_str ( v) . map_err ( |_| {
169
+ ErrorCode :: StrParseError ( format ! ( "Cannot parse single: {} as bool" , v) )
170
+ } ) ?;
171
+ stage. copy_options . single = single;
172
+ }
173
+ "max_file_size" => {
174
+ let max_file_size = usize:: from_str ( v) ?;
175
+ stage. copy_options . max_file_size = max_file_size;
176
+ }
177
+
178
+ _ => { }
179
+ }
180
+ }
181
+
182
+ Ok ( ( ) )
183
+ }
184
+
116
185
async fn build_insert_from_stage_pipeline (
117
186
& self ,
118
187
table : Arc < dyn Table > ,
119
- stage_location : & str ,
188
+ attachment : Arc < StageAttachment > ,
120
189
pipeline : & mut Pipeline ,
121
190
) -> Result < ( ) > {
122
191
let start = Instant :: now ( ) ;
@@ -127,7 +196,8 @@ impl InsertInterpreterV2 {
127
196
let catalog_name = self . plan . catalog . clone ( ) ;
128
197
let overwrite = self . plan . overwrite ;
129
198
130
- let ( stage_info, path) = parse_stage_location ( & self . ctx , stage_location) . await ?;
199
+ let ( mut stage_info, path) = parse_stage_location ( & self . ctx , & attachment. location ) . await ?;
200
+ self . apply_stage_options ( & mut stage_info, & attachment. params ) ?;
131
201
132
202
let mut stage_table_info = StageTableInfo {
133
203
schema : source_schema. clone ( ) ,
@@ -140,7 +210,7 @@ impl InsertInterpreterV2 {
140
210
141
211
let all_source_file_infos = StageTable :: list_files ( & table_ctx, & stage_table_info) . await ?;
142
212
143
- // TODO: color_copied_files
213
+ // TODO:(everpcpc) color_copied_files
144
214
145
215
let mut need_copied_file_infos = vec ! [ ] ;
146
216
for file in & all_source_file_infos {
@@ -149,9 +219,8 @@ impl InsertInterpreterV2 {
149
219
}
150
220
}
151
221
152
- // DEBUG:
153
- tracing:: warn!(
154
- "insert: read all sideload files finished, all:{}, need copy:{}, elapsed:{}" ,
222
+ tracing:: info!(
223
+ "insert: read all stage attachment files finished, all:{}, need copy:{}, elapsed:{}" ,
155
224
all_source_file_infos. len( ) ,
156
225
need_copied_file_infos. len( ) ,
157
226
start. elapsed( ) . as_secs( )
@@ -199,19 +268,20 @@ impl InsertInterpreterV2 {
199
268
None => {
200
269
let append_entries = ctx. consume_precommit_blocks ( ) ;
201
270
// We must put the commit operation to global runtime, which will avoid the "dispatch dropped without returning error" in tower
202
- return GlobalIORuntime :: instance ( ) . block_on ( async move {
203
- // DEBUG:
204
- tracing:: warn!(
271
+ GlobalIORuntime :: instance ( ) . block_on ( async move {
272
+ tracing:: info!(
205
273
"insert: try to commit append entries:{}, elapsed:{}" ,
206
274
append_entries. len( ) ,
207
275
start. elapsed( ) . as_secs( )
208
276
) ;
209
277
table
210
278
. commit_insertion ( ctx, append_entries, overwrite)
211
279
. await ?;
280
+
281
+ // TODO:(everpcpc) purge copied files
282
+
212
283
Ok ( ( ) )
213
- // TODO: purge copied files
214
- } ) ;
284
+ } )
215
285
}
216
286
}
217
287
} ) ;
@@ -273,25 +343,15 @@ impl Interpreter for InsertInterpreterV2 {
273
343
. format
274
344
. exec_stream ( input_context. clone ( ) , & mut build_res. main_pipeline ) ?;
275
345
}
276
- InsertInputSource :: Sideload ( opts) => {
346
+ InsertInputSource :: Stage ( opts) => {
277
347
// DEBUG:
278
- tracing:: warn!( "==> sideload insert: {:?}" , opts) ;
279
-
280
- match & opts. stage {
281
- None => {
282
- return Err ( ErrorCode :: BadDataValueType (
283
- "No stage location provided" . to_string ( ) ,
284
- ) ) ;
285
- }
286
- Some ( stage_location) => {
287
- self . build_insert_from_stage_pipeline (
288
- table. clone ( ) ,
289
- stage_location,
290
- & mut build_res. main_pipeline ,
291
- )
292
- . await ?;
293
- }
294
- }
348
+ tracing:: warn!( "==> insert from stage: {:?}" , opts) ;
349
+ self . build_insert_from_stage_pipeline (
350
+ table. clone ( ) ,
351
+ opts. clone ( ) ,
352
+ & mut build_res. main_pipeline ,
353
+ )
354
+ . await ?;
295
355
return Ok ( build_res) ;
296
356
}
297
357
InsertInputSource :: SelectPlan ( plan) => {
@@ -741,7 +801,8 @@ async fn exprs_to_datavalue<'a>(
741
801
Ok ( datavalues)
742
802
}
743
803
744
- // FIXME: tmp copy from src/query/sql/src/planner/binder/copy.rs
804
+ // TODO:(everpcpc) tmp copy from src/query/sql/src/planner/binder/copy.rs
805
+ // move to user stage module
745
806
async fn parse_stage_location (
746
807
ctx : & Arc < QueryContext > ,
747
808
location : & str ,
0 commit comments