Skip to content

Commit 1dad976

Browse files
authored
fix: Avoid duplicate temp file name for install-snapshot requests (#15565)
This commit resolves an issue where duplicate temporary file names were generated when two install-snapshot requests were received in close temporal proximity. This could potentially lead to conflicts and erroneous behavior during snapshot installation. Changes: - Added logic to generate unique temporary file names for each install-snapshot request to prevent naming conflicts. - Additionally, this commit enhances error handling by including context information in the `io::Error` returned from `SnapshotStoreV002`. This improvement aids in better understanding the source and nature of errors when they occur.
1 parent 8d6ebaf commit 1dad976

File tree

2 files changed

+84
-31
lines changed

2 files changed

+84
-31
lines changed

โ€Žsrc/meta/raft-store/src/sm_v002/snapshot_store.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ pub struct SnapshotStoreV002 {
103103
}
104104

105105
impl SnapshotStoreV002 {
106+
const TEMP_PREFIX: &'static str = "0.snap";
107+
106108
pub fn new(data_version: DataVersion, config: RaftConfig) -> Self {
107109
SnapshotStoreV002 {
108110
data_version,
@@ -130,12 +132,15 @@ impl SnapshotStoreV002 {
130132
}
131133

132134
pub fn snapshot_temp_path(&self) -> String {
135+
// Sleep to avoid timestamp collision when this function is called twice in a short time.
136+
std::thread::sleep(std::time::Duration::from_millis(2));
137+
133138
let ts = SystemTime::now()
134139
.duration_since(UNIX_EPOCH)
135140
.unwrap()
136141
.as_millis();
137142

138-
format!("{}/0.snap-{}", self.snapshot_dir(), ts)
143+
format!("{}/{}-{}", self.snapshot_dir(), Self::TEMP_PREFIX, ts)
139144
}
140145

141146
/// Return a list of valid snapshot ids found in the snapshot directory.
@@ -164,7 +169,19 @@ impl SnapshotStoreV002 {
164169

165170
info!("cleaning old snapshots in {}", dir);
166171

167-
let (snapshot_ids, invalid_files) = self.load_snapshot_ids().await?;
172+
let (snapshot_ids, mut invalid_files) = self.load_snapshot_ids().await?;
173+
174+
// The last several temp files may be in use by snapshot transmitting.
175+
// And do not delete them at once.
176+
{
177+
let l = invalid_files.len();
178+
if l > 2 {
179+
invalid_files = invalid_files.into_iter().take(l - 2).collect();
180+
} else {
181+
invalid_files = vec![];
182+
}
183+
}
184+
168185
for invalid_file in invalid_files {
169186
let path = format!("{}/{}", dir, invalid_file);
170187

@@ -240,7 +257,10 @@ impl SnapshotStoreV002 {
240257
}
241258

242259
snapshot_ids.sort();
260+
invalid_files.sort();
261+
243262
info!("dir: {}; loaded snapshots: {:?}", dir, snapshot_ids);
263+
info!("dir: {}; invalid files: {:?}", dir, invalid_files);
244264

245265
Ok((snapshot_ids, invalid_files))
246266
}
@@ -329,3 +349,30 @@ impl SnapshotStoreV002 {
329349
SnapshotStoreError::read(e).with_context(context)
330350
}
331351
}
352+
353+
#[cfg(test)]
354+
mod tests {
355+
use crate::config::RaftConfig;
356+
use crate::ondisk::DATA_VERSION;
357+
358+
#[test]
359+
fn test_temp_path_no_dup() -> anyhow::Result<()> {
360+
let temp = tempfile::tempdir()?;
361+
let p = temp.path();
362+
let raft_config = RaftConfig {
363+
raft_dir: p.to_str().unwrap().to_string(),
364+
..Default::default()
365+
};
366+
367+
let store = super::SnapshotStoreV002::new(DATA_VERSION, raft_config);
368+
369+
let mut prev = None;
370+
for _i in 0..10 {
371+
let path = store.snapshot_temp_path();
372+
assert_ne!(prev, Some(path.clone()), "dup: {}", path);
373+
prev = Some(path);
374+
}
375+
376+
Ok(())
377+
}
378+
}

โ€Žsrc/meta/types/src/raft_snapshot_data.rs

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::task::Poll;
2121
use tokio::fs;
2222
use tokio::io::AsyncBufReadExt;
2323
use tokio::io::AsyncRead;
24-
use tokio::io::AsyncReadExt;
2524
use tokio::io::AsyncSeek;
2625
use tokio::io::AsyncWrite;
2726
use tokio::io::AsyncWriteExt;
@@ -41,7 +40,10 @@ impl SnapshotData {
4140
.create(false)
4241
.create_new(false)
4342
.read(true)
44-
.open(&path)?;
43+
.open(&path)
44+
.map_err(|e| {
45+
io::Error::new(e.kind(), format!("{}: while open(); path: {}", e, path))
46+
})?;
4547

4648
Ok(SnapshotData {
4749
is_temp: false,
@@ -56,7 +58,10 @@ impl SnapshotData {
5658
.write(true)
5759
.read(true)
5860
.open(&path)
59-
.await?;
61+
.await
62+
.map_err(|e| {
63+
io::Error::new(e.kind(), format!("{}: while new_temp(); path: {}", e, path))
64+
})?;
6065

6166
Ok(SnapshotData {
6267
is_temp: true,
@@ -66,7 +71,12 @@ impl SnapshotData {
6671
}
6772

6873
pub async fn data_size(&self) -> Result<u64, io::Error> {
69-
self.f.metadata().await.map(|m| m.len())
74+
self.f.metadata().await.map(|m| m.len()).map_err(|e| {
75+
io::Error::new(
76+
e.kind(),
77+
format!("{}: while data_size(); path: {}", e, self.path),
78+
)
79+
})
7080
}
7181

7282
pub fn is_temp(&self) -> bool {
@@ -82,44 +92,40 @@ impl SnapshotData {
8292
}
8393

8494
pub async fn sync_all(&mut self) -> Result<(), io::Error> {
85-
self.f.flush().await?;
86-
self.f.sync_all().await
87-
}
88-
89-
pub async fn read_lines<T>(self: Box<SnapshotData>) -> Result<Vec<T>, io::Error>
90-
where T: serde::de::DeserializeOwned {
91-
let mut res = vec![];
92-
93-
let b = BufReader::new(self);
94-
let mut lines = AsyncBufReadExt::lines(b);
95-
96-
while let Some(l) = lines.next_line().await? {
97-
let ent: T = serde_json::from_str(&l)
98-
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
99-
res.push(ent)
100-
}
101-
102-
Ok(res)
95+
self.f.flush().await.map_err(|e| {
96+
io::Error::new(
97+
e.kind(),
98+
format!("{}: while flush(); path: {}", e, self.path),
99+
)
100+
})?;
101+
102+
self.f.sync_all().await.map_err(|e| {
103+
io::Error::new(
104+
e.kind(),
105+
format!("{}: while sync_all(); path: {}", e, self.path),
106+
)
107+
})
103108
}
104109

105110
pub async fn read_to_lines(self: Box<SnapshotData>) -> Result<Vec<String>, io::Error> {
106111
let mut res = vec![];
107112

113+
let path = self.path.clone();
114+
108115
let b = BufReader::new(self);
109116
let mut lines = AsyncBufReadExt::lines(b);
110117

111-
while let Some(l) = lines.next_line().await? {
118+
while let Some(l) = lines.next_line().await.map_err(|e| {
119+
io::Error::new(
120+
e.kind(),
121+
format!("{}: while read_to_lines(); path: {}", e, path),
122+
)
123+
})? {
112124
res.push(l)
113125
}
114126

115127
Ok(res)
116128
}
117-
118-
pub async fn read_to_string(self: &mut Box<SnapshotData>) -> Result<String, io::Error> {
119-
let mut res = String::new();
120-
self.f.read_to_string(&mut res).await?;
121-
Ok(res)
122-
}
123129
}
124130

125131
impl AsyncRead for SnapshotData {

0 commit comments

Comments
ย (0)