12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ use std:: collections:: BTreeMap ;
15
16
use std:: str:: FromStr ;
17
+ use std:: sync:: Arc ;
16
18
17
19
use common_ast:: ast:: CopyStmt ;
18
20
use common_ast:: ast:: CopyUnit ;
@@ -22,22 +24,26 @@ use common_ast::parser::parse_sql;
22
24
use common_ast:: parser:: tokenize_sql;
23
25
use common_ast:: Backtrace ;
24
26
use common_ast:: Dialect ;
27
+ use common_catalog:: table_context:: TableContext ;
25
28
use common_exception:: ErrorCode ;
26
29
use common_exception:: Result ;
30
+ use common_io:: prelude:: parse_escape_string;
27
31
use common_legacy_planners:: ReadDataSourcePlan ;
28
32
use common_legacy_planners:: SourceInfo ;
29
33
use common_legacy_planners:: StageTableInfo ;
34
+ use common_meta_types:: FileFormatOptions ;
35
+ use common_meta_types:: StageFileFormatType ;
30
36
use common_meta_types:: UserStageInfo ;
31
37
use common_storage:: parse_uri_location;
32
38
use common_storage:: UriLocation ;
39
+ use common_users:: UserApiProvider ;
40
+ use tracing:: debug;
33
41
34
42
use crate :: sql:: binder:: Binder ;
35
43
use crate :: sql:: normalize_identifier;
36
44
use crate :: sql:: plans:: CopyPlanV2 ;
37
45
use crate :: sql:: plans:: Plan ;
38
46
use crate :: sql:: plans:: ValidationMode ;
39
- use crate :: sql:: statements:: parse_copy_file_format_options;
40
- use crate :: sql:: statements:: parse_stage_location_v2;
41
47
use crate :: sql:: BindContext ;
42
48
43
49
impl < ' a > Binder {
@@ -500,3 +506,123 @@ impl<'a> Binder {
500
506
Ok ( ( ) )
501
507
}
502
508
}
509
+
510
+ /// Named stage(start with `@`):
511
+ ///
512
+ /// ```sql
513
+ /// copy into mytable from @my_ext_stage
514
+ /// file_format = (type = csv);
515
+ /// ```
516
+ ///
517
+ /// Returns user's stage info and relative path towards the stage's root.
518
+ ///
519
+ /// If input location is empty we will convert it to `/` means the root of stage
520
+ ///
521
+ /// - @mystage => (mystage, "/")
522
+ ///
523
+ /// If input location is endswith `/`, it's a folder.
524
+ ///
525
+ /// - @mystage/ => (mystage, "/")
526
+ ///
527
+ /// Otherwise, it's a file
528
+ ///
529
+ /// - @mystage/abc => (mystage, "abc")
530
+ ///
531
+ /// For internal stage, we will also add prefix `/stage/<stage>/`
532
+ ///
533
+ /// - @internal/abc => (internal, "/stage/internal/abc")
534
+ pub async fn parse_stage_location (
535
+ ctx : & Arc < dyn TableContext > ,
536
+ location : & str ,
537
+ ) -> Result < ( UserStageInfo , String ) > {
538
+ let s: Vec < & str > = location. split ( '@' ) . collect ( ) ;
539
+ // @my_ext_stage/abc/
540
+ let names: Vec < & str > = s[ 1 ] . splitn ( 2 , '/' ) . filter ( |v| !v. is_empty ( ) ) . collect ( ) ;
541
+ let stage = UserApiProvider :: instance ( )
542
+ . get_stage ( & ctx. get_tenant ( ) , names[ 0 ] )
543
+ . await ?;
544
+
545
+ let path = names. get ( 1 ) . unwrap_or ( & "" ) . trim_start_matches ( '/' ) ;
546
+
547
+ let prefix = stage. get_prefix ( ) ;
548
+ let relative_path = format ! ( "{prefix}{path}" ) ;
549
+
550
+ debug ! ( "parsed stage: {stage:?}, path: {relative_path}" ) ;
551
+ Ok ( ( stage, relative_path) )
552
+ }
553
+
554
+ /// parse_stage_location_v2 work similar to parse_stage_location.
555
+ ///
556
+ /// Difference is input location has already been parsed by parser.
557
+ pub async fn parse_stage_location_v2 (
558
+ ctx : & Arc < dyn TableContext > ,
559
+ name : & str ,
560
+ path : & str ,
561
+ ) -> Result < ( UserStageInfo , String ) > {
562
+ debug_assert ! ( path. starts_with( '/' ) , "path should starts with '/'" ) ;
563
+
564
+ let stage = UserApiProvider :: instance ( )
565
+ . get_stage ( & ctx. get_tenant ( ) , name)
566
+ . await ?;
567
+
568
+ let prefix = stage. get_prefix ( ) ;
569
+ debug_assert ! ( prefix. ends_with( '/' ) , "prefix should ends with '/'" ) ;
570
+
571
+ // prefix must be endswith `/`, so we should trim path here.
572
+ let relative_path = format ! ( "{prefix}{}" , path. trim_start_matches( '/' ) ) ;
573
+
574
+ debug ! ( "parsed stage: {stage:?}, path: {relative_path}" ) ;
575
+ Ok ( ( stage, relative_path) )
576
+ }
577
+
578
+ /// TODO(xuanwo): Move those logic into parser
579
+ pub fn parse_copy_file_format_options (
580
+ file_format_options : & BTreeMap < String , String > ,
581
+ ) -> Result < FileFormatOptions > {
582
+ // File format type.
583
+ let format = file_format_options
584
+ . get ( "type" )
585
+ . ok_or_else ( || ErrorCode :: SyntaxException ( "File format type must be specified" ) ) ?;
586
+ let file_format = StageFileFormatType :: from_str ( format)
587
+ . map_err ( |e| ErrorCode :: SyntaxException ( format ! ( "File format type error:{:?}" , e) ) ) ?;
588
+
589
+ // Skip header.
590
+ let skip_header = file_format_options
591
+ . get ( "skip_header" )
592
+ . unwrap_or ( & "0" . to_string ( ) )
593
+ . parse :: < u64 > ( ) ?;
594
+
595
+ // Field delimiter.
596
+ let field_delimiter = parse_escape_string (
597
+ file_format_options
598
+ . get ( "field_delimiter" )
599
+ . unwrap_or ( & "" . to_string ( ) )
600
+ . as_bytes ( ) ,
601
+ ) ;
602
+
603
+ // Record delimiter.
604
+ let record_delimiter = parse_escape_string (
605
+ file_format_options
606
+ . get ( "record_delimiter" )
607
+ . unwrap_or ( & "" . to_string ( ) )
608
+ . as_bytes ( ) ,
609
+ ) ;
610
+
611
+ // Compression delimiter.
612
+ let compression = parse_escape_string (
613
+ file_format_options
614
+ . get ( "compression" )
615
+ . unwrap_or ( & "none" . to_string ( ) )
616
+ . as_bytes ( ) ,
617
+ )
618
+ . parse ( )
619
+ . map_err ( ErrorCode :: UnknownCompressionType ) ?;
620
+
621
+ Ok ( FileFormatOptions {
622
+ format : file_format,
623
+ skip_header,
624
+ field_delimiter,
625
+ record_delimiter,
626
+ compression,
627
+ } )
628
+ }
0 commit comments