Skip to content

Commit 0ca00f9

Browse files
authored
refactor: enable feature flag openraft:storage-v2 (#14838)
* refactor: enable feature flag openraft:storage-v2

  Separate the implementations of RaftLogStorage and RaftStateMachine.

* chore: tolerate time diff in expiration test
1 parent f22ad96 commit 0ca00f9

File tree

10 files changed

+481
-424
lines changed

10 files changed

+481
-424
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
123123
openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.9.0-alpha.8", features = [
124124
"serde",
125125
"tracing-log",
126+
"storage-v2",
126127
"loosen-follower-log-revert", # allows removing all data from a follower and restoring from the leader.
127128
] }
128129

src/binaries/metactl/snapshot.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ use databend_common_meta_raft_store::sm_v002::SnapshotStoreV002;
3838
use databend_common_meta_raft_store::state::RaftState;
3939
use databend_common_meta_sled_store::get_sled_db;
4040
use databend_common_meta_sled_store::init_sled_db;
41+
use databend_common_meta_sled_store::openraft::storage::RaftLogStorageExt;
4142
use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder;
42-
use databend_common_meta_sled_store::openraft::RaftStorage;
4343
use databend_common_meta_types::Cmd;
4444
use databend_common_meta_types::CommittedLeaderId;
4545
use databend_common_meta_types::Endpoint;
@@ -360,7 +360,7 @@ async fn init_new_cluster(
360360
payload: EntryPayload::Membership(membership),
361361
};
362362

363-
sto.append_to_log([entry]).await?;
363+
sto.blocking_append([entry]).await?;
364364

365365
// insert AddNodes logs
366366
for (node_id, node) in nodes {
@@ -381,7 +381,7 @@ async fn init_new_cluster(
381381
}),
382382
};
383383

384-
sto.append_to_log([entry]).await?;
384+
sto.blocking_append([entry]).await?;
385385
}
386386
}
387387

src/meta/raft-store/src/sm_v002/sm_v002.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,18 +232,19 @@ impl SMV002 {
232232
Applier::new(self)
233233
}
234234

235-
pub async fn apply_entries<'a>(
235+
pub async fn apply_entries(
236236
&mut self,
237-
entries: impl IntoIterator<Item = &'a Entry>,
237+
entries: impl IntoIterator<Item = Entry>,
238238
) -> Result<Vec<AppliedState>, StorageIOError> {
239239
let mut applier = Applier::new(self);
240240

241241
let mut res = vec![];
242242

243243
for ent in entries.into_iter() {
244+
info!("apply: {}", *ent.get_log_id());
244245
let log_id = *ent.get_log_id();
245246
let r = applier
246-
.apply(ent)
247+
.apply(&ent)
247248
.await
248249
.map_err(|e| StorageIOError::apply(log_id, &e))?;
249250
res.push(r);

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ use databend_common_meta_raft_store::ondisk::DataVersion;
3636
use databend_common_meta_raft_store::ondisk::DATA_VERSION;
3737
use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
3838
use databend_common_meta_sled_store::openraft;
39-
use databend_common_meta_sled_store::openraft::storage::Adaptor;
4039
use databend_common_meta_sled_store::openraft::ChangeMembers;
4140
use databend_common_meta_stoerr::MetaStorageError;
4241
use databend_common_meta_types::protobuf::raft_service_client::RaftServiceClient;
@@ -156,8 +155,8 @@ pub struct MetaNodeStatus {
156155
pub last_seq: u64,
157156
}
158157

159-
pub type LogStore = Adaptor<TypeConfig, RaftStore>;
160-
pub type SMStore = Adaptor<TypeConfig, RaftStore>;
158+
pub type LogStore = RaftStore;
159+
pub type SMStore = RaftStore;
161160

162161
/// MetaRaft is an implementation of the generic Raft handling meta data R/W.
163162
pub type MetaRaft = Raft<TypeConfig>;
@@ -205,7 +204,8 @@ impl MetaNodeBuilder {
205204

206205
let net = Network::new(sto.clone());
207206

208-
let (log_store, sm_store) = Adaptor::new(sto.clone());
207+
let log_store = sto.clone();
208+
let sm_store = sto.clone();
209209

210210
let raft = MetaRaft::new(node_id, Arc::new(config), net, log_store, sm_store)
211211
.await

src/meta/service/src/store/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod raft_log_storage_impl;
16+
mod raft_state_machine_impl;
1517
#[allow(clippy::module_inception)]
1618
mod store;
1719
mod store_inner;
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::Debug;
16+
use std::io::ErrorKind;
17+
use std::ops::RangeBounds;
18+
19+
use databend_common_base::base::tokio::io;
20+
use databend_common_meta_sled_store::openraft::storage::LogFlushed;
21+
use databend_common_meta_sled_store::openraft::storage::RaftLogStorage;
22+
use databend_common_meta_sled_store::openraft::ErrorSubject;
23+
use databend_common_meta_sled_store::openraft::ErrorVerb;
24+
use databend_common_meta_sled_store::openraft::LogState;
25+
use databend_common_meta_sled_store::openraft::OptionalSend;
26+
use databend_common_meta_sled_store::openraft::RaftLogReader;
27+
use databend_common_meta_types::Entry;
28+
use databend_common_meta_types::LogId;
29+
use databend_common_meta_types::StorageError;
30+
use databend_common_meta_types::TypeConfig;
31+
use databend_common_meta_types::Vote;
32+
use log::debug;
33+
use log::error;
34+
use log::info;
35+
36+
use crate::metrics::raft_metrics;
37+
use crate::store::RaftStore;
38+
use crate::store::ToStorageError;
39+
40+
impl RaftLogReader<TypeConfig> for RaftStore {
41+
#[minitrace::trace]
42+
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send>(
43+
&mut self,
44+
range: RB,
45+
) -> Result<Vec<Entry>, StorageError> {
46+
debug!(
47+
"try_get_log_entries: self.id={}, range: {:?}",
48+
self.id, range
49+
);
50+
51+
match self
52+
.log
53+
.read()
54+
.await
55+
.range_values(range)
56+
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
57+
{
58+
Ok(entries) => Ok(entries),
59+
Err(err) => {
60+
raft_metrics::storage::incr_raft_storage_fail("try_get_log_entries", false);
61+
Err(err)
62+
}
63+
}
64+
}
65+
}
66+
67+
impl RaftLogStorage<TypeConfig> for RaftStore {
68+
type LogReader = RaftStore;
69+
70+
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError> {
71+
let last_purged_log_id = match self
72+
.log
73+
.read()
74+
.await
75+
.get_last_purged()
76+
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
77+
{
78+
Err(err) => {
79+
raft_metrics::storage::incr_raft_storage_fail("get_log_state", false);
80+
return Err(err);
81+
}
82+
Ok(r) => r,
83+
};
84+
85+
let last = match self
86+
.log
87+
.read()
88+
.await
89+
.logs()
90+
.last()
91+
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
92+
{
93+
Err(err) => {
94+
raft_metrics::storage::incr_raft_storage_fail("get_log_state", false);
95+
return Err(err);
96+
}
97+
Ok(r) => r,
98+
};
99+
100+
let last_log_id = match last {
101+
None => last_purged_log_id,
102+
Some(x) => Some(x.1.log_id),
103+
};
104+
105+
debug!(
106+
"get_log_state: ({:?},{:?}]",
107+
last_purged_log_id, last_log_id
108+
);
109+
110+
Ok(LogState {
111+
last_purged_log_id,
112+
last_log_id,
113+
})
114+
}
115+
116+
async fn get_log_reader(&mut self) -> Self::LogReader {
117+
self.clone()
118+
}
119+
120+
async fn save_committed(&mut self, committed: Option<LogId>) -> Result<(), StorageError> {
121+
self.raft_state
122+
.write()
123+
.await
124+
.save_committed(committed)
125+
.await
126+
.map_to_sto_err(ErrorSubject::Store, ErrorVerb::Write)
127+
}
128+
129+
async fn read_committed(&mut self) -> Result<Option<LogId>, StorageError> {
130+
self.raft_state
131+
.read()
132+
.await
133+
.read_committed()
134+
.map_to_sto_err(ErrorSubject::Store, ErrorVerb::Read)
135+
}
136+
137+
#[minitrace::trace]
138+
async fn save_vote(&mut self, hs: &Vote) -> Result<(), StorageError> {
139+
info!(id = self.id, hs :? =(hs); "save_vote");
140+
141+
match self
142+
.raft_state
143+
.write()
144+
.await
145+
.save_vote(hs)
146+
.await
147+
.map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Write)
148+
{
149+
Err(err) => {
150+
raft_metrics::storage::incr_raft_storage_fail("save_vote", true);
151+
Err(err)
152+
}
153+
Ok(_) => Ok(()),
154+
}
155+
}
156+
157+
#[minitrace::trace]
158+
async fn read_vote(&mut self) -> Result<Option<Vote>, StorageError> {
159+
match self
160+
.raft_state
161+
.read()
162+
.await
163+
.read_vote()
164+
.map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Read)
165+
{
166+
Err(err) => {
167+
raft_metrics::storage::incr_raft_storage_fail("read_vote", false);
168+
Err(err)
169+
}
170+
Ok(vote) => Ok(vote),
171+
}
172+
}
173+
174+
#[minitrace::trace]
175+
async fn append<I>(
176+
&mut self,
177+
entries: I,
178+
callback: LogFlushed<TypeConfig>,
179+
) -> Result<(), StorageError>
180+
where
181+
I: IntoIterator<Item = Entry> + OptionalSend,
182+
I::IntoIter: OptionalSend,
183+
{
184+
// TODO: it is bad: allocates a new vec.
185+
let entries = entries
186+
.into_iter()
187+
.map(|x| {
188+
info!("append_to_log: {}", x.log_id);
189+
x
190+
})
191+
.collect::<Vec<_>>();
192+
193+
let res = match self.log.write().await.append(entries).await {
194+
Err(err) => {
195+
raft_metrics::storage::incr_raft_storage_fail("append_to_log", true);
196+
Err(err)
197+
}
198+
Ok(_) => Ok(()),
199+
};
200+
201+
callback.log_io_completed(res.map_err(|e| io::Error::new(ErrorKind::InvalidData, e)));
202+
203+
Ok(())
204+
}
205+
206+
#[minitrace::trace]
207+
async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> {
208+
info!(id = self.id; "truncate: {}", log_id);
209+
210+
match self
211+
.log
212+
.write()
213+
.await
214+
.range_remove(log_id.index..)
215+
.await
216+
.map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete)
217+
{
218+
Ok(_) => Ok(()),
219+
Err(err) => {
220+
raft_metrics::storage::incr_raft_storage_fail("delete_conflict_logs_since", true);
221+
Err(err)
222+
}
223+
}
224+
}
225+
226+
#[minitrace::trace]
227+
async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> {
228+
info!(id = self.id, log_id :? =(&log_id); "purge upto: start");
229+
230+
if let Err(err) = self
231+
.log
232+
.write()
233+
.await
234+
.set_last_purged(log_id)
235+
.await
236+
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write)
237+
{
238+
raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true);
239+
return Err(err);
240+
};
241+
242+
info!(id = self.id, log_id :? =(&log_id); "purge_logs_upto: Done: set_last_purged()");
243+
244+
let log = self.log.write().await.clone();
245+
246+
// Purge can be done in another task safely, because:
247+
//
248+
// - Next time when raft starts, it will read last_purged_log_id without examining the actual first log.
249+
// And junk can be removed next time purge_logs_upto() is called.
250+
//
251+
// - Purging operates the start of the logs, and only committed logs are purged;
252+
// while append and truncate operates on the end of the logs,
253+
// it is safe to run purge && (append || truncate) concurrently.
254+
databend_common_base::runtime::spawn({
255+
let id = self.id;
256+
async move {
257+
info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Start: asynchronous range_remove()");
258+
259+
let res = log.range_remove(..=log_id.index).await;
260+
261+
if let Err(err) = res {
262+
error!(id = id, log_id :? =(&log_id); "purge_logs_upto: in asynchronous error: {}", err);
263+
raft_metrics::storage::incr_raft_storage_fail("purge_logs_upto", true);
264+
}
265+
266+
info!(id = id, log_id :? =(&log_id); "purge_logs_upto: Done: asynchronous range_remove()");
267+
}
268+
});
269+
270+
Ok(())
271+
}
272+
}

0 commit comments

Comments
 (0)