Skip to content

Commit 44d9181

Browse files
authored
refactor(meta-service): upgrade snapshot storage(rotbl) (#18326)
* refactor(meta-service): upgrade snapshot storage(rotbl) # `rotbl` crate Changelog ## Version 0.2.1 **What's Changed for Users:** - **Storage is now pluggable** - You'll need to provide a storage instance when creating or opening Rotbl files. This change makes it possible to use different storage backends beyond just the filesystem - **Codec functionality moved** - The encoding/decoding features have been moved to a separate `codeq` library, which means better code sharing across projects - **Better async support** - Fixed thread safety issues so Rotbl works more reliably in async applications - **Improved backwards compatibility** - Added extensive testing to ensure your existing data files will always work with newer versions **New Capabilities:** - **Flexible storage backends** - The new Storage trait means we can add support for cloud storage, databases, or other backends in the future - **Better testing infrastructure** - Completely rebuilt our test system to work across different storage types - **Runtime configuration** - Config values are now calculated when you need them rather than upfront, making the library more efficient **What We Removed:** - Some internal APIs that weren't being used - Redundant configuration methods that were causing confusion * chore: fix snapshot install bug
1 parent 7f10d1c commit 44d9181

17 files changed

+185
-102
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ reqwest-hickory-resolver = "0.2"
464464
ringbuffer = "0.14.2"
465465
rmp-serde = "1.1.1"
466466
roaring = { version = "^0.10", features = ["serde"] }
467-
rotbl = { version = "0.1.2", features = [] }
467+
rotbl = { version = "0.2.1", features = [] }
468468
rust_decimal = "1.26"
469469
rustix = "0.38.37"
470470
rustls = { version = "0.23.27", features = ["ring", "tls12"], default-features = false }

src/meta/raft-store/src/leveled_store/db_builder.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_meta_types::snapshot_db::DB;
2222
use databend_common_meta_types::sys_data::SysData;
2323
use futures::Stream;
2424
use futures_util::TryStreamExt;
25+
use rotbl::storage::impls::fs::FsStorage;
2526
use rotbl::v001::Rotbl;
2627
use rotbl::v001::RotblMeta;
2728
use rotbl::v001::SeqMarked;
@@ -30,40 +31,39 @@ use crate::leveled_store::leveled_map::LeveledMap;
3031

3132
/// Builds a snapshot from series of key-value in `(String, SeqMarked)`
3233
pub(crate) struct DBBuilder {
33-
path: String,
34-
rotbl_builder: rotbl::v001::Builder,
34+
storage_path: String,
35+
rel_path: String,
36+
rotbl_builder: rotbl::v001::Builder<FsStorage>,
3537
}
3638

3739
impl DBBuilder {
3840
pub fn new<P: AsRef<Path>>(
39-
path: P,
41+
storage_path: P,
42+
rel_path: &str,
4043
rotbl_config: rotbl::v001::Config,
4144
) -> Result<Self, io::Error> {
42-
let p = path.as_ref().to_str().ok_or_else(|| {
45+
let storage = FsStorage::new(storage_path.as_ref().to_path_buf());
46+
47+
let base_path = storage_path.as_ref().to_str().ok_or_else(|| {
4348
io::Error::new(
4449
io::ErrorKind::InvalidInput,
45-
format!("invalid path: {:?}", path.as_ref()),
50+
format!("invalid path: {:?}", storage_path.as_ref()),
4651
)
4752
})?;
4853

49-
let inner = rotbl::v001::Builder::new(rotbl_config, path.as_ref())?;
54+
let rel_path = rel_path.to_string();
55+
56+
let inner = rotbl::v001::Builder::new(storage, rotbl_config, &rel_path)?;
5057

5158
let b = Self {
52-
path: p.to_string(),
59+
storage_path: base_path.to_string(),
60+
rel_path,
5361
rotbl_builder: inner,
5462
};
5563

5664
Ok(b)
5765
}
5866

59-
/// This method is only used for test.
60-
#[allow(dead_code)]
61-
pub fn new_with_default_config<P: AsRef<Path>>(path: P) -> Result<Self, io::Error> {
62-
let mut config = rotbl::v001::Config::default();
63-
config.fill_default_values();
64-
Self::new(path, config)
65-
}
66-
6767
/// Append a key-value pair to the builder, the keys must be sorted.
6868
pub fn append_kv(&mut self, k: String, v: SeqMarked) -> Result<(), io::Error> {
6969
self.rotbl_builder.append_kv(k, v)
@@ -82,8 +82,11 @@ impl DBBuilder {
8282
Ok(())
8383
}
8484

85-
/// Flush the data to disk and return a read-only table [`Rotbl`] instance.
86-
pub fn flush(self, sys_data: SysData) -> Result<(String, Rotbl), io::Error> {
85+
/// Flush the data to disk and return:
86+
/// - storage path,
87+
/// - relative path of the db,
88+
/// - and a read-only table [`Rotbl`] instance.
89+
pub fn flush(self, sys_data: SysData) -> Result<(String, String, Rotbl), io::Error> {
8790
let meta = serde_json::to_string(&sys_data).map_err(|e| {
8891
io::Error::new(
8992
io::ErrorKind::InvalidData,
@@ -95,7 +98,7 @@ impl DBBuilder {
9598
let rotbl_meta = RotblMeta::new(0, meta);
9699

97100
let r = self.rotbl_builder.commit(rotbl_meta)?;
98-
Ok((self.path, r))
101+
Ok((self.storage_path, self.rel_path, r))
99102
}
100103

101104
/// Build a [`DB`] from a leveled map.
@@ -116,8 +119,8 @@ impl DBBuilder {
116119
self.append_kv_stream(strm).await?;
117120

118121
let snapshot_id = make_snapshot_id(&sys_data);
119-
let (path, r) = self.flush(sys_data)?;
120-
let db = DB::new(path, snapshot_id, Arc::new(r))?;
122+
let (storage_path, rel_path, r) = self.flush(sys_data)?;
123+
let db = DB::new(storage_path, rel_path, snapshot_id, Arc::new(r))?;
121124

122125
Ok(db)
123126
}

src/meta/raft-store/src/leveled_store/db_map_api_ro_test.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,8 @@ async fn test_db_map_api_ro() -> anyhow::Result<()> {
5757

5858
let temp_dir = tempfile::tempdir()?;
5959
let path = temp_dir.path();
60-
let path = path.join("temp-db");
6160

62-
let db_builder = DBBuilder::new_with_default_config(path)?;
61+
let db_builder = DBBuilder::new(path, "temp-db", rotbl::v001::Config::default())?;
6362
db_builder
6463
.build_from_leveled_map(lm, |_| "1-1-1-1".to_string())
6564
.await?

src/meta/raft-store/src/leveled_store/db_open_snapshot_impl.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,36 @@
1313
// limitations under the License.
1414

1515
use std::io;
16+
use std::path::PathBuf;
1617
use std::sync::Arc;
1718

1819
use databend_common_meta_types::snapshot_db::DB;
1920
use log::info;
2021
use openraft::SnapshotId;
22+
use rotbl::storage::impls::fs::FsStorage;
2123
use rotbl::v001::Rotbl;
2224

2325
use crate::config::RaftConfig;
2426
use crate::sm_v003::open_snapshot::OpenSnapshot;
2527

2628
impl OpenSnapshot for DB {
2729
fn open_snapshot(
28-
path: impl ToString,
30+
storage_path: impl ToString,
31+
rel_path: impl ToString,
2932
snapshot_id: SnapshotId,
3033
raft_config: &RaftConfig,
3134
) -> Result<Self, io::Error> {
35+
let storage_path = storage_path.to_string();
36+
let rel_path = rel_path.to_string();
37+
38+
let storage = FsStorage::new(PathBuf::from(&storage_path));
39+
3240
let config = raft_config.to_rotbl_config();
33-
let r = Rotbl::open(config, path.to_string())?;
41+
let r = Rotbl::open(storage, config, &rel_path.to_string())?;
3442

35-
info!("Opened snapshot at {}", path.to_string());
43+
info!("Opened snapshot at {storage_path}/{rel_path}");
3644

37-
let db = Self::new(path, snapshot_id, Arc::new(r))?;
45+
let db = Self::new(storage_path, rel_path, snapshot_id, Arc::new(r))?;
3846
Ok(db)
3947
}
4048
}

src/meta/raft-store/src/sm_v003/compact_with_db_test.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,12 @@ async fn test_compact() -> anyhow::Result<()> {
156156

157157
let temp_dir = tempfile::tempdir()?;
158158
let path = temp_dir.path();
159-
let path = path.join("temp-compacted");
160-
compact(&mut lm, path.as_os_str().to_str().unwrap()).await?;
159+
compact(
160+
&mut lm,
161+
path.as_os_str().to_str().unwrap(),
162+
"temp-compacted",
163+
)
164+
.await?;
161165

162166
let db = lm.persisted().unwrap();
163167

@@ -208,8 +212,12 @@ async fn test_compact_expire_index() -> anyhow::Result<()> {
208212

209213
let temp_dir = tempfile::tempdir()?;
210214
let path = temp_dir.path();
211-
let path = path.join("temp-compacted");
212-
compact(&mut lm, path.as_os_str().to_str().unwrap()).await?;
215+
compact(
216+
&mut lm,
217+
path.as_os_str().to_str().unwrap(),
218+
"temp-compacted",
219+
)
220+
.await?;
213221

214222
let db = lm.persisted().unwrap();
215223

@@ -359,8 +367,7 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> {
359367
// Move the bottom level to db
360368
let temp_dir = tempfile::tempdir()?;
361369
let path = temp_dir.path();
362-
let path = path.join("temp-db");
363-
move_bottom_to_db(&mut lm, path.to_str().unwrap()).await?;
370+
move_bottom_to_db(&mut lm, path.to_str().unwrap(), "temp-db").await?;
364371

365372
Ok((lm, temp_dir))
366373
}
@@ -393,13 +400,16 @@ async fn build_sm_with_expire() -> anyhow::Result<(SMV003, impl Drop)> {
393400

394401
let temp_dir = tempfile::tempdir()?;
395402
let path = temp_dir.path();
396-
let path = path.join("temp-db");
397-
move_bottom_to_db(lm, path.to_str().unwrap()).await?;
403+
move_bottom_to_db(lm, path.to_str().unwrap(), "temp-db").await?;
398404
Ok((sm, temp_dir))
399405
}
400406

401407
/// Build a DB from the bottom level of the immutable levels.
402-
async fn move_bottom_to_db(lm: &mut LeveledMap, path: &str) -> Result<(), io::Error> {
408+
async fn move_bottom_to_db(
409+
lm: &mut LeveledMap,
410+
base_path: &str,
411+
rel_path: &str,
412+
) -> Result<(), io::Error> {
403413
let mut immutables = lm.immutable_levels_ref().clone();
404414
let bottom = immutables.levels().remove(0);
405415
lm.replace_immutable_levels(immutables);
@@ -408,14 +418,14 @@ async fn move_bottom_to_db(lm: &mut LeveledMap, path: &str) -> Result<(), io::Er
408418
let mut lm2 = LeveledMap::default();
409419
lm2.replace_immutable_levels(bottom);
410420

411-
compact(&mut lm2, path).await?;
421+
compact(&mut lm2, base_path, rel_path).await?;
412422

413423
*lm.persisted_mut() = lm2.persisted_mut().clone();
414424
Ok(())
415425
}
416426

417-
async fn compact(lm: &mut LeveledMap, path: &str) -> Result<(), io::Error> {
418-
let db_builder = DBBuilder::new_with_default_config(path)?;
427+
async fn compact(lm: &mut LeveledMap, base_path: &str, rel_path: &str) -> Result<(), io::Error> {
428+
let db_builder = DBBuilder::new(base_path, rel_path, rotbl::v001::Config::default())?;
419429

420430
let db = db_builder
421431
.build_from_leveled_map(lm, |_sys_data| "1-1-1-1.snap".to_string())

src/meta/raft-store/src/sm_v003/open_snapshot.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ use crate::config::RaftConfig;
1818

1919
/// A trait for opening a snapshot.
2020
pub trait OpenSnapshot {
21+
/// Open a snapshot at `<storage_path>/<rel_path>`
2122
fn open_snapshot(
22-
path: impl ToString,
23+
storage_path: impl ToString,
24+
rel_path: impl ToString,
2325
snapshot_id: SnapshotId,
2426
raft_config: &RaftConfig,
2527
) -> Result<Self, std::io::Error>

src/meta/raft-store/src/sm_v003/received.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub struct Received {
2323
pub format: String,
2424
pub vote: Vote,
2525
pub snapshot_meta: SnapshotMeta,
26-
pub temp_path: String,
26+
27+
pub storage_path: String,
28+
pub temp_rel_path: String,
2729

2830
pub remote_addr: String,
2931

src/meta/raft-store/src/sm_v003/receiver_v003.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use crate::sm_v003::received::Received;
3333
pub struct ReceiverV003 {
3434
remote_addr: String,
3535

36-
temp_path: String,
36+
storage_path: String,
37+
38+
temp_rel_path: String,
3739

3840
temp_file: Option<BufWriter<File>>,
3941

@@ -49,13 +51,19 @@ pub struct ReceiverV003 {
4951

5052
impl ReceiverV003 {
5153
/// Create a new snapshot receiver with an empty snapshot.
52-
pub(crate) fn new(remote_addr: impl ToString, temp_path: impl ToString, temp_f: File) -> Self {
54+
pub(crate) fn new(
55+
remote_addr: impl ToString,
56+
storage_path: impl ToString,
57+
temp_rel_path: impl ToString,
58+
temp_f: File,
59+
) -> Self {
5360
let remote_addr = remote_addr.to_string();
54-
info!("Begin receiving snapshot v2 stream from: {}", remote_addr);
61+
info!("Begin receiving snapshot v003 stream from: {}", remote_addr);
5562

5663
ReceiverV003 {
5764
remote_addr,
58-
temp_path: temp_path.to_string(),
65+
storage_path: storage_path.to_string(),
66+
temp_rel_path: temp_rel_path.to_string(),
5967
temp_file: Some(BufWriter::with_capacity(64 * 1024 * 1024, temp_f)),
6068
on_recv: None,
6169
n_received: 0,
@@ -118,27 +126,26 @@ impl ReceiverV003 {
118126
chunk: SnapshotChunkRequestV003,
119127
) -> Result<Option<Received>, io::Error> {
120128
let remote_addr = self.remote_addr.clone();
121-
let temp_path = self.temp_path.clone();
122129

123130
fn invalid_input<E>(e: E) -> io::Error
124131
where E: Into<Box<dyn std::error::Error + Send + Sync>> {
125132
io::Error::new(io::ErrorKind::InvalidInput, e)
126133
}
127134

135+
// 1. update stat
136+
self.update_stat(&chunk);
137+
128138
// Add context info to io::Error
129139
let ctx = |e: io::Error, context: &str| -> io::Error {
130140
io::Error::new(
131141
e.kind(),
132142
format!(
133-
"{} while:(ReceiverV003::receive(): {}; remote_addr: {}; temp_path: {})",
134-
e, context, remote_addr, temp_path
143+
"{} while:(ReceiverV003::receive(): {}; remote_addr: {}; temp_path: {}/{})",
144+
e, context, remote_addr, self.storage_path, self.temp_rel_path
135145
),
136146
)
137147
};
138148

139-
// 1. update stat
140-
self.update_stat(&chunk);
141-
142149
// 2. write chunk to local snapshot_data
143150
{
144151
let f = self.temp_file.as_mut().ok_or_else(|| {
@@ -163,8 +170,8 @@ impl ReceiverV003 {
163170
};
164171

165172
info!(
166-
"snapshot from {} is completely received, format: {}, vote: {:?}, meta: {:?}, size: {}; path: {}",
167-
self.remote_addr, format, vote, snapshot_meta, self.size_received, self.temp_path
173+
"snapshot from {} is completely received, format: {}, vote: {:?}, meta: {:?}, size: {}; path: {}/{}",
174+
self.remote_addr, format, vote, snapshot_meta, self.size_received, self.storage_path, self.temp_rel_path
168175
);
169176

170177
if format != "rotbl::v001" {
@@ -198,7 +205,8 @@ impl ReceiverV003 {
198205
format,
199206
vote,
200207
snapshot_meta,
201-
temp_path: self.temp_path.clone(),
208+
storage_path: self.storage_path.clone(),
209+
temp_rel_path: self.temp_rel_path.clone(),
202210
remote_addr: self.remote_addr.clone(),
203211
n_received: self.n_received,
204212
size_received: self.size_received,
@@ -214,15 +222,15 @@ impl ReceiverV003 {
214222
debug!(
215223
len = data_len,
216224
total_len = self.size_received;
217-
"received {}-th snapshot chunk from {}; path: {}",
218-
self.n_received, self.remote_addr, self.temp_path
225+
"received {}-th snapshot chunk from {}; path: {}/{}",
226+
self.n_received, self.remote_addr, self.storage_path, self.temp_rel_path
219227
);
220228

221229
if self.n_received % 100 == 0 {
222230
info!(
223231
total_len = self.size_received;
224-
"received {}-th snapshot chunk from {}; path: {}",
225-
self.n_received, self.remote_addr, self.temp_path
232+
"received {}-th snapshot chunk from {}; path: {}/{}",
233+
self.n_received, self.remote_addr, self.storage_path, self.temp_rel_path
226234
);
227235
}
228236

0 commit comments

Comments
 (0)