Skip to content

Commit 5b2cc3f

Browse files
committed
merge
2 parents 5f18463 + 15d8bf1 commit 5b2cc3f

File tree

34 files changed

+694
-529
lines changed

34 files changed

+694
-529
lines changed

.github/actions/test_logs/action.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ runs:
3333
bash ./tests/logging/test-logs.sh
3434
3535
- name: Run History Tables Tests
36+
env:
37+
LOG_HISTORY_STORAGE_S3_ACCESS_KEY_ID: 'minioadmin'
38+
LOG_HISTORY_STORAGE_S3_SECRET_ACCESS_KEY: 'minioadmin'
3639
shell: bash
3740
run: |
3841
bash ./tests/logging/test-history-tables.sh

Cargo.lock

Lines changed: 13 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ reqwest-hickory-resolver = "0.2"
466466
ringbuffer = "0.14.2"
467467
rmp-serde = "1.1.1"
468468
roaring = { version = "^0.10", features = ["serde"] }
469-
rotbl = { version = "0.2.1", features = [] }
469+
rotbl = { version = "0.2.3", features = [] }
470470
rust_decimal = "1.26"
471471
rustix = "0.38.37"
472472
rustls = { version = "0.23.27", features = ["ring", "tls12"], default-features = false }

src/common/storage/src/parquet.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ pub async fn read_metadata_async(
116116
let buffer_len = buffer.len();
117117

118118
let map_err =
119-
|e: ParquetError| ErrorCode::BadBytes(format!("Invalid Parquet File {path}: {e}",));
119+
|e: ParquetError| ErrorCode::BadBytes(format!("Invalid Parquet file '{path}': {e}",));
120120
let footer_tail = ParquetMetaDataReader::decode_footer_tail(
121121
&buffer[(buffer_len - FOOTER_SIZE as usize)..]
122122
.try_into()
@@ -160,7 +160,7 @@ pub fn read_metadata_sync(
160160
check_footer_size(file_size, path)?;
161161

162162
let map_err =
163-
|e: ParquetError| ErrorCode::BadBytes(format!("Invalid Parquet File {path}: {e}",));
163+
|e: ParquetError| ErrorCode::BadBytes(format!("Invalid Parquet file '{path}': {e}",));
164164
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
165165
let default_end_len = DEFAULT_FOOTER_READ_SIZE.min(file_size);
166166
let buffer = blocking
@@ -198,7 +198,7 @@ pub fn read_metadata_sync(
198198
fn check_footer_size(file_size: u64, path: &str) -> Result<()> {
199199
if file_size < FOOTER_SIZE {
200200
Err(ErrorCode::BadBytes(format!(
201-
"Invalid Parquet file {path}. Size is smaller than footer."
201+
"Not a parquet file ({path}): only {file_size} bytes."
202202
)))
203203
} else {
204204
Ok(())
@@ -209,7 +209,7 @@ fn check_footer_size(file_size: u64, path: &str) -> Result<()> {
209209
fn check_meta_size(file_size: u64, metadata_len: u64, path: &str) -> Result<()> {
210210
if metadata_len + FOOTER_SIZE > file_size {
211211
Err(ErrorCode::BadBytes(format!(
212-
"Invalid Parquet file {path}. Reported metadata length of {} + {} byte footer, but file is only {} bytes",
212+
"Invalid Parquet file '{path}': Reported metadata length of {} + {} byte footer, but file is only {} bytes",
213213
metadata_len, FOOTER_SIZE, file_size
214214
)))
215215
} else {

src/meta/control/src/import_v004.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ pub async fn import_v004(
5858
let writer = snapshot_store.new_writer()?;
5959
let (tx, join_handle) = writer.spawn_writer_thread("import_v004");
6060

61-
let sys_data = Arc::new(Mutex::new(SysData::default()));
61+
let sys_data_holder = Arc::new(Mutex::new(SysData::default()));
6262

6363
let mut converter = SMEntryV002ToV004 {
64-
sys_data: sys_data.clone(),
64+
sys_data: sys_data_holder.clone(),
6565
};
6666

6767
for line in lines {
@@ -93,20 +93,16 @@ pub async fn import_v004(
9393
let max_log_id = raft_log_importer.max_log_id;
9494
raft_log_importer.flush().await?;
9595

96-
let s = {
97-
let r = sys_data.lock().unwrap();
96+
let sys_data = {
97+
let r = sys_data_holder.lock().unwrap();
9898
r.clone()
9999
};
100-
101-
tx.send(WriteEntry::Finish(s)).await?;
102-
let temp_snapshot_data = join_handle.await??;
103-
104-
let last_applied = {
105-
let r = sys_data.lock().unwrap();
106-
*r.last_applied_ref()
107-
};
100+
let last_applied = *sys_data.last_applied_ref();
108101
let snapshot_id = MetaSnapshotId::new_with_epoch(last_applied);
109-
let db = temp_snapshot_data.move_to_final_path(snapshot_id.to_string())?;
102+
103+
tx.send(WriteEntry::Finish((snapshot_id.clone(), sys_data)))
104+
.await?;
105+
let db = join_handle.await??;
110106

111107
eprintln!(
112108
"{data_version}: Imported {} records, snapshot: {}; snapshot_path: {}; snapshot_stat: {}",

src/meta/raft-store/src/leveled_store/db_builder.rs

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@ use rotbl::v001::RotblMeta;
2828
use rotbl::v001::SeqMarked;
2929

3030
use crate::leveled_store::leveled_map::LeveledMap;
31+
use crate::sm_v003::open_snapshot::OpenSnapshot;
32+
#[cfg(doc)]
33+
use crate::sm_v003::SnapshotStoreV004;
34+
use crate::snapshot_config::SnapshotConfig;
35+
use crate::state_machine::MetaSnapshotId;
3136

3237
/// Builds a snapshot from series of key-value in `(String, SeqMarked)`
3338
pub(crate) struct DBBuilder {
34-
storage_path: String,
35-
rel_path: String,
3639
rotbl_builder: rotbl::v001::Builder<FsStorage>,
3740
}
3841

@@ -44,20 +47,11 @@ impl DBBuilder {
4447
) -> Result<Self, io::Error> {
4548
let storage = FsStorage::new(storage_path.as_ref().to_path_buf());
4649

47-
let base_path = storage_path.as_ref().to_str().ok_or_else(|| {
48-
io::Error::new(
49-
io::ErrorKind::InvalidInput,
50-
format!("invalid path: {:?}", storage_path.as_ref()),
51-
)
52-
})?;
53-
5450
let rel_path = rel_path.to_string();
5551

5652
let inner = rotbl::v001::Builder::new(storage, rotbl_config, &rel_path)?;
5753

5854
let b = Self {
59-
storage_path: base_path.to_string(),
60-
rel_path,
6155
rotbl_builder: inner,
6256
};
6357

@@ -82,11 +76,39 @@ impl DBBuilder {
8276
Ok(())
8377
}
8478

85-
/// Flush the data to disk and return:
86-
/// - storage path,
87-
/// - relative path of the db,
88-
/// - and a read-only table [`Rotbl`] instance.
89-
pub fn flush(self, sys_data: SysData) -> Result<(String, String, Rotbl), io::Error> {
79+
/// Commit the building to the provided [`SnapshotStoreV004`].
80+
pub fn commit_to_snapshot_store(
81+
self,
82+
snapshot_config: &SnapshotConfig,
83+
snapshot_id: MetaSnapshotId,
84+
sys_data: SysData,
85+
) -> Result<DB, io::Error> {
86+
let config = self.rotbl_builder.config().clone();
87+
let storage_path = self.rotbl_builder.storage().base_dir_str().to_string();
88+
89+
let (current_rel_path, _r) = self.commit(sys_data)?;
90+
91+
let current_path = format!("{}/{}", storage_path, current_rel_path);
92+
93+
let (_, rel_path) =
94+
snapshot_config.move_to_final_path(&current_path, snapshot_id.to_string())?;
95+
96+
let db = DB::open_snapshot(&storage_path, &rel_path, snapshot_id.to_string(), config)
97+
.map_err(|e| {
98+
io::Error::new(
99+
e.kind(),
100+
format!(
101+
"{}; when:(open snapshot at: {}/{})",
102+
e, storage_path, rel_path
103+
),
104+
)
105+
})?;
106+
107+
Ok(db)
108+
}
109+
110+
/// Commit the building.
111+
pub(crate) fn commit(self, sys_data: SysData) -> Result<(String, Rotbl), io::Error> {
90112
let meta = serde_json::to_string(&sys_data).map_err(|e| {
91113
io::Error::new(
92114
io::ErrorKind::InvalidData,
@@ -97,8 +119,12 @@ impl DBBuilder {
97119
// the first arg `seq` is not used.
98120
let rotbl_meta = RotblMeta::new(0, meta);
99121

122+
// The relative path of the built rotbl.
123+
let rel_path = self.rotbl_builder.rel_path().to_string();
124+
100125
let r = self.rotbl_builder.commit(rotbl_meta)?;
101-
Ok((self.storage_path, self.rel_path, r))
126+
127+
Ok((rel_path, r))
102128
}
103129

104130
/// Build a [`DB`] from a leveled map.
@@ -118,10 +144,17 @@ impl DBBuilder {
118144

119145
self.append_kv_stream(strm).await?;
120146

147+
let storage_path = self.storage_path().to_string();
148+
121149
let snapshot_id = make_snapshot_id(&sys_data);
122-
let (storage_path, rel_path, r) = self.flush(sys_data)?;
150+
let (rel_path, r) = self.commit(sys_data)?;
123151
let db = DB::new(storage_path, rel_path, snapshot_id, Arc::new(r))?;
124152

125153
Ok(db)
126154
}
155+
156+
/// Returns the base dir of the internal FS-storage.
157+
fn storage_path(&self) -> &str {
158+
self.rotbl_builder.storage().base_dir_str()
159+
}
127160
}

src/meta/raft-store/src/leveled_store/db_open_snapshot_impl.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,20 @@ use openraft::SnapshotId;
2222
use rotbl::storage::impls::fs::FsStorage;
2323
use rotbl::v001::Rotbl;
2424

25-
use crate::config::RaftConfig;
2625
use crate::sm_v003::open_snapshot::OpenSnapshot;
2726

2827
impl OpenSnapshot for DB {
2928
fn open_snapshot(
3029
storage_path: impl ToString,
3130
rel_path: impl ToString,
3231
snapshot_id: SnapshotId,
33-
raft_config: &RaftConfig,
32+
config: rotbl::v001::Config,
3433
) -> Result<Self, io::Error> {
3534
let storage_path = storage_path.to_string();
3635
let rel_path = rel_path.to_string();
3736

3837
let storage = FsStorage::new(PathBuf::from(&storage_path));
3938

40-
let config = raft_config.to_rotbl_config();
4139
let r = Rotbl::open(storage, config, &rel_path.to_string())?;
4240

4341
info!("Opened snapshot at {storage_path}/{rel_path}");

src/meta/raft-store/src/sm_v003/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ pub mod open_snapshot;
2424
pub mod received;
2525
pub mod receiver_v003;
2626
pub mod snapshot_loader;
27-
pub mod temp_snapshot_data;
2827
pub mod write_entry;
2928
pub mod writer_stat;
3029

src/meta/raft-store/src/sm_v003/open_snapshot.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,14 @@
1414

1515
use openraft::SnapshotId;
1616

17-
use crate::config::RaftConfig;
18-
1917
/// A trait for opening a snapshot.
2018
pub trait OpenSnapshot {
2119
/// Open a snapshot at `<storage_path>/<rel_path>`
2220
fn open_snapshot(
2321
storage_path: impl ToString,
2422
rel_path: impl ToString,
2523
snapshot_id: SnapshotId,
26-
raft_config: &RaftConfig,
24+
config: rotbl::v001::Config,
2725
) -> Result<Self, std::io::Error>
2826
where
2927
Self: Sized;

src/meta/raft-store/src/sm_v003/snapshot_loader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ where SD: OpenSnapshot
7777
storage_path.clone(),
7878
rel_path,
7979
snapshot_id.clone(),
80-
self.snapshot_config.raft_config(),
80+
self.snapshot_config.raft_config().to_rotbl_config(),
8181
)
8282
.map_err(|e| {
8383
error!("failed to open snapshot file({}): {}", storage_path, e);

0 commit comments

Comments
 (0)