Skip to content

Commit 7b51ed1

Browse files
authored
feat(query): iceberg data cache (#17787)
* feat(query): upgrade iceberg * feat(query): upgrade iceberg * fix * fix * fix * chore(query): update * chore(query): update * chore(query): update * overflow check opt * overflow optimize * update * update * update * update * update * differ schema * differ schema * differ schema * differ schema * differ schema * add small files partition * add small files partition * add small files partition * use root location range as key cache * update
1 parent 6add381 commit 7b51ed1

File tree

48 files changed

+1156
-1149
lines changed

Some content is hidden

Large commits have some of their content hidden by default. Use the search box below to find content that may be hidden.

48 files changed

+1156
-1149
lines changed

Cargo.lock

Lines changed: 294 additions & 151 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -337,12 +337,15 @@ http = "1"
337337
humantime = "2.1.0"
338338
hyper = "1"
339339
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
340-
iceberg = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f", features = [
340+
341+
## in branch dev
342+
iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3", features = [
341343
"storage-all",
342344
] }
343-
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f" }
344-
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f" }
345-
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "864a8b05f" }
345+
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3" }
346+
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3" }
347+
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "cce89a3" }
348+
346349
indexmap = "2.0.0"
347350
indicatif = "0.17.5"
348351
itertools = "0.13.0"
@@ -450,7 +453,7 @@ reqwest = { version = "0.12", default-features = false, features = [
450453
reqwest-hickory-resolver = "0.2"
451454
ringbuffer = "0.14.2"
452455
rmp-serde = "1.1.1"
453-
roaring = { version = "0.10.1", features = ["serde"] }
456+
roaring = { version = "^0.10", features = ["serde"] }
454457
rotbl = { version = "0.1.2", features = [] }
455458
rust_decimal = "1.26"
456459
rustix = "0.38.37"

src/common/base/src/rangemap/range_merger.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,15 @@ pub struct RangeMerger {
4646
}
4747

4848
impl RangeMerger {
49-
pub fn from_iter<I>(iter: I, max_gap_size: u64, max_range_size: u64) -> Self
50-
where I: IntoIterator<Item = Range<u64>> {
49+
pub fn from_iter<I>(
50+
iter: I,
51+
max_gap_size: u64,
52+
max_range_size: u64,
53+
whole_read_size: Option<u64>,
54+
) -> Self
55+
where
56+
I: IntoIterator<Item = Range<u64>>,
57+
{
5158
let mut raw_ranges: Vec<_> = iter.into_iter().collect();
5259
raw_ranges.sort_by(|a, b| a.start.cmp(&b.start));
5360

@@ -57,6 +64,18 @@ impl RangeMerger {
5764
ranges: Vec::with_capacity(raw_ranges.len()),
5865
};
5966

67+
if let Some(whole_read_size) = whole_read_size {
68+
if !raw_ranges.is_empty() {
69+
let max_end = raw_ranges.iter().map(|r| r.end).max().unwrap_or(0);
70+
71+
if max_end - raw_ranges[0].start <= whole_read_size {
72+
let r = raw_ranges.first().unwrap().start..max_end;
73+
rs.ranges = vec![r];
74+
return rs;
75+
}
76+
}
77+
}
78+
6079
for range in &raw_ranges {
6180
rs.add(range);
6281
}

src/common/base/tests/it/range_merger.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl fmt::Display for Array {
3232
fn test_range_merger() -> Result<()> {
3333
let v = [3..6, 1..5, 7..11, 8..9, 9..12, 4..8, 13..15, 18..20];
3434

35-
let mr = RangeMerger::from_iter(v, 0, 100);
35+
let mr = RangeMerger::from_iter(v, 0, 100, None);
3636
let actual = format!("{}", Array(mr.ranges()));
3737
let expect = "[1,12] [13,15] [18,20] ";
3838
assert_eq!(actual, expect);
@@ -47,7 +47,7 @@ fn test_range_merger_with_gap() -> Result<()> {
4747

4848
// max_gap_size = 1
4949
{
50-
let mr = RangeMerger::from_iter(v.clone(), 1, 100);
50+
let mr = RangeMerger::from_iter(v.clone(), 1, 100, None);
5151
let actual = format!("{}", Array(mr.ranges()));
5252
let expect = "[1,15] [18,20] ";
5353
assert_eq!(actual, expect);
@@ -65,7 +65,7 @@ fn test_range_merger_with_gap() -> Result<()> {
6565

6666
// max_gap_size = 2
6767
{
68-
let mr = RangeMerger::from_iter(v.clone(), 2, 100);
68+
let mr = RangeMerger::from_iter(v.clone(), 2, 100, None);
6969
let actual = format!("{}", Array(mr.ranges()));
7070
let expect = "[1,15] [18,20] ";
7171
assert_eq!(actual, expect);
@@ -83,7 +83,7 @@ fn test_range_merger_with_gap() -> Result<()> {
8383

8484
// max_gap_size = 3
8585
{
86-
let mr = RangeMerger::from_iter(v.clone(), 3, 100);
86+
let mr = RangeMerger::from_iter(v.clone(), 3, 100, None);
8787
let actual = format!("{}", Array(mr.ranges()));
8888
let expect = "[1,20] ";
8989
assert_eq!(actual, expect);
@@ -101,7 +101,7 @@ fn test_range_merger_with_gap() -> Result<()> {
101101

102102
// max_gap_size = 3, max_range_size = 5
103103
{
104-
let mr = RangeMerger::from_iter(v.clone(), 3, 4);
104+
let mr = RangeMerger::from_iter(v.clone(), 3, 4, None);
105105
let actual = format!("{}", Array(mr.ranges()));
106106
let expect = "[1,5] [3,8] [7,11] [8,12] [13,20] ";
107107
assert_eq!(actual, expect);

src/common/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ databend-common-native = { workspace = true }
2626
databend-enterprise-storage-encryption = { workspace = true }
2727
futures = { workspace = true }
2828
http = { workspace = true }
29+
iceberg = { workspace = true }
2930
log = { workspace = true }
3031
opendal = { workspace = true }
3132
parquet = { workspace = true }

src/common/storage/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use operator::build_operator;
4343
pub use operator::check_operator;
4444
pub use operator::init_operator;
4545
pub use operator::DataOperator;
46+
pub use operator::OperatorRegistry;
4647

4748
pub mod metrics;
4849
pub use crate::metrics::StorageMetrics;

src/common/storage/src/operator.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,3 +535,30 @@ pub async fn check_operator(
535535
))
536536
})
537537
}
538+
539+
pub trait OperatorRegistry: Send + Sync {
540+
fn get_operator_path<'a>(&self, _location: &'a str) -> Result<(Operator, &'a str)>;
541+
}
542+
543+
impl OperatorRegistry for Operator {
544+
fn get_operator_path<'a>(&self, location: &'a str) -> Result<(Operator, &'a str)> {
545+
Ok((self.clone(), location))
546+
}
547+
}
548+
549+
impl OperatorRegistry for DataOperator {
550+
fn get_operator_path<'a>(&self, location: &'a str) -> Result<(Operator, &'a str)> {
551+
Ok((self.operator.clone(), location))
552+
}
553+
}
554+
555+
impl OperatorRegistry for iceberg::io::FileIO {
556+
fn get_operator_path<'a>(&self, location: &'a str) -> Result<(Operator, &'a str)> {
557+
let file_io = self
558+
.new_input(location)
559+
.map_err(|err| std::io::Error::new(ErrorKind::Unsupported, err.message()))?;
560+
561+
let pos = file_io.relative_path_pos();
562+
Ok((file_io.get_operator().clone(), &location[pos..]))
563+
}
564+
}

src/common/storage/src/stage.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,20 @@ impl StageFileInfo {
6666
}
6767
}
6868

69-
pub fn dedup_key(&self) -> Option<String> {
69+
pub fn dedup_key(&self) -> String {
7070
// should not use last_modified because the accuracy is in seconds for S3.
7171
if let Some(md5) = &self.md5 {
72-
Some(md5.clone())
72+
md5.clone()
7373
} else {
74-
self.etag.clone()
74+
let last_modified = self
75+
.last_modified
76+
.as_ref()
77+
.map(|x| x.to_string())
78+
.unwrap_or_default();
79+
80+
self.etag
81+
.clone()
82+
.unwrap_or(format!("{}/{last_modified}/{}", self.path, self.size))
7583
}
7684
}
7785
}

src/query/catalog/src/catalog/manager.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use databend_common_meta_store::MetaStore;
3838
use databend_common_meta_store::MetaStoreProvider;
3939
use databend_common_meta_types::anyerror::func_name;
4040
use databend_storages_common_session::SessionState;
41+
use parking_lot::RwLock;
4142

4243
use super::Catalog;
4344
use super::CatalogCreator;
@@ -54,6 +55,7 @@ pub struct CatalogManager {
5455

5556
/// catalog_creators is the catalog creators that registered.
5657
pub catalog_creators: HashMap<CatalogType, Arc<dyn CatalogCreator>>,
58+
pub catalog_caches: RwLock<HashMap<String, Arc<dyn Catalog>>>,
5759
}
5860

5961
impl CatalogManager {
@@ -121,6 +123,7 @@ impl CatalogManager {
121123
default_catalog,
122124
external_catalogs,
123125
catalog_creators,
126+
catalog_caches: Default::default(),
124127
};
125128

126129
Ok(Arc::new(catalog_manager))
@@ -141,16 +144,33 @@ impl CatalogManager {
141144
session_state: SessionState,
142145
) -> Result<Arc<dyn Catalog>> {
143146
let typ = info.meta.catalog_option.catalog_type();
144-
145147
if typ == CatalogType::Default {
146148
return self.get_default_catalog(session_state);
147149
}
148150

151+
let tid = std::thread::current().id();
152+
let key = format!(
153+
"{:?}_{}_{:?}",
154+
info.catalog_name(),
155+
info.meta.created_on.timestamp(),
156+
tid
157+
);
158+
159+
{
160+
let r = self.catalog_caches.read();
161+
if let Some(v) = r.get(&key) {
162+
return Ok(v.clone());
163+
}
164+
}
149165
let creator = self
150166
.catalog_creators
151167
.get(&typ)
152168
.ok_or_else(|| ErrorCode::BadArguments(format!("unknown catalog type: {:?}", typ)))?;
153-
creator.try_create(info)
169+
170+
let v = creator.try_create(info)?;
171+
let mut w = self.catalog_caches.write();
172+
w.insert(key, v.clone());
173+
Ok(v)
154174
}
155175

156176
/// Get a catalog from manager.
@@ -174,10 +194,8 @@ impl CatalogManager {
174194
if let Some(ctl) = self.external_catalogs.get(catalog_name) {
175195
return Ok(ctl.clone());
176196
}
177-
178197
let tenant = Tenant::new_or_err(tenant, func_name!())?;
179198
let ident = CatalogNameIdent::new(tenant, catalog_name);
180-
181199
// Get catalog from metasrv.
182200
let info = self.meta.get_catalog(&ident).await?;
183201
self.build_catalog(info, session_state)

src/query/catalog/src/plan/datasource/datasource_info/parquet.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::io::Cursor;
1717
use std::sync::Arc;
1818

1919
use arrow_schema::Schema as ArrowSchema;
20-
use databend_common_base::base::tokio::sync::Mutex;
2120
use databend_common_expression::ColumnId;
2221
use databend_common_expression::TableField;
2322
use databend_common_expression::TableSchema;
@@ -73,10 +72,6 @@ pub struct ParquetTableInfo {
7372
pub compression_ratio: f64,
7473
pub leaf_fields: Arc<Vec<TableField>>,
7574

76-
// These fields are only used in coordinator node of the cluster,
77-
// so we don't need to serialize them.
78-
#[serde(skip)]
79-
pub parquet_metas: Arc<Mutex<Vec<Arc<FullParquetMeta>>>>,
8075
#[serde(skip)]
8176
pub need_stats_provider: bool,
8277
#[serde(skip)]
@@ -136,7 +131,6 @@ mod tests {
136131
use std::sync::Arc;
137132

138133
use arrow_schema::Schema as ArrowSchema;
139-
use databend_common_base::base::tokio::sync::Mutex;
140134
use databend_common_storage::StageFilesInfo;
141135
use parquet::basic::ConvertedType;
142136
use parquet::basic::Repetition;
@@ -214,7 +208,6 @@ mod tests {
214208
files_to_read: None,
215209
schema_from: "".to_string(),
216210
compression_ratio: 0.0,
217-
parquet_metas: Arc::new(Mutex::new(vec![])),
218211
need_stats_provider: false,
219212
max_threads: 1,
220213
max_memory_usage: 10000,

0 commit comments

Comments (0)