Skip to content

Commit d6eca28

Browse files
authored
Merge pull request #9247 from youngsofun/stage
feat(stage): support select from URI.
2 parents fc8987a + 0af6e6f commit d6eca28

File tree

18 files changed

+134
-62
lines changed

18 files changed

+134
-62
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/storage/src/lib.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ pub use operator::init_operator;
5555
pub use operator::CacheOperator;
5656
pub use operator::DataOperator;
5757

58-
mod location;
59-
pub use location::parse_uri_location;
60-
pub use location::UriLocation;
61-
6258
mod metrics;
6359
pub use metrics::StorageMetrics;
6460
pub use metrics::StorageMetricsLayer;

src/query/ast/src/ast/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use std::fmt::Formatter;
1818
use crate::ast::write_comma_separated_list;
1919
use crate::ast::write_period_separated_list;
2020
use crate::ast::Expr;
21+
use crate::ast::FileLocation;
2122
use crate::ast::Identifier;
22-
use crate::ast::StageLocation;
2323
use crate::parser::token::Token;
2424

2525
/// Root node of a query tree
@@ -178,7 +178,7 @@ pub enum TableReference<'a> {
178178
},
179179
Stage {
180180
span: &'a [Token<'a>],
181-
location: StageLocation,
181+
location: FileLocation,
182182
files: Vec<String>,
183183
alias: Option<TableAlias<'a>>,
184184
},

src/query/ast/src/ast/statements/copy.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,25 @@ impl Display for StageLocation {
213213
}
214214
}
215215

216+
#[derive(Debug, Clone, PartialEq, Eq)]
217+
pub enum FileLocation {
218+
Stage(StageLocation),
219+
Uri(UriLocation),
220+
}
221+
222+
impl Display for FileLocation {
223+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224+
match self {
225+
FileLocation::Uri(loc) => {
226+
write!(f, "{}", loc)
227+
}
228+
FileLocation::Stage(loc) => {
229+
write!(f, "{}", loc)
230+
}
231+
}
232+
}
233+
}
234+
216235
pub enum CopyOption {
217236
Files(Vec<String>),
218237
Pattern(String),

src/query/ast/src/parser/query.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use pratt::PrattParser;
2222
use pratt::Precedence;
2323

2424
use super::stage::stage_location;
25+
use super::stage::uri_location;
2526
use crate::ast::*;
2627
use crate::input::Input;
2728
use crate::input::WithSpan;
@@ -259,7 +260,7 @@ pub enum TableReferenceElement<'a> {
259260
JoinCondition(JoinCondition<'a>),
260261
Group(TableReference<'a>),
261262
Stage {
262-
location: StageLocation,
263+
location: FileLocation,
263264
files: Vec<String>,
264265
alias: Option<TableAlias<'a>>,
265266
},
@@ -325,9 +326,27 @@ pub fn table_reference_element(i: Input) -> IResult<WithSpan<TableReferenceEleme
325326
|(_, table_ref, _)| TableReferenceElement::Group(table_ref),
326327
);
327328

329+
let stage_location = |i| {
330+
map_res(
331+
rule! {
332+
#stage_location
333+
},
334+
|v| Ok(FileLocation::Stage(v)),
335+
)(i)
336+
};
337+
338+
let uri_location = |i| {
339+
map_res(
340+
rule! {
341+
#uri_location
342+
},
343+
|v| Ok(FileLocation::Uri(v)),
344+
)(i)
345+
};
346+
328347
let aliased_stage = map(
329348
rule! {
330-
#stage_location ~ ( FILES ~ "=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")")? ~ #table_alias?
349+
(#stage_location | #uri_location) ~ ( FILES ~ "=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")")? ~ #table_alias?
331350
},
332351
|(location, files, alias)| TableReferenceElement::Stage {
333352
location,

src/query/sql/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ common-users = { path = "../users" }
4848

4949
# Crates.io dependencies
5050
ahash = { version = "0.8.2", features = ["no-rng"] }
51+
anyhow = { workspace = true }
5152
async-channel = "1.7.1"
5253
async-recursion = "1.0.0"
5354
async-stream = "0.3.3"
@@ -61,6 +62,7 @@ chrono-tz = { workspace = true }
6162
dashmap = "5.4"
6263
futures = "0.3.24"
6364
futures-util = "0.3.24"
65+
globiter = "0.1"
6466
headers = "0.3.8"
6567
http = "0.2.8"
6668
itertools = "0.10.5"
@@ -73,6 +75,7 @@ opendal = { version = "0.22", features = ["layers-tracing", "layers-metrics", "c
7375
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "5e37788" }
7476
openssl = { version = "0.10.41", features = ["vendored"] }
7577
parking_lot = "0.12.1"
78+
percent-encoding = "2"
7679
petgraph = "0.6.2"
7780
poem = { version = "1", features = ["rustls", "multipart", "compression"] }
7881
primitive-types = "0.12.0"

src/query/sql/src/planner/binder/copy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use common_ast::ast::CopyStmt;
2020
use common_ast::ast::CopyUnit;
2121
use common_ast::ast::Query;
2222
use common_ast::ast::Statement;
23+
use common_ast::ast::UriLocation;
2324
use common_ast::parser::parse_sql;
2425
use common_ast::parser::tokenize_sql;
2526
use common_ast::Backtrace;
@@ -35,11 +36,10 @@ use common_exception::Result;
3536
use common_meta_types::FileFormatOptions;
3637
use common_meta_types::StageFileFormatType;
3738
use common_meta_types::UserStageInfo;
38-
use common_storage::parse_uri_location;
39-
use common_storage::UriLocation;
4039
use common_users::UserApiProvider;
4140
use tracing::debug;
4241

42+
use crate::binder::location::parse_uri_location;
4343
use crate::binder::Binder;
4444
use crate::normalize_identifier;
4545
use crate::plans::CopyPlanV2;

src/query/sql/src/planner/binder/ddl/stage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
use std::str::FromStr;
1616

1717
use common_ast::ast::CreateStageStmt;
18+
use common_ast::ast::UriLocation;
1819
use common_exception::ErrorCode;
1920
use common_exception::Result;
2021
use common_meta_types::OnErrorMode;
2122
use common_meta_types::UserStageInfo;
22-
use common_storage::parse_uri_location;
23-
use common_storage::UriLocation;
2423

2524
use super::super::copy::parse_copy_file_format_options;
2625
use super::super::copy::parse_stage_location;
26+
use crate::binder::location::parse_uri_location;
2727
use crate::binder::Binder;
2828
use crate::plans::CreateStagePlan;
2929
use crate::plans::ListPlan;

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use common_ast::ast::Statement;
4040
use common_ast::ast::TableReference;
4141
use common_ast::ast::TruncateTableStmt;
4242
use common_ast::ast::UndropTableStmt;
43+
use common_ast::ast::UriLocation;
4344
use common_ast::parser::parse_sql;
4445
use common_ast::parser::tokenize_sql;
4546
use common_ast::walk_expr_mut;
@@ -54,15 +55,14 @@ use common_datavalues::TypeFactory;
5455
use common_datavalues::Vu8;
5556
use common_exception::ErrorCode;
5657
use common_exception::Result;
57-
use common_storage::parse_uri_location;
5858
use common_storage::DataOperator;
59-
use common_storage::UriLocation;
6059
use common_storages_table_meta::table::is_reserved_opt_key;
6160
use common_storages_table_meta::table::OPT_KEY_DATABASE_ID;
6261
use common_storages_view::view_table::QUERY;
6362
use common_storages_view::view_table::VIEW_ENGINE;
6463
use tracing::debug;
6564

65+
use crate::binder::location::parse_uri_location;
6666
use crate::binder::scalar::ScalarBinder;
6767
use crate::binder::Binder;
6868
use crate::binder::Visibility;

src/common/storage/src/location.rs renamed to src/query/sql/src/planner/binder/location.rs

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,27 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::BTreeMap;
1615
use std::io::Error;
1716
use std::io::ErrorKind;
1817
use std::io::Result;
1918

2019
use anyhow::anyhow;
20+
use common_ast::ast::UriLocation;
21+
use common_storage::StorageAzblobConfig;
22+
use common_storage::StorageFsConfig;
23+
use common_storage::StorageFtpConfig;
24+
use common_storage::StorageGcsConfig;
25+
use common_storage::StorageHttpConfig;
26+
use common_storage::StorageIpfsConfig;
27+
use common_storage::StorageOssConfig;
28+
use common_storage::StorageParams;
29+
use common_storage::StorageS3Config;
30+
use common_storage::STORAGE_GCS_DEFAULT_ENDPOINT;
31+
use common_storage::STORAGE_IPFS_DEFAULT_ENDPOINT;
32+
use common_storage::STORAGE_S3_DEFAULT_ENDPOINT;
2133
use opendal::Scheme;
2234
use percent_encoding::percent_decode_str;
2335

24-
use crate::config::StorageHttpConfig;
25-
use crate::config::StorageIpfsConfig;
26-
use crate::config::StorageOssConfig;
27-
use crate::config::STORAGE_IPFS_DEFAULT_ENDPOINT;
28-
use crate::config::STORAGE_S3_DEFAULT_ENDPOINT;
29-
use crate::StorageAzblobConfig;
30-
use crate::StorageFsConfig;
31-
use crate::StorageParams;
32-
use crate::StorageS3Config;
33-
use crate::STORAGE_GCS_DEFAULT_ENDPOINT;
34-
35-
#[derive(Clone, Debug)]
36-
pub struct UriLocation {
37-
pub protocol: String,
38-
pub name: String,
39-
pub path: String,
40-
/// connection should carry all connection related options for storage.
41-
pub connection: BTreeMap<String, String>,
42-
}
43-
4436
/// secure_omission will fix omitted endpoint url schemes into 'https://'
4537
#[inline]
4638
fn secure_omission(endpoint: String) -> String {
@@ -85,7 +77,7 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
8577
root: root.to_string(),
8678
})
8779
}
88-
Scheme::Ftp => StorageParams::Ftp(crate::StorageFtpConfig {
80+
Scheme::Ftp => StorageParams::Ftp(StorageFtpConfig {
8981
endpoint: if !l.protocol.is_empty() {
9082
format!("{}://{}", l.protocol, l.name)
9183
} else {
@@ -102,7 +94,7 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
10294
.get("endpoint_url")
10395
.cloned()
10496
.unwrap_or_else(|| STORAGE_GCS_DEFAULT_ENDPOINT.to_string());
105-
StorageParams::Gcs(crate::StorageGcsConfig {
97+
StorageParams::Gcs(StorageGcsConfig {
10698
endpoint_url: secure_omission(endpoint),
10799
bucket: l.name.clone(),
108100
root: l.path.clone(),

0 commit comments

Comments
 (0)