Skip to content

Commit 74e80fc

Browse files
authored
Merge pull request #7629 from Xuanwo/bump-opendal
feat: Bump OpenDAL to v0.17
2 parents f484c6a + 8ac74f9 commit 74e80fc

File tree

15 files changed

+72
-53
lines changed

15 files changed

+72
-53
lines changed

Cargo.lock

Lines changed: 21 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/contexts/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ test = false
1212
common-base = { path = "../base" }
1313

1414
async-trait = "0.1.56"
15-
opendal = { version = "0.16.0", features = ["layers-retry"] }
15+
opendal = { version = "0.17.0", features = ["layers-retry"] }

src/common/contexts/src/dal/dal_context.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,14 @@ impl Accessor for DalContext {
8383
.metadata()
8484
}
8585

86-
fn presign(&self, args: &OpPresign) -> Result<PresignedRequest> {
87-
self.get_inner()?.presign(args)
86+
async fn create(&self, path: &str, args: OpCreate) -> Result<()> {
87+
self.get_inner()?.create(path, args).await
8888
}
8989

90-
async fn create(&self, args: &OpCreate) -> Result<()> {
91-
self.get_inner()?.create(args).await
92-
}
93-
94-
async fn read(&self, args: &OpRead) -> Result<BytesReader> {
90+
async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
9591
let metric = self.metrics.clone();
9692

97-
self.get_inner()?.read(args).await.map(|r| {
93+
self.get_inner()?.read(path, args).await.map(|r| {
9894
let mut last_pending = None;
9995
let r = observe_read(r, move |e| {
10096
let start = match last_pending {
@@ -117,7 +113,7 @@ impl Accessor for DalContext {
117113
})
118114
}
119115

120-
async fn write(&self, args: &OpWrite, r: BytesReader) -> Result<u64> {
116+
async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<u64> {
121117
let metric = self.metrics.clone();
122118

123119
let mut last_pending = None;
@@ -139,18 +135,22 @@ impl Accessor for DalContext {
139135
metric.inc_write_bytes_cost(start.elapsed().as_millis() as u64);
140136
});
141137

142-
self.get_inner()?.write(args, Box::new(r)).await
138+
self.get_inner()?.write(path, args, Box::new(r)).await
139+
}
140+
141+
async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
142+
self.get_inner()?.stat(path, args).await
143143
}
144144

145-
async fn stat(&self, args: &OpStat) -> Result<ObjectMetadata> {
146-
self.get_inner()?.stat(args).await
145+
async fn delete(&self, path: &str, args: OpDelete) -> Result<()> {
146+
self.get_inner()?.delete(path, args).await
147147
}
148148

149-
async fn delete(&self, args: &OpDelete) -> Result<()> {
150-
self.get_inner()?.delete(args).await
149+
async fn list(&self, path: &str, args: OpList) -> Result<DirStreamer> {
150+
self.get_inner()?.list(path, args).await
151151
}
152152

153-
async fn list(&self, args: &OpList) -> Result<DirStreamer> {
154-
self.get_inner()?.list(args).await
153+
fn presign(&self, path: &str, args: OpPresign) -> Result<PresignedRequest> {
154+
self.get_inner()?.presign(path, args)
155155
}
156156
}

src/common/contexts/src/dal/dal_runtime.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -83,61 +83,61 @@ impl Accessor for DalRuntime {
8383
.metadata()
8484
}
8585

86-
fn presign(&self, args: &OpPresign) -> Result<PresignedRequest> {
87-
self.get_inner()?.presign(args)
88-
}
89-
90-
async fn create(&self, args: &OpCreate) -> Result<()> {
86+
async fn create(&self, path: &str, args: OpCreate) -> Result<()> {
9187
let op = self.get_inner()?;
92-
let args = args.clone();
88+
let path = path.to_string();
9389
self.runtime
94-
.spawn(async move { op.create(&args).await })
90+
.spawn(async move { op.create(&path, args).await })
9591
.await
9692
.expect("join must success")
9793
}
9894

99-
async fn read(&self, args: &OpRead) -> Result<BytesReader> {
95+
async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
10096
let op = self.get_inner()?;
101-
let args = args.clone();
97+
let path = path.to_string();
10298
self.runtime
103-
.spawn(async move { op.read(&args).await })
99+
.spawn(async move { op.read(&path, args).await })
104100
.await
105101
.expect("join must success")
106102
}
107103

108-
async fn write(&self, args: &OpWrite, r: BytesReader) -> Result<u64> {
104+
async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<u64> {
109105
let op = self.get_inner()?;
110-
let args = args.clone();
106+
let path = path.to_string();
111107
self.runtime
112-
.spawn(async move { op.write(&args, r).await })
108+
.spawn(async move { op.write(&path, args, r).await })
113109
.await
114110
.expect("join must success")
115111
}
116112

117-
async fn stat(&self, args: &OpStat) -> Result<ObjectMetadata> {
113+
async fn stat(&self, path: &str, args: OpStat) -> Result<ObjectMetadata> {
118114
let op = self.get_inner()?;
119-
let args = args.clone();
115+
let path = path.to_string();
120116
self.runtime
121-
.spawn(async move { op.stat(&args).await })
117+
.spawn(async move { op.stat(&path, args).await })
122118
.await
123119
.expect("join must success")
124120
}
125121

126-
async fn delete(&self, args: &OpDelete) -> Result<()> {
122+
async fn delete(&self, path: &str, args: OpDelete) -> Result<()> {
127123
let op = self.get_inner()?;
128-
let args = args.clone();
124+
let path = path.to_string();
129125
self.runtime
130-
.spawn(async move { op.delete(&args).await })
126+
.spawn(async move { op.delete(&path, args).await })
131127
.await
132128
.expect("join must success")
133129
}
134130

135-
async fn list(&self, args: &OpList) -> Result<DirStreamer> {
131+
async fn list(&self, path: &str, args: OpList) -> Result<DirStreamer> {
136132
let op = self.get_inner()?;
137-
let args = args.clone();
133+
let path = path.to_string();
138134
self.runtime
139-
.spawn(async move { op.list(&args).await })
135+
.spawn(async move { op.list(&path, args).await })
140136
.await
141137
.expect("join must success")
142138
}
139+
140+
fn presign(&self, path: &str, args: OpPresign) -> Result<PresignedRequest> {
141+
self.get_inner()?.presign(path, args)
142+
}
143143
}

src/common/storage/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ anyhow = "1.0.64"
1818
backon = "0.1.0"
1919
globiter = "0.1.0"
2020
once_cell = "1.12.0"
21-
opendal = { version = "0.16.0", features = [
21+
opendal = { version = "0.17.0", features = [
2222
"layers-retry",
2323
"layers-tracing",
2424
"layers-metrics",

src/query/catalog/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ common-users = { path = "../users" }
2626

2727
async-trait = "0.1.56"
2828
dyn-clone = "1.0.6"
29-
opendal = { version = "0.16.0", features = ["layers-retry"] }
29+
opendal = { version = "0.17.0", features = ["layers-retry"] }
3030
parking_lot = "0.12.1"

src/query/config/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ common-users = { path = "../users" }
2121
clap = { version = "3.2.5", features = ["derive", "env"] }
2222
hex = "0.4.3"
2323
once_cell = "1.12.0"
24-
opendal = { version = "0.16.0", features = ["layers-retry", "compress"], optional = true }
24+
opendal = { version = "0.17.0", features = ["layers-retry", "compress"], optional = true }
2525
semver = "1.0.10"
2626
serde = { version = "1.0.137", features = ["derive"] }
2727
serfig = "0.0.2"

src/query/pipeline/sources/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ common-streams = { path = "../../streams" }
2323
async-trait = { version = "0.1.0", package = "async-trait-fn" }
2424
futures = "0.3.21"
2525
futures-util = "0.3.21"
26-
opendal = { version = "0.16.0", features = ["layers-retry", "compress"] }
26+
opendal = { version = "0.17.0", features = ["layers-retry", "compress"] }
2727
parking_lot = "0.12.1"

src/query/service/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ num = "0.4.0"
111111
num_cpus = "1.13.1"
112112
octocrab = "0.16.0"
113113
once_cell = "1.12.0"
114-
opendal = { version = "0.16.0", features = ["layers-retry", "layers-tracing", "layers-metrics", "compress"] }
114+
opendal = { version = "0.17.0", features = ["layers-retry", "layers-tracing", "layers-metrics", "compress"] }
115115
opensrv-mysql = "0.2.0"
116116
openssl = { version = "0.10.40", features = ["vendored"] }
117117
parking_lot = "0.12.1"

src/query/service/tests/it/storages/fuse/io.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,14 +227,14 @@ where
227227
P: Debug + Send,
228228
P: Iterator<Item = IOError>,
229229
{
230-
async fn read(&self, _args: &OpRead) -> std::io::Result<BytesReader> {
230+
async fn read(&self, _: &str, _: OpRead) -> std::io::Result<BytesReader> {
231231
let v = &mut (*self.err.lock());
232232
let err = v.take_err();
233233
v.increase_read_op_count();
234234
Err(err)
235235
}
236236

237-
async fn write(&self, _args: &OpWrite, _r: BytesReader) -> std::io::Result<u64> {
237+
async fn write(&self, _: &str, _: OpWrite, _: BytesReader) -> std::io::Result<u64> {
238238
let v = &mut (*self.err.lock());
239239
let err = v.take_err();
240240
v.increase_write_op_count();

0 commit comments

Comments
 (0)