Skip to content

Commit 4303638

Browse files
authored
Merge branch 'main' into null2
2 parents e04079d + d72d0f2 commit 4303638

File tree

81 files changed

+832
-406
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+832
-406
lines changed

Cargo.lock

Lines changed: 24 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/base/src/base/runtime_tracker.rs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,9 @@ pub static GLOBAL_MEM_STAT: MemStat = MemStat::empty();
6565
#[thread_local]
6666
static mut TRACKER: ThreadTracker = ThreadTracker::empty();
6767

68-
/// Flag indicating an exceeding limit panic is happening and allocating memory by panic handler is allowed.
69-
///
70-
/// Flag will be reset when `panic!()` returns.
68+
/// Whether unlimited memory is allowed. Allocating memory will not panic while this flag is true.
7169
#[thread_local]
72-
static PANICKING: AtomicBool = AtomicBool::new(false);
70+
static UNLIMITED_FLAG: AtomicBool = AtomicBool::new(false);
7371

7472
static MEM_STAT_BUFFER_SIZE: i64 = 4 * 1024 * 1024;
7573

@@ -85,24 +83,27 @@ impl<'a> Drop for Entered<'a> {
8583
}
8684
}
8785

88-
/// A guard that resets the `PANICKING` flag when dropped.
89-
pub(crate) struct Panicking;
86+
/// A guard that restores `UNLIMITED_FLAG` to its previously saved value when dropped.
87+
pub struct UnlimitedMemGuard {
88+
saved: bool,
89+
}
9090

91-
impl Panicking {
92-
#[must_use]
93-
pub(crate) fn enter_panicking() -> Self {
94-
PANICKING.store(true, Ordering::Relaxed);
95-
Self
91+
impl UnlimitedMemGuard {
92+
#[allow(unused)]
93+
pub(crate) fn enter_unlimited() -> Self {
94+
let saved = UNLIMITED_FLAG.load(Ordering::Relaxed);
95+
UNLIMITED_FLAG.store(true, Ordering::Relaxed);
96+
Self { saved }
9697
}
9798

98-
pub(crate) fn is_panicking() -> bool {
99-
PANICKING.load(Ordering::Relaxed)
99+
pub(crate) fn is_unlimited() -> bool {
100+
UNLIMITED_FLAG.load(Ordering::Relaxed)
100101
}
101102
}
102103

103-
impl Drop for Panicking {
104+
impl Drop for UnlimitedMemGuard {
104105
fn drop(&mut self) {
105-
PANICKING.store(false, Ordering::Relaxed);
106+
UNLIMITED_FLAG.store(self.saved, Ordering::Relaxed);
106107
}
107108
}
108109

@@ -205,16 +206,10 @@ impl ThreadTracker {
205206
let res = tracker.flush();
206207

207208
if let Err(out_of_limit) = res {
208-
// NOTE: `PANICKING` only allows allocation inside the following `panic!()`.
209-
// If a `Drop` is called when unwinding, the `Drop` may panic again if it allocates memory over the limit.
210-
if Panicking::is_panicking() {
211-
return;
209+
// https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=03d21a15e52c7c0356fca04ece283cf9
210+
if !std::thread::panicking() && !UnlimitedMemGuard::is_unlimited() {
211+
panic!("{:?}", out_of_limit);
212212
}
213-
214-
// Reset PANICKING when dropped.
215-
let _p = Panicking::enter_panicking();
216-
217-
panic!("{:?}", out_of_limit);
218213
}
219214
}
220215

@@ -420,7 +415,6 @@ impl<T: Future> Future for TrackedFuture<T> {
420415

421416
#[cfg(test)]
422417
mod tests {
423-
424418
mod async_thread_tracker {
425419
use std::future::Future;
426420
use std::pin::Pin;

src/common/exception/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ common-arrow = { path = "../arrow" }
1515

1616
anyhow = { workspace = true }
1717
bincode = { version = "2.0.0-rc.1", features = ["serde", "std", "alloc"] }
18-
opendal = "0.21"
18+
opendal = "0.22"
1919
paste = "1.0.9"
2020
prost = { workspace = true }
2121
serde = { workspace = true }

src/common/io/src/buffer/buffer_read_datetime_ext.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ where R: BufferRead
4747
{
4848
fn read_date_text(&mut self, tz: &Tz) -> Result<NaiveDate> {
4949
// TODO support YYYYMMDD format
50-
self.read_timestamp_text(tz).map(|dt| dt.naive_utc().date())
50+
self.read_timestamp_text(tz)
51+
.map(|dt| dt.naive_local().date())
5152
}
5253

5354
fn read_timestamp_text(&mut self, tz: &Tz) -> Result<DateTime<Tz>> {

src/common/io/src/cursor_ext/cursor_read_datetime_ext.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ where T: AsRef<[u8]>
7575
{
7676
fn read_date_text(&mut self, tz: &Tz) -> Result<NaiveDate> {
7777
// TODO support YYYYMMDD format
78-
self.read_timestamp_text(tz).map(|dt| dt.naive_utc().date())
78+
self.read_timestamp_text(tz)
79+
.map(|dt| dt.naive_local().date())
7980
}
8081

8182
fn read_timestamp_text(&mut self, tz: &Tz) -> Result<DateTime<Tz>> {

src/common/storage/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ common-exception = { path = "../exception" }
1717
anyhow = { workspace = true }
1818
async-trait = "0.1"
1919
backon = "0.2"
20+
bytes = "1"
21+
futures = "0.3"
2022
globiter = "0.1"
2123
once_cell = "1"
22-
opendal = { version = "0.21", features = [
24+
opendal = { version = "0.22", features = [
2325
"layers-tracing",
2426
"layers-metrics",
2527
"services-ipfs",

src/common/storage/src/cache.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use async_trait::async_trait;
18+
use bytes::Bytes;
19+
use futures::future::BoxFuture;
20+
use futures::io::Cursor;
21+
use futures::AsyncReadExt;
22+
use opendal::layers::CachePolicy;
23+
use opendal::raw::Accessor;
24+
use opendal::raw::BytesReader;
25+
use opendal::raw::RpRead;
26+
use opendal::Error;
27+
use opendal::ErrorKind;
28+
use opendal::OpRead;
29+
use opendal::OpWrite;
30+
use opendal::Result;
31+
32+
/// TODO: implement more complex cache logic.
33+
///
34+
/// For example:
35+
///
36+
/// - Implement a top-n heap, and only cache files that exist in the heap.
37+
/// - Only cache data files, and ignore snapshot files.
38+
#[derive(Debug, Default)]
39+
pub struct FuseCachePolicy {}
40+
41+
impl FuseCachePolicy {
42+
pub fn new() -> Self {
43+
FuseCachePolicy::default()
44+
}
45+
46+
fn cache_path(&self, path: &str, args: &OpRead) -> String {
47+
format!("{path}.cache-{}", args.range().to_header())
48+
}
49+
}
50+
51+
#[async_trait]
52+
impl CachePolicy for FuseCachePolicy {
53+
fn on_read(
54+
&self,
55+
inner: Arc<dyn Accessor>,
56+
cache: Arc<dyn Accessor>,
57+
path: &str,
58+
args: OpRead,
59+
) -> BoxFuture<'static, Result<(RpRead, BytesReader)>> {
60+
let path = path.to_string();
61+
let cache_path = self.cache_path(&path, &args);
62+
Box::pin(async move {
63+
match cache.read(&cache_path, OpRead::default()).await {
64+
Ok(v) => Ok(v),
65+
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
66+
let (rp, mut r) = inner.read(&path, args.clone()).await?;
67+
68+
let size = rp.clone().into_metadata().content_length();
69+
// If size <= 8 MiB, we can optimize by buffering in memory.
70+
// TODO: make this configurable.
71+
if size <= 8 * 1024 * 1024 {
72+
let mut bs = Vec::with_capacity(size as usize);
73+
r.read_to_end(&mut bs).await.map_err(|err| {
74+
Error::new(
75+
ErrorKind::Unexpected,
76+
"read from underlying storage service",
77+
)
78+
.set_source(err)
79+
})?;
80+
let bs = Bytes::from(bs);
81+
82+
// Ignore errors returned by cache services.
83+
let _ = cache
84+
.write(
85+
&cache_path,
86+
OpWrite::new(size),
87+
Box::new(Cursor::new(bs.clone())),
88+
)
89+
.await;
90+
Ok((rp, Box::new(Cursor::new(bs)) as BytesReader))
91+
} else {
92+
// Ignore errors returned by cache services.
93+
let _ = cache.write(&cache_path, OpWrite::new(size), r).await;
94+
95+
match cache.read(&cache_path, OpRead::default()).await {
96+
Ok(v) => Ok(v),
97+
Err(_) => return inner.read(&path, args).await,
98+
}
99+
}
100+
}
101+
Err(_) => return inner.read(&path, args).await,
102+
}
103+
})
104+
}
105+
}

src/common/storage/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,6 @@ pub use metrics::StorageMetricsLayer;
6565

6666
mod runtime_layer;
6767
mod utils;
68+
69+
mod cache;
70+
pub use cache::FuseCachePolicy;

0 commit comments

Comments
 (0)