Skip to content

Commit c9bf3a4

Browse files
authored
Merge branch 'main' into fix-agg2
2 parents f1bf5b2 + 63c34e4 commit c9bf3a4

File tree

45 files changed

+616
-440
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+616
-440
lines changed

.github/auto_assign.yml

Lines changed: 0 additions & 17 deletions
This file was deleted.

Cargo.lock

Lines changed: 13 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/doc/14-sql-commands/20-query-syntax/dml-select.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ SELECT
1111
[ALL | DISTINCT]
1212
select_expr [[AS] alias], ...
1313
[INTO variable [, ...]]
14-
[ FROM table_references
14+
[EXCLUDE (col_name1 [, col_name2, col_name3, ...] ) ]
15+
[FROM table_references
1516
[AT ...]
1617
[WHERE expr]
1718
[GROUP BY {{col_name | expr | col_alias | col_position}, ...
@@ -22,6 +23,7 @@ SELECT
2223
[OFFSET row_count]
2324
[IGNORE_RESULT]
2425
]
26+
]
2527
```
2628

2729
:::tip
@@ -41,6 +43,47 @@ SELECT number FROM numbers(3);
4143
+--------+
4244
```
4345

46+
### EXCLUDE Parameter
47+
48+
Excludes one or more columns by their names from the result. The parameter is usually used in conjunction with `SELECT * ...` to exclude a few columns from the result instead of retrieving them all.
49+
50+
```sql
51+
SELECT * FROM allemployees ORDER BY id;
52+
53+
---
54+
| id | firstname | lastname | gender |
55+
|----|-----------|----------|--------|
56+
| 1 | Ryan | Tory | M |
57+
| 2 | Oliver | Green | M |
58+
| 3 | Noah | Shuster | M |
59+
| 4 | Lily | McMent | F |
60+
| 5 | Macy | Lee | F |
61+
62+
-- Exclude the column "id" from the result
63+
SELECT * EXCLUDE id FROM allemployees;
64+
65+
---
66+
| firstname | lastname | gender |
67+
|-----------|----------|--------|
68+
| Noah | Shuster | M |
69+
| Ryan | Tory | M |
70+
| Oliver | Green | M |
71+
| Lily | McMent | F |
72+
| Macy | Lee | F |
73+
74+
-- Exclude the columns "id" and "lastname" from the result
75+
SELECT * EXCLUDE (id,lastname) FROM allemployees;
76+
77+
---
78+
| firstname | gender |
79+
|-----------|--------|
80+
| Oliver | M |
81+
| Ryan | M |
82+
| Lily | F |
83+
| Noah | M |
84+
| Macy | F |
85+
```
86+
4487
## FROM Clause
4588

4689
```sql

docs/doc/14-sql-commands/40-show/show-settings.md

Lines changed: 36 additions & 36 deletions
Large diffs are not rendered by default.

docs/doc/14-sql-commands/80-setting-cmds/show-settings.md

Lines changed: 36 additions & 36 deletions
Large diffs are not rendered by default.

src/common/base/src/base/runtime_tracker.rs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,9 @@ pub static GLOBAL_MEM_STAT: MemStat = MemStat::empty();
6565
#[thread_local]
6666
static mut TRACKER: ThreadTracker = ThreadTracker::empty();
6767

68-
/// Flag indicating an exceeding limit panic is happening and allocating memory by panic handler is allowed.
69-
///
70-
/// Flag will be reset when `panic!()` returns.
68+
/// Whether to allow unlimited memory. Alloc memory will not panic if it is true.
7169
#[thread_local]
72-
static PANICKING: AtomicBool = AtomicBool::new(false);
70+
static UNLIMITED_FLAG: AtomicBool = AtomicBool::new(false);
7371

7472
static MEM_STAT_BUFFER_SIZE: i64 = 4 * 1024 * 1024;
7573

@@ -85,24 +83,27 @@ impl<'a> Drop for Entered<'a> {
8583
}
8684
}
8785

88-
/// A guard that resets the `PANICKING` flag when dropped.
89-
pub(crate) struct Panicking;
86+
/// A guard that resets the `UNLIMITED_FLAG` flag when dropped.
87+
pub struct UnlimitedMemGuard {
88+
saved: bool,
89+
}
9090

91-
impl Panicking {
92-
#[must_use]
93-
pub(crate) fn enter_panicking() -> Self {
94-
PANICKING.store(true, Ordering::Relaxed);
95-
Self
91+
impl UnlimitedMemGuard {
92+
#[allow(unused)]
93+
pub(crate) fn enter_unlimited() -> Self {
94+
let saved = UNLIMITED_FLAG.load(Ordering::Relaxed);
95+
UNLIMITED_FLAG.store(true, Ordering::Relaxed);
96+
Self { saved }
9697
}
9798

98-
pub(crate) fn is_panicking() -> bool {
99-
PANICKING.load(Ordering::Relaxed)
99+
pub(crate) fn is_unlimited() -> bool {
100+
UNLIMITED_FLAG.load(Ordering::Relaxed)
100101
}
101102
}
102103

103-
impl Drop for Panicking {
104+
impl Drop for UnlimitedMemGuard {
104105
fn drop(&mut self) {
105-
PANICKING.store(false, Ordering::Relaxed);
106+
UNLIMITED_FLAG.store(self.saved, Ordering::Relaxed);
106107
}
107108
}
108109

@@ -205,16 +206,10 @@ impl ThreadTracker {
205206
let res = tracker.flush();
206207

207208
if let Err(out_of_limit) = res {
208-
// NOTE: `PANICKING` only allows allocation inside the following `panic!()`.
209-
// If a `Drop` is called when unwinding, the `Drop` may panic again if it allocates memory over the limit.
210-
if Panicking::is_panicking() {
211-
return;
209+
// https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=03d21a15e52c7c0356fca04ece283cf9
210+
if !std::thread::panicking() && !UnlimitedMemGuard::is_unlimited() {
211+
panic!("{:?}", out_of_limit);
212212
}
213-
214-
// Reset PANICKING when dropped.
215-
let _p = Panicking::enter_panicking();
216-
217-
panic!("{:?}", out_of_limit);
218213
}
219214
}
220215

@@ -420,7 +415,6 @@ impl<T: Future> Future for TrackedFuture<T> {
420415

421416
#[cfg(test)]
422417
mod tests {
423-
424418
mod async_thread_tracker {
425419
use std::future::Future;
426420
use std::pin::Pin;

src/common/exception/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ common-arrow = { path = "../arrow" }
1515

1616
anyhow = { workspace = true }
1717
bincode = { version = "2.0.0-rc.1", features = ["serde", "std", "alloc"] }
18-
opendal = "0.21"
18+
opendal = "0.22"
1919
paste = "1.0.9"
2020
prost = { workspace = true }
2121
serde = { workspace = true }

src/common/storage/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ common-exception = { path = "../exception" }
1717
anyhow = { workspace = true }
1818
async-trait = "0.1"
1919
backon = "0.2"
20+
bytes = "1"
21+
futures = "0.3"
2022
globiter = "0.1"
2123
once_cell = "1"
22-
opendal = { version = "0.21", features = [
24+
opendal = { version = "0.22", features = [
2325
"layers-tracing",
2426
"layers-metrics",
2527
"services-ipfs",

src/common/storage/src/cache.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use async_trait::async_trait;
18+
use bytes::Bytes;
19+
use futures::future::BoxFuture;
20+
use futures::io::Cursor;
21+
use futures::AsyncReadExt;
22+
use opendal::layers::CachePolicy;
23+
use opendal::raw::Accessor;
24+
use opendal::raw::BytesReader;
25+
use opendal::raw::RpRead;
26+
use opendal::Error;
27+
use opendal::ErrorKind;
28+
use opendal::OpRead;
29+
use opendal::OpWrite;
30+
use opendal::Result;
31+
32+
/// TODO: implement more complex cache logic.
33+
///
34+
/// For example:
35+
///
36+
/// - Implement a top n heap, and only cache files exist in heap.
37+
/// - Only cache data file, and ignore snapshot files.
38+
#[derive(Debug, Default)]
39+
pub struct FuseCachePolicy {}
40+
41+
impl FuseCachePolicy {
42+
pub fn new() -> Self {
43+
FuseCachePolicy::default()
44+
}
45+
46+
fn cache_path(&self, path: &str, args: &OpRead) -> String {
47+
format!("{path}.cache-{}", args.range().to_header())
48+
}
49+
}
50+
51+
#[async_trait]
52+
impl CachePolicy for FuseCachePolicy {
53+
fn on_read(
54+
&self,
55+
inner: Arc<dyn Accessor>,
56+
cache: Arc<dyn Accessor>,
57+
path: &str,
58+
args: OpRead,
59+
) -> BoxFuture<'static, Result<(RpRead, BytesReader)>> {
60+
let path = path.to_string();
61+
let cache_path = self.cache_path(&path, &args);
62+
Box::pin(async move {
63+
match cache.read(&cache_path, OpRead::default()).await {
64+
Ok(v) => Ok(v),
65+
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
66+
let (rp, mut r) = inner.read(&path, args.clone()).await?;
67+
68+
let size = rp.clone().into_metadata().content_length();
69+
// If size < 8MiB, we can optimize by buffer in memory.
70+
// TODO: make this configurable.
71+
if size <= 8 * 1024 * 1024 {
72+
let mut bs = Vec::with_capacity(size as usize);
73+
r.read_to_end(&mut bs).await.map_err(|err| {
74+
Error::new(
75+
ErrorKind::Unexpected,
76+
"read from underlying storage service",
77+
)
78+
.set_source(err)
79+
})?;
80+
let bs = Bytes::from(bs);
81+
82+
// Ignore errors returned by cache services.
83+
let _ = cache
84+
.write(
85+
&cache_path,
86+
OpWrite::new(size),
87+
Box::new(Cursor::new(bs.clone())),
88+
)
89+
.await;
90+
Ok((rp, Box::new(Cursor::new(bs)) as BytesReader))
91+
} else {
92+
// Ignore errors returned by cache services.
93+
let _ = cache.write(&cache_path, OpWrite::new(size), r).await;
94+
95+
match cache.read(&cache_path, OpRead::default()).await {
96+
Ok(v) => Ok(v),
97+
Err(_) => return inner.read(&path, args).await,
98+
}
99+
}
100+
}
101+
Err(_) => return inner.read(&path, args).await,
102+
}
103+
})
104+
}
105+
}

src/common/storage/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,6 @@ pub use metrics::StorageMetricsLayer;
6565

6666
mod runtime_layer;
6767
mod utils;
68+
69+
mod cache;
70+
pub use cache::FuseCachePolicy;

0 commit comments

Comments
 (0)