Skip to content

Commit 8011bd0

Browse files
authored
Merge pull request #9062 from Xuanwo/cache-policy
feat: Cache Policy
2 parents dc63755 + acc564a commit 8011bd0

File tree

24 files changed

+186
-49
lines changed

24 files changed

+186
-49
lines changed

Cargo.lock

Lines changed: 8 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/exception/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ common-arrow = { path = "../arrow" }
1515

1616
anyhow = { workspace = true }
1717
bincode = { version = "2.0.0-rc.1", features = ["serde", "std", "alloc"] }
18-
opendal = "0.21"
18+
opendal = "0.22"
1919
paste = "1.0.9"
2020
prost = { workspace = true }
2121
serde = { workspace = true }

src/common/storage/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ common-exception = { path = "../exception" }
1717
anyhow = { workspace = true }
1818
async-trait = "0.1"
1919
backon = "0.2"
20+
bytes = "1"
21+
futures = "0.3"
2022
globiter = "0.1"
2123
once_cell = "1"
22-
opendal = { version = "0.21", features = [
24+
opendal = { version = "0.22", features = [
2325
"layers-tracing",
2426
"layers-metrics",
2527
"services-ipfs",

src/common/storage/src/cache.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use async_trait::async_trait;
18+
use bytes::Bytes;
19+
use futures::future::BoxFuture;
20+
use futures::io::Cursor;
21+
use futures::AsyncReadExt;
22+
use opendal::layers::CachePolicy;
23+
use opendal::raw::Accessor;
24+
use opendal::raw::BytesReader;
25+
use opendal::raw::RpRead;
26+
use opendal::Error;
27+
use opendal::ErrorKind;
28+
use opendal::OpRead;
29+
use opendal::OpWrite;
30+
use opendal::Result;
31+
32+
/// TODO: implement more complex cache logic.
33+
///
34+
/// For example:
35+
///
36+
/// - Implement a top-n heap, and only cache files that exist in the heap.
37+
/// - Only cache data files, and ignore snapshot files.
38+
#[derive(Debug, Default)]
39+
pub struct FuseCachePolicy {}
40+
41+
impl FuseCachePolicy {
42+
pub fn new() -> Self {
43+
FuseCachePolicy::default()
44+
}
45+
46+
fn cache_path(&self, path: &str, args: &OpRead) -> String {
47+
format!("{path}.cache-{}", args.range().to_header())
48+
}
49+
}
50+
51+
#[async_trait]
52+
impl CachePolicy for FuseCachePolicy {
53+
fn on_read(
54+
&self,
55+
inner: Arc<dyn Accessor>,
56+
cache: Arc<dyn Accessor>,
57+
path: &str,
58+
args: OpRead,
59+
) -> BoxFuture<'static, Result<(RpRead, BytesReader)>> {
60+
let path = path.to_string();
61+
let cache_path = self.cache_path(&path, &args);
62+
Box::pin(async move {
63+
match cache.read(&cache_path, OpRead::default()).await {
64+
Ok(v) => Ok(v),
65+
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
66+
let (rp, mut r) = inner.read(&path, args.clone()).await?;
67+
68+
let size = rp.clone().into_metadata().content_length();
69+
// If size <= 8MiB, we can optimize by buffering in memory.
70+
// TODO: make this configurable.
71+
if size <= 8 * 1024 * 1024 {
72+
let mut bs = Vec::with_capacity(size as usize);
73+
r.read_to_end(&mut bs).await.map_err(|err| {
74+
Error::new(
75+
ErrorKind::Unexpected,
76+
"read from underlying storage service",
77+
)
78+
.set_source(err)
79+
})?;
80+
let bs = Bytes::from(bs);
81+
82+
// Ignore errors returned by cache services.
83+
let _ = cache
84+
.write(
85+
&cache_path,
86+
OpWrite::new(size),
87+
Box::new(Cursor::new(bs.clone())),
88+
)
89+
.await;
90+
Ok((rp, Box::new(Cursor::new(bs)) as BytesReader))
91+
} else {
92+
// Ignore errors returned by cache services.
93+
let _ = cache.write(&cache_path, OpWrite::new(size), r).await;
94+
95+
match cache.read(&cache_path, OpRead::default()).await {
96+
Ok(v) => Ok(v),
97+
Err(_) => return inner.read(&path, args).await,
98+
}
99+
}
100+
}
101+
Err(_) => return inner.read(&path, args).await,
102+
}
103+
})
104+
}
105+
}

src/common/storage/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,6 @@ pub use metrics::StorageMetricsLayer;
6565

6666
mod runtime_layer;
6767
mod utils;
68+
69+
mod cache;
70+
pub use cache::FuseCachePolicy;

src/common/storage/src/operator.rs

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,30 @@ use crate::StorageRedisConfig;
6060

6161
/// init_operator will init an opendal operator based on storage config.
6262
pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
63+
let op = init_operator_without_layers(cfg)?;
64+
65+
let op = op
66+
// Add retry
67+
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
68+
// Add metrics
69+
.layer(MetricsLayer)
70+
// Add logging
71+
.layer(LoggingLayer::default())
72+
// Add tracing
73+
.layer(TracingLayer)
74+
// NOTE
75+
//
76+
// Magic happens here. We will add a layer upon original
77+
// storage operator so that all underlying storage operations
78+
// will be sent to the storage runtime.
79+
.layer(RuntimeLayer::new(GlobalIORuntime::instance().inner()));
80+
81+
Ok(op)
82+
}
83+
84+
/// init_operator_without_layers will init an opendal operator based
85+
/// on storage config without any layers.
86+
pub fn init_operator_without_layers(cfg: &StorageParams) -> Result<Operator> {
6387
let op = match &cfg {
6488
StorageParams::Azblob(cfg) => init_azblob_operator(cfg)?,
6589
StorageParams::Fs(cfg) => init_fs_operator(cfg)?,
@@ -83,22 +107,6 @@ pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
83107
}
84108
};
85109

86-
let op = op
87-
// Add retry
88-
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
89-
// Add metrics
90-
.layer(MetricsLayer)
91-
// Add logging
92-
.layer(LoggingLayer)
93-
// Add tracing
94-
.layer(TracingLayer)
95-
// NOTE
96-
//
97-
// Magic happens here. We will add a layer upon original
98-
// storage operator so that all underlying storage operations
99-
// will send to storage runtime.
100-
.layer(RuntimeLayer::new(GlobalIORuntime::instance().inner()));
101-
102110
Ok(op)
103111
}
104112

@@ -442,7 +450,25 @@ impl CacheOperator {
442450
return Ok(CacheOperator { op: None });
443451
}
444452

445-
let operator = init_operator(&conf.params)?;
453+
let operator = init_operator_without_layers(&conf.params)?
454+
// Add retry
455+
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
456+
// Add metrics
457+
.layer(MetricsLayer)
458+
// Add logging
459+
.layer(
460+
LoggingLayer::default()
461+
// Ignore expected errors for logging.
462+
.with_error_level(None),
463+
)
464+
// Add tracing
465+
.layer(TracingLayer)
466+
// NOTE
467+
//
468+
// Magic happens here. We will add a layer upon original
469+
// storage operator so that all underlying storage operations
470+
// will be sent to the storage runtime.
471+
.layer(RuntimeLayer::new(GlobalIORuntime::instance().inner()));
446472

447473
// OpenDAL will send a real request to underlying storage to check whether it works or not.
448474
// If this check failed, it's highly possible that the users have configured it wrongly.

src/query/pipeline/sources/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ crossbeam-channel = "0.5.6"
3131
csv-core = "0.1.10"
3232
futures = "0.3.24"
3333
futures-util = "0.3.24"
34-
opendal = { version = "0.21", features = ["compress"] }
34+
opendal = { version = "0.22", features = ["compress"] }
3535
parking_lot = "0.12.1"
3636
serde_json = { workspace = true }
3737
similar-asserts = "1.4.2"

src/query/service/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ lz4 = "1.24.0"
102102
metrics = "0.20.1"
103103
naive-cityhash = "0.2.0"
104104
once_cell = "1.15.0"
105-
opendal = { version = "0.21", features = ["layers-tracing", "layers-metrics", "compress"] }
105+
opendal = { version = "0.22", features = ["layers-tracing", "layers-metrics", "compress"] }
106106
opensrv-mysql = "0.2.0"
107107
openssl = { version = "0.10.41", features = ["vendored"] }
108108
parking_lot = "0.12.1"

src/query/service/tests/it/sessions/query_ctx.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use wiremock::ResponseTemplate;
2727
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2828
async fn test_get_storage_accessor_s3() -> Result<()> {
2929
let mock_server = MockServer::start().await;
30-
Mock::given(method("HEAD"))
31-
.and(path("/bucket/.opendal"))
30+
Mock::given(method("GET"))
31+
.and(path("/bucket"))
3232
.respond_with(ResponseTemplate::new(404))
3333
.mount(&mock_server)
3434
.await;
@@ -39,6 +39,7 @@ async fn test_get_storage_accessor_s3() -> Result<()> {
3939
region: "us-east-2".to_string(),
4040
endpoint_url: mock_server.uri(),
4141
bucket: "bucket".to_string(),
42+
disable_credential_loader: true,
4243
..Default::default()
4344
});
4445

src/query/sharing/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ common-auth = { path = "../../common/auth" }
1919
http = "0.2"
2020
log = "0.4"
2121
moka = "0.9"
22-
opendal = "0.21"
22+
opendal = "0.22"
2323
reqwest = "0.11"
2424
serde = { workspace = true }
2525
serde_json = { workspace = true }

0 commit comments

Comments
 (0)