Skip to content

Commit e92e1c4

Browse files
authored
Merge pull request #7729 from ClSlaid/copy-from-ipfs
feat(query): implement copy from ipfs
2 parents: 48294c8 + a250954 · commit e92e1c4

File tree

10 files changed

+111
-1
lines changed

10 files changed

+111
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ opendal = { version = "0.17.1", features = [
2222
"layers-retry",
2323
"layers-tracing",
2424
"layers-metrics",
25+
"services-ipfs",
2526
"compress",
2627
] }
2728
percent-encoding = "2.2.0"

src/common/storage/src/config.rs

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ pub enum StorageParams {
3939
#[cfg(feature = "storage-hdfs")]
4040
Hdfs(StorageHdfsConfig),
4141
Http(StorageHttpConfig),
42+
Ipfs(StorageIpfsConfig),
4243
Memory,
4344
Obs(StorageObsConfig),
4445
S3(StorageS3Config),
@@ -72,6 +73,9 @@ impl Display for StorageParams {
7273
StorageParams::Http(v) => {
7374
write!(f, "http://endpoint={},paths={:?}", v.endpoint_url, v.paths)
7475
}
76+
StorageParams::Ipfs(c) => {
77+
write!(f, "ipfs://endpoint={},root={}", c.endpoint_url, c.root)
78+
}
7579
StorageParams::Memory => write!(f, "memory://"),
7680
StorageParams::Obs(v) => write!(
7781
f,
@@ -100,6 +104,7 @@ impl StorageParams {
100104
#[cfg(feature = "storage-hdfs")]
101105
StorageParams::Hdfs(_) => false,
102106
StorageParams::Http(v) => v.endpoint_url.starts_with("https://"),
107+
StorageParams::Ipfs(c) => c.endpoint_url.starts_with("https://"),
103108
StorageParams::Memory => false,
104109
StorageParams::Obs(v) => v.endpoint_url.starts_with("https://"),
105110
StorageParams::S3(v) => v.endpoint_url.starts_with("https://"),
@@ -263,6 +268,14 @@ pub struct StorageHttpConfig {
263268
pub paths: Vec<String>,
264269
}
265270

271+
pub const STORAGE_IPFS_DEFAULT_ENDPOINT: &str = "https://ipfs.io";
272+
/// Config for IPFS storage backend
273+
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
274+
pub struct StorageIpfsConfig {
275+
pub endpoint_url: String,
276+
pub root: String,
277+
}
278+
266279
/// Config for storage backend obs.
267280
#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
268281
pub struct StorageObsConfig {

src/common/storage/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,10 +23,12 @@ pub use config::StorageFsConfig;
2323
pub use config::StorageGcsConfig;
2424
pub use config::StorageHdfsConfig;
2525
pub use config::StorageHttpConfig;
26+
pub use config::StorageIpfsConfig;
2627
pub use config::StorageObsConfig;
2728
pub use config::StorageParams;
2829
pub use config::StorageS3Config;
2930
pub use config::STORAGE_GCS_DEFAULT_ENDPOINT;
31+
pub use config::STORAGE_IPFS_DEFAULT_ENDPOINT;
3032
pub use config::STORAGE_S3_DEFAULT_ENDPOINT;
3133

3234
mod operator;

src/common/storage/src/location.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@ use opendal::Scheme;
2222
use percent_encoding::percent_decode_str;
2323

2424
use crate::config::StorageHttpConfig;
25+
use crate::config::StorageIpfsConfig;
26+
use crate::config::STORAGE_IPFS_DEFAULT_ENDPOINT;
2527
use crate::config::STORAGE_S3_DEFAULT_ENDPOINT;
2628
use crate::StorageAzblobConfig;
2729
use crate::StorageParams;
@@ -95,6 +97,14 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
9597
.to_string(),
9698
root: root.to_string(),
9799
}),
100+
Scheme::Ipfs => StorageParams::Ipfs(StorageIpfsConfig {
101+
endpoint_url: l
102+
.connection
103+
.get("endpoint_url")
104+
.cloned()
105+
.unwrap_or_else(|| STORAGE_IPFS_DEFAULT_ENDPOINT.to_string()),
106+
root: "/ipfs/".to_string() + l.name.as_str(),
107+
}),
98108
Scheme::S3 => StorageParams::S3(StorageS3Config {
99109
endpoint_url: l
100110
.connection

src/common/storage/src/operator.rs

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
5353
#[cfg(feature = "storage-hdfs")]
5454
StorageParams::Hdfs(cfg) => init_hdfs_operator(cfg)?,
5555
StorageParams::Http(cfg) => init_http_operator(cfg)?,
56+
StorageParams::Ipfs(cfg) => init_ipfs_operator(cfg)?,
5657
StorageParams::Memory => init_memory_operator()?,
5758
StorageParams::Obs(cfg) => init_obs_operator(cfg)?,
5859
StorageParams::S3(cfg) => init_s3_operator(cfg)?,
@@ -122,6 +123,17 @@ pub fn init_hdfs_operator(cfg: &super::StorageHdfsConfig) -> Result<Operator> {
122123
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
123124
}
124125

126+
pub fn init_ipfs_operator(cfg: &super::StorageIpfsConfig) -> Result<Operator> {
127+
use opendal::services::ipfs;
128+
129+
let mut builder = ipfs::Builder::default();
130+
131+
builder.root(&cfg.root);
132+
builder.endpoint(&cfg.endpoint_url);
133+
134+
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
135+
}
136+
125137
pub fn init_http_operator(cfg: &StorageHttpConfig) -> Result<Operator> {
126138
let mut builder = http::Builder::default();
127139

src/common/storage/tests/location.rs

Lines changed: 37 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,15 +18,52 @@ use std::io::Result;
1818
use common_storage::parse_uri_location;
1919
use common_storage::StorageGcsConfig;
2020
use common_storage::StorageHttpConfig;
21+
use common_storage::StorageIpfsConfig;
2122
use common_storage::StorageParams;
2223
use common_storage::StorageS3Config;
2324
use common_storage::UriLocation;
2425
use common_storage::STORAGE_GCS_DEFAULT_ENDPOINT;
26+
use common_storage::STORAGE_IPFS_DEFAULT_ENDPOINT;
2527
use common_storage::STORAGE_S3_DEFAULT_ENDPOINT;
2628

2729
#[test]
2830
fn test_parse_uri_location() -> Result<()> {
2931
let cases = vec![
32+
(
33+
"ipfs-default-endpoint",
34+
UriLocation {
35+
protocol: "ipfs".to_string(),
36+
name: "too-simple".to_string(),
37+
path: "/".to_string(),
38+
connection: BTreeMap::new(),
39+
},
40+
(
41+
StorageParams::Ipfs(StorageIpfsConfig {
42+
endpoint_url: STORAGE_IPFS_DEFAULT_ENDPOINT.to_string(),
43+
root: "/ipfs/too-simple".to_string(),
44+
}),
45+
"/".to_string(),
46+
),
47+
),
48+
(
49+
"ipfs-change-endpoint",
50+
UriLocation {
51+
protocol: "ipfs".to_string(),
52+
name: "too-naive".to_string(),
53+
path: "/".to_string(),
54+
connection: vec![("endpoint_url", "https://ipfs.filebase.io")]
55+
.into_iter()
56+
.map(|(k, v)| (k.to_string(), v.to_string()))
57+
.collect(),
58+
},
59+
(
60+
StorageParams::Ipfs(StorageIpfsConfig {
61+
endpoint_url: "https://ipfs.filebase.io".to_string(),
62+
root: "/ipfs/too-naive".to_string(),
63+
}),
64+
"/".to_string(),
65+
),
66+
),
3067
(
3168
"s3_with_access_key_id",
3269
UriLocation {

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -334,7 +334,7 @@ impl CopyInterpreterV2 {
334334
bind_context,
335335
..
336336
} => (s_expr, metadata, bind_context),
337-
v => unreachable!("Input plan must be Query, but it's {v}"),
337+
v => unreachable!("Input plan must be Query, but it's {}", v),
338338
};
339339

340340
let select_interpreter = SelectInterpreterV2::try_create(
Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,4 @@
1+
199 2020.0 769
2+
199 2020.0 769
3+
199 2020.0 769
4+
199 2020.0 769
Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,30 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
. "$CURDIR"/../../../shell_env.sh
5+
QMHASH=QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ
6+
7+
echo "drop table if exists ontime_200;" | $MYSQL_CLIENT_CONNECT
8+
9+
## Create table
10+
cat $CURDIR/../ddl/ontime.sql | sed "s/ontime/ontime_200/g" | $MYSQL_CLIENT_CONNECT
11+
12+
copy_from_location_cases=(
13+
# copy csv
14+
"copy into ontime_200 from 'ipfs://$QMHASH/ontime.csv' CONNECTION = (ENDPOINT_URL='https://ipfs.filebase.io') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' record_delimiter = '\n' skip_header = 1)"
15+
# copy gzip csv
16+
"copy into ontime_200 from 'ipfs://$QMHASH/ontime.csv.gz' CONNECTION = (ENDPOINT_URL='https://ipfs.filebase.io') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'gzip' record_delimiter = '\n' skip_header = 1)"
17+
# copy zstd csv
18+
"copy into ontime_200 from 'ipfs://$QMHASH/ontime.csv.zst' CONNECTION = (ENDPOINT_URL='https://ipfs.filebase.io') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'zstd' record_delimiter = '\n' skip_header = 1)"
19+
# copy bz2 csv
20+
"copy into ontime_200 from 'ipfs://$QMHASH/ontime.csv.bz2' CONNECTION = (ENDPOINT_URL='https://ipfs.filebase.io') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'bz2' record_delimiter = '\n' skip_header = 1)"
21+
)
22+
23+
for i in "${copy_from_location_cases[@]}"; do
24+
echo "$i" | $MYSQL_CLIENT_CONNECT
25+
echo "select count(1), avg(Year), sum(DayOfWeek) from ontime_200" | $MYSQL_CLIENT_CONNECT
26+
echo "truncate table ontime_200" | $MYSQL_CLIENT_CONNECT
27+
done
28+
29+
## Drop table
30+
echo "drop table if exists ontime_200;" | $MYSQL_CLIENT_CONNECT

0 commit comments

Comments (0)