Skip to content

Commit d58b8bd

Browse files
committed
feat: add directory support
1. Directories are now supported. 2. https://ipfs.io is too slow, so replace it with https://ipfs.filebase.io. Signed-off-by: ClSlaid <cailue@bupt.edu.cn>
1 parent d24244b commit d58b8bd

File tree

7 files changed

+58
-26
lines changed

7 files changed

+58
-26
lines changed

src/common/storage/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ edition = "2021"
88

99
[features]
1010
storage-hdfs = ["opendal/services-hdfs"]
11-
storage-ipfs = ["opendal/services-ipfs"]
1211

1312
[dependencies]
1413
common-base = { path = "../base" }
@@ -23,6 +22,7 @@ opendal = { version = "0.17.0", features = [
2322
"layers-retry",
2423
"layers-tracing",
2524
"layers-metrics",
25+
"services-ipfs",
2626
"compress",
2727
] }
2828
percent-encoding = "2.1.0"

src/common/storage/src/config.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ pub enum StorageParams {
3939
#[cfg(feature = "storage-hdfs")]
4040
Hdfs(StorageHdfsConfig),
4141
Http(StorageHttpConfig),
42-
#[cfg(feature = "storage-ipfs")]
4342
Ipfs(StorageIpfsConfig),
4443
Memory,
4544
Obs(StorageObsConfig),
@@ -74,7 +73,6 @@ impl Display for StorageParams {
7473
StorageParams::Http(v) => {
7574
write!(f, "http://endpoint={},paths={:?}", v.endpoint_url, v.paths)
7675
}
77-
#[cfg(feature = "storage-ipfs")]
7876
StorageParams::Ipfs(c) => {
7977
write!(f, "ipfs://endpoint={},root={}", c.endpoint_url, c.root)
8078
}
@@ -106,7 +104,6 @@ impl StorageParams {
106104
#[cfg(feature = "storage-hdfs")]
107105
StorageParams::Hdfs(_) => false,
108106
StorageParams::Http(v) => v.endpoint_url.starts_with("https://"),
109-
#[cfg(feature = "storage-ipfs")]
110107
StorageParams::Ipfs(c) => c.endpoint_url.starts_with("https://"),
111108
StorageParams::Memory => false,
112109
StorageParams::Obs(v) => v.endpoint_url.starts_with("https://"),
@@ -262,8 +259,7 @@ pub struct StorageHttpConfig {
262259
pub paths: Vec<String>,
263260
}
264261

265-
#[cfg(feature = "storage-ipfs")]
266-
pub const STORAGE_IPFS_DEFAULT_ENDPOINT: &str = "https://ipfs.io";
262+
pub const STORAGE_IPFS_DEFAULT_ENDPOINT: &str = "https://ipfs.filebase.io";
267263
/// Config for IPFS storage backend
268264
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
269265
pub struct StorageIpfsConfig {

src/common/storage/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ pub use config::StorageObsConfig;
2828
pub use config::StorageParams;
2929
pub use config::StorageS3Config;
3030
pub use config::STORAGE_GCS_DEFAULT_ENDPOINT;
31-
#[cfg(feature = "storage-ipfs")]
3231
pub use config::STORAGE_IPFS_DEFAULT_ENDPOINT;
3332
pub use config::STORAGE_S3_DEFAULT_ENDPOINT;
3433

src/common/storage/src/location.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use opendal::Scheme;
2222
use percent_encoding::percent_decode_str;
2323

2424
use crate::config::StorageHttpConfig;
25+
use crate::config::StorageIpfsConfig;
26+
use crate::config::STORAGE_IPFS_DEFAULT_ENDPOINT;
2527
use crate::config::STORAGE_S3_DEFAULT_ENDPOINT;
2628
use crate::StorageAzblobConfig;
2729
use crate::StorageParams;
@@ -42,10 +44,10 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
4244
// Path endswith `/` means it's a directory, otherwise it's a file.
4345
// If the path is a directory, we will use this path as root.
4446
// If the path is a file, we will use `/` as root (which is the default value)
45-
let root = if l.path.ends_with('/') {
46-
l.path.as_str()
47+
let (mut root, mut path) = if l.path.ends_with('/') {
48+
(l.path.as_str(), "/")
4749
} else {
48-
"/"
50+
("/", l.path.as_str())
4951
};
5052

5153
let protocol = l.protocol.parse::<Scheme>()?;
@@ -95,13 +97,15 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
9597
.to_string(),
9698
root: root.to_string(),
9799
}),
98-
#[cfg(feature = "storage-ipfs")]
99100
Scheme::Ipfs => {
100-
use crate::config::StorageIpfsConfig;
101-
use crate::config::STORAGE_IPFS_DEFAULT_ENDPOINT;
101+
if l.name.ends_with('/') {
102+
root = l.name.as_str();
103+
} else {
104+
path = l.name.as_str();
105+
}
102106
StorageParams::Ipfs(StorageIpfsConfig {
103107
endpoint_url: STORAGE_IPFS_DEFAULT_ENDPOINT.to_string(),
104-
root: "/ipfs/".to_string(),
108+
root: "/ipfs/".to_string() + root,
105109
})
106110
}
107111
Scheme::S3 => StorageParams::S3(StorageS3Config {
@@ -167,11 +171,5 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
167171
}
168172
};
169173

170-
let path = if cfg!(feature = "storage-ipfs") {
171-
&l.name
172-
} else {
173-
"/"
174-
};
175-
176174
Ok((sp, path.to_string()))
177175
}

src/common/storage/src/operator.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
5353
#[cfg(feature = "storage-hdfs")]
5454
StorageParams::Hdfs(cfg) => init_hdfs_operator(cfg)?,
5555
StorageParams::Http(cfg) => init_http_operator(cfg)?,
56-
#[cfg(feature = "storage-ipfs")]
5756
StorageParams::Ipfs(cfg) => init_ipfs_operator(cfg)?,
5857
StorageParams::Memory => init_memory_operator()?,
5958
StorageParams::Obs(cfg) => init_obs_operator(cfg)?,
@@ -124,7 +123,6 @@ pub fn init_hdfs_operator(cfg: &super::StorageHdfsConfig) -> Result<Operator> {
124123
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
125124
}
126125

127-
#[cfg(feature = "storage-ipfs")]
128126
pub fn init_ipfs_operator(cfg: &super::StorageIpfsConfig) -> Result<Operator> {
129127
use opendal::services::ipfs;
130128

src/common/storage/tests/location.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,49 @@ use std::io::Result;
1818
use common_storage::parse_uri_location;
1919
use common_storage::StorageGcsConfig;
2020
use common_storage::StorageHttpConfig;
21+
use common_storage::StorageIpfsConfig;
2122
use common_storage::StorageParams;
2223
use common_storage::StorageS3Config;
2324
use common_storage::UriLocation;
2425
use common_storage::STORAGE_GCS_DEFAULT_ENDPOINT;
26+
use common_storage::STORAGE_IPFS_DEFAULT_ENDPOINT;
2527
use common_storage::STORAGE_S3_DEFAULT_ENDPOINT;
2628

2729
#[test]
2830
fn test_parse_uri_location() -> Result<()> {
2931
let cases = vec![
32+
(
33+
"ipfs-directory",
34+
UriLocation {
35+
protocol: "ipfs".to_string(),
36+
name: "too-simple/".to_string(),
37+
path: "/".to_string(),
38+
connection: BTreeMap::new(),
39+
},
40+
(
41+
StorageParams::Ipfs(StorageIpfsConfig {
42+
endpoint_url: STORAGE_IPFS_DEFAULT_ENDPOINT.to_string(),
43+
root: "/ipfs/too-simple/".to_string(),
44+
}),
45+
"/".to_string(),
46+
),
47+
),
48+
(
49+
"ipfs-file",
50+
UriLocation {
51+
protocol: "ipfs".to_string(),
52+
name: "too-naive".to_string(),
53+
path: "/".to_string(),
54+
connection: Default::default(),
55+
},
56+
(
57+
StorageParams::Ipfs(StorageIpfsConfig {
58+
endpoint_url: STORAGE_IPFS_DEFAULT_ENDPOINT.to_string(),
59+
root: "/ipfs//".to_string(),
60+
}),
61+
"too-naive".to_string(),
62+
),
63+
),
3064
(
3165
"s3_with_access_key_id",
3266
UriLocation {
@@ -176,12 +210,8 @@ fn test_parse_uri_location() -> Result<()> {
176210
Ok(())
177211
}
178212

179-
#[cfg(feature = "storage-ipfs")]
180213
#[test]
181214
fn test_parse_ipfs_uri_location() {
182-
use common_storage::StorageIpfsConfig;
183-
use common_storage::STORAGE_IPFS_DEFAULT_ENDPOINT;
184-
185215
let l = UriLocation {
186216
protocol: "ipfs".to_string(),
187217
name: "somename".to_string(),

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ impl CopyInterpreterV2 {
6565
match &from.source_info {
6666
SourceInfo::StageSource(table_info) => {
6767
let path = &table_info.path;
68+
tracing::info!("path is :{}", path);
69+
6870
// Here we add the path to the file: /path/to/path/file1.
6971
let files_with_path = if !files.is_empty() {
7072
let mut files_with_path = vec![];
@@ -74,6 +76,7 @@ impl CopyInterpreterV2 {
7476
}
7577
files_with_path
7678
} else if !path.ends_with('/') {
79+
tracing::info!("path not ends with '/'");
7780
let rename_me: Arc<dyn TableContext> = self.ctx.clone();
7881
let op = StageSourceHelper::get_op(&rename_me, &table_info.stage_info).await?;
7982
if op.object(path).is_exist().await? {
@@ -82,6 +85,7 @@ impl CopyInterpreterV2 {
8285
vec![]
8386
}
8487
} else {
88+
tracing::info!("shit");
8589
let rename_me: Arc<dyn TableContext> = self.ctx.clone();
8690
let op = StageSourceHelper::get_op(&rename_me, &table_info.stage_info).await?;
8791

@@ -91,10 +95,13 @@ impl CopyInterpreterV2 {
9195

9296
// TODO: we could rewrite into try_collect.
9397
let mut objects = op.batch().walk_top_down(path)?;
98+
tracing::info!("common here we go top down");
9499
while let Some(de) = objects.try_next().await? {
95100
if de.mode().is_dir() {
101+
tracing::info!("dir continue");
96102
continue;
97103
}
104+
tracing::info!("insert path to list");
98105
list.insert(de.path().to_string());
99106
}
100107

@@ -303,6 +310,10 @@ impl Interpreter for CopyInterpreterV2 {
303310

304311
// Pattern match check.
305312
let pattern = &pattern;
313+
tracing::info!(
314+
"parttern should be empty right, tell me, it is {}",
315+
pattern.is_empty()
316+
);
306317
if !pattern.is_empty() {
307318
let regex = Regex::new(pattern).map_err(|e| {
308319
ErrorCode::SyntaxException(format!(

0 commit comments

Comments
 (0)