Skip to content

Commit 6ec6846

Browse files
authored
Merge pull request #9240 from drmingdrmer/30-fixup
refactor(meta/watcher): just use Change as event type
2 parents 8c3b8c5 + 83fc47b commit 6ec6846

File tree

8 files changed

+71
-116
lines changed

8 files changed

+71
-116
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -987,7 +987,6 @@ impl<KV: KVApi> SchemaApi for KV {
987987
if_then: vec![
988988
// Changing a table in a db has to update the seq of db_meta,
989989
// to block the batch-delete-tables when deleting a db.
990-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
991990
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
992991
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
993992
txn_op_put(&tbid, serialize_struct(&req.table_meta)?), /* (tenant, db_id, tb_id) -> tb_meta */
@@ -1149,7 +1148,6 @@ impl<KV: KVApi> SchemaApi for KV {
11491148
if_then: vec![
11501149
// Changing a table in a db has to update the seq of db_meta,
11511150
// to block the batch-delete-tables when deleting a db.
1152-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
11531151
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
11541152
txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id
11551153
txn_op_put(&tbid, serialize_struct(&tb_meta)?), /* (tenant, db_id, tb_id) -> tb_meta */
@@ -1307,7 +1305,6 @@ impl<KV: KVApi> SchemaApi for KV {
13071305
if_then: vec![
13081306
// Changing a table in a db has to update the seq of db_meta,
13091307
// to block the batch-delete-tables when deleting a db.
1310-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
13111308
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
13121309
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
13131310
// txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?)?, // _fd_table_id_list/db_id/table_name -> tb_id_list
@@ -1498,7 +1495,6 @@ impl<KV: KVApi> SchemaApi for KV {
14981495
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
14991496
// Changing a table in a db has to update the seq of db_meta,
15001497
// to block the batch-delete-tables when deleting a db.
1501-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
15021498
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
15031499
txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/old_table_name -> tb_id_list */
15041500
txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */
@@ -1507,7 +1503,6 @@ impl<KV: KVApi> SchemaApi for KV {
15071503

15081504
if db_id != new_db_id {
15091505
then_ops.push(
1510-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
15111506
txn_op_put(
15121507
&DatabaseId { db_id: new_db_id },
15131508
serialize_struct(&new_db_meta)?,

src/meta/raft-store/src/state_machine/sm.rs

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ const TREE_STATE_MACHINE: &str = "state_machine";
9595

9696
/// StateMachine subscriber trait
9797
pub trait StateMachineSubscriber: Debug + Sync + Send {
98-
fn kv_changed(&self, key: &str, prev: Option<SeqV>, current: Option<SeqV>);
98+
fn kv_changed(&self, change: Change<Vec<u8>, String>);
9999
}
100100

101101
/// The state machine of the `MemStore`.
102-
/// It includes user data and two raft-related informations:
102+
/// It includes user data and two raft-related information:
103103
/// `last_applied_logs` and `client_serial_responses` to achieve idempotence.
104104
#[derive(Debug)]
105105
pub struct StateMachine {
@@ -342,9 +342,7 @@ impl StateMachine {
342342
// Send queued change events to subscriber
343343
if let Some(subscriber) = &self.subscriber {
344344
for event in changes {
345-
// TODO: use Change as the event data type.
346-
// safe unwrap
347-
subscriber.kv_changed(&event.ident.unwrap(), event.prev, event.result);
345+
subscriber.kv_changed(event);
348346
}
349347
}
350348

@@ -638,17 +636,15 @@ impl StateMachine {
638636
if let Some(kv_pairs) = kv_pairs {
639637
if let Some(kv_pairs) = kv_pairs.get(delete_by_prefix) {
640638
for (key, _seq) in kv_pairs.iter() {
641-
// TODO: return StorageError
642-
let ret = Self::txn_upsert_kv(txn_tree, &UpsertKV::delete(key), log_time_ms);
639+
let (expired, prev, res) =
640+
Self::txn_upsert_kv(txn_tree, &UpsertKV::delete(key), log_time_ms)?;
643641

644-
if let Ok(ret) = ret {
645-
count += 1;
642+
count += 1;
646643

647-
if ret.0.is_some() {
648-
txn_tree.push_change(key, ret.0.clone(), None);
649-
}
650-
txn_tree.push_change(key, ret.1.clone(), ret.2);
644+
if expired.is_some() {
645+
txn_tree.push_change(key, expired, None);
651646
}
647+
txn_tree.push_change(key, prev, res);
652648
}
653649
}
654650
}
@@ -1046,7 +1042,7 @@ impl StateMachine {
10461042
sm_nodes.range_values(..)
10471043
}
10481044

1049-
/// Expire an `SeqV` and returns:
1045+
/// Expires an `SeqV` and returns the value discarded by expiration and the unexpired value:
10501046
/// - `(Some, None)` if it expires.
10511047
/// - `(None, Some)` if it does not.
10521048
/// - `(None, None)` if the input is None.
@@ -1064,32 +1060,6 @@ impl StateMachine {
10641060
(None, None)
10651061
}
10661062
}
1067-
1068-
pub fn unexpired<V: Debug>(seq_value: SeqV<V>, log_time_ms: u64) -> Option<SeqV<V>> {
1069-
// Caveat: The cleanup must be consistent across raft nodes:
1070-
// A conditional update, e.g. an upsert_kv() with MatchSeq::Eq(some_value),
1071-
// must be applied with the same timestamp on every raft node.
1072-
// Otherwise: node-1 could have applied a log with a ts that is smaller than value.expire_at,
1073-
// while node-2 may fail to apply the same log if it use a greater ts > value.expire_at.
1074-
//
1075-
// Thus:
1076-
//
1077-
// 1. A raft log must have a field ts assigned by the leader. When applying, use this ts to
1078-
// check against expire_at to decide whether to purge it.
1079-
// 2. A GET operation must not purge any expired entry. Since a GET is only applied to a node itself.
1080-
// 3. The background task can only be triggered by the raft leader, by submit a "clean expired" log.
1081-
1082-
// TODO(xp): background task to clean expired
1083-
// TODO(xp): maybe it needs a expiration queue for efficient cleaning up.
1084-
1085-
debug!("seq_value: {:?} log_time_ms: {}", seq_value, log_time_ms);
1086-
1087-
if seq_value.get_expire_at() < log_time_ms {
1088-
None
1089-
} else {
1090-
Some(seq_value)
1091-
}
1092-
}
10931063
}
10941064

10951065
/// Key space support

src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,10 @@ impl KVApi for StateMachine {
7575
// TODO(xp) refine get(): a &str is enough for key
7676
let sv = self.kvs().get(&key.to_string())?;
7777
debug!("get_kv sv:{:?}", sv);
78-
let seq_v = match sv {
79-
None => return Ok(None),
80-
Some(sv) => sv,
81-
};
8278

8379
let local_now_ms = SeqV::<()>::now_ms();
84-
Ok(Self::unexpired(seq_v, local_now_ms))
80+
let (_expired, res) = Self::expire_seq_v(sv, local_now_ms);
81+
Ok(res)
8582
}
8683

8784
async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, KVAppError> {
@@ -111,7 +108,7 @@ impl KVApi for StateMachine {
111108
let local_now_ms = SeqV::<()>::now_ms();
112109

113110
// Convert expired to None
114-
let x = x.map(|(k, v)| (k, Self::unexpired(v, local_now_ms)));
111+
let x = x.map(|(k, v)| (k, Self::expire_seq_v(Some(v), local_now_ms).1));
115112
// Remove None
116113
let x = x.filter(|(_k, v)| v.is_some());
117114
// Extract from an Option

src/meta/service/src/meta_service/raftmeta.rs

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -329,20 +329,18 @@ impl MetaNode {
329329
Ok(())
330330
}
331331

332-
/// Open or create a metasrv node.
333-
/// Optionally boot a single node cluster.
332+
/// Open or create a meta node.
334333
/// 1. If `open` is `Some`, try to open an existing one.
335334
/// 2. If `create` is `Some`, try to create one in non-voter mode.
336335
#[tracing::instrument(level = "debug", skip_all)]
337-
pub async fn open_create_boot(
336+
pub async fn open_create(
338337
config: &RaftConfig,
339338
open: Option<()>,
340339
create: Option<()>,
341-
initialize_cluster: Option<Node>,
342340
) -> Result<Arc<MetaNode>, MetaStartupError> {
343341
info!(
344-
"open_create_boot, config: {:?}, open: {:?}, create: {:?}, initialize_cluster: {:?}",
345-
config, open, create, initialize_cluster
342+
"open_create_boot, config: {:?}, open: {:?}, create: {:?}",
343+
config, open, create
346344
);
347345

348346
let mut config = config.clone();
@@ -371,10 +369,21 @@ impl MetaNode {
371369

372370
info!("MetaNode started: {:?}", config);
373371

374-
// init_cluster with advertise_host other than listen_host
375-
if mn.is_opened() {
376-
return Ok(mn);
377-
}
372+
Ok(mn)
373+
}
374+
375+
/// Open or create a metasrv node.
376+
/// Optionally boot a single node cluster.
377+
/// 1. If `open` is `Some`, try to open an existing one.
378+
/// 2. If `create` is `Some`, try to create one in non-voter mode.
379+
#[tracing::instrument(level = "debug", skip_all)]
380+
pub async fn open_create_boot(
381+
config: &RaftConfig,
382+
open: Option<()>,
383+
create: Option<()>,
384+
initialize_cluster: Option<Node>,
385+
) -> Result<Arc<MetaNode>, MetaStartupError> {
386+
let mn = Self::open_create(config, open, create).await?;
378387

379388
if let Some(node) = initialize_cluster {
380389
mn.init_cluster(node).await?;
@@ -718,47 +727,43 @@ impl MetaNode {
718727
async fn do_start(conf: &MetaConfig) -> Result<Arc<MetaNode>, MetaStartupError> {
719728
let raft_conf = &conf.raft_config;
720729

721-
let initialize_cluster = if raft_conf.single {
722-
Some(conf.get_node())
723-
} else {
724-
None
725-
};
726-
727730
if raft_conf.single {
728-
let mn = MetaNode::open_create_boot(raft_conf, Some(()), Some(()), initialize_cluster)
729-
.await?;
731+
let mn = MetaNode::open_create(raft_conf, Some(()), Some(())).await?;
732+
mn.init_cluster(conf.get_node()).await?;
730733
return Ok(mn);
731734
}
732735

733736
if !raft_conf.join.is_empty() {
734737
// Bring up a new node, join it into a cluster
735738

736-
let mn = MetaNode::open_create_boot(raft_conf, Some(()), Some(()), initialize_cluster)
737-
.await?;
739+
let mn = MetaNode::open_create(raft_conf, Some(()), Some(())).await?;
738740
return Ok(mn);
739741
}
740742
// open mode
741743

742-
let mn = MetaNode::open_create_boot(raft_conf, Some(()), None, initialize_cluster).await?;
744+
let mn = MetaNode::open_create(raft_conf, Some(()), None).await?;
743745
Ok(mn)
744746
}
745747

746748
/// Boot up the first node to create a cluster.
747749
/// For every cluster this func should be called exactly once.
748750
#[tracing::instrument(level = "debug", skip(config), fields(config_id=config.raft_config.config_id.as_str()))]
749751
pub async fn boot(config: &MetaConfig) -> Result<Arc<MetaNode>, MetaStartupError> {
750-
let mn =
751-
Self::open_create_boot(&config.raft_config, None, Some(()), Some(config.get_node()))
752-
.await?;
753-
752+
let mn = Self::open_create(&config.raft_config, None, Some(())).await?;
753+
mn.init_cluster(config.get_node()).await?;
754754
Ok(mn)
755755
}
756756

757-
// Initialized a single node cluster by:
758-
// - Initializing raft membership.
759-
// - Adding current node into the meta data.
757+
/// Initialize a single-node cluster if this node was just created:
758+
/// - Initializing raft membership.
759+
/// - Adding current node into the meta data.
760760
#[tracing::instrument(level = "debug", skip(self))]
761761
pub async fn init_cluster(&self, node: Node) -> Result<(), MetaStartupError> {
762+
if self.is_opened() {
763+
info!("It is opened, skip initializing cluster");
764+
return Ok(());
765+
}
766+
762767
let node_id = self.sto.id;
763768

764769
let mut cluster_node_ids = BTreeSet::new();

src/meta/service/src/watcher/watcher_manager.rs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use common_meta_types::protobuf::watch_request::FilterType;
2323
use common_meta_types::protobuf::Event;
2424
use common_meta_types::protobuf::WatchRequest;
2525
use common_meta_types::protobuf::WatchResponse;
26+
use common_meta_types::Change;
2627
use common_meta_types::PbSeqV;
27-
use common_meta_types::SeqV;
2828
use prost::Message;
2929
use tonic::Status;
3030
use tracing::info;
@@ -39,13 +39,6 @@ pub type WatcherStreamSender = Sender<Result<WatchResponse, Status>>;
3939

4040
type CreateWatcherEvent = (WatchRequest, WatcherStreamSender);
4141

42-
#[derive(Clone, Debug)]
43-
pub struct StateMachineKvData {
44-
pub key: String,
45-
pub prev: Option<SeqV>,
46-
pub current: Option<SeqV>,
47-
}
48-
4942
#[derive(Clone, Debug)]
5043
pub struct WatcherStateMachineSubscriber {
5144
event_tx: mpsc::UnboundedSender<WatcherEvent>,
@@ -54,7 +47,7 @@ pub struct WatcherStateMachineSubscriber {
5447
#[derive(Clone)]
5548
pub enum WatcherEvent {
5649
CreateWatcherEvent(CreateWatcherEvent),
57-
StateMachineKvDataEvent(StateMachineKvData),
50+
StateMachineKvDataEvent(Change<Vec<u8>, String>),
5851
}
5952

6053
#[derive(Debug)]
@@ -106,8 +99,8 @@ impl WatcherManagerCore {
10699
WatcherEvent::CreateWatcherEvent((req, tx)) => {
107100
self.create_watcher_stream(req, tx).await;
108101
}
109-
WatcherEvent::StateMachineKvDataEvent(kv) => {
110-
self.notify_event(kv).await;
102+
WatcherEvent::StateMachineKvDataEvent(kv_change) => {
103+
self.notify_event(kv_change).await;
111104
}
112105
}
113106
} else {
@@ -124,14 +117,15 @@ impl WatcherManagerCore {
124117
server_metrics::incr_watchers(-1);
125118
}
126119

127-
async fn notify_event(&mut self, kv: StateMachineKvData) {
128-
let set = self.watcher_range_map.get_by_point(&kv.key);
120+
async fn notify_event(&mut self, change: Change<Vec<u8>, String>) {
121+
let k = change.ident.as_ref().unwrap();
122+
let set = self.watcher_range_map.get_by_point(k);
129123
if set.is_empty() {
130124
return;
131125
}
132126

133-
let current = kv.current;
134-
let prev = kv.prev;
127+
let current = change.result;
128+
let prev = change.prev;
135129

136130
let is_delete_event = current.is_none();
137131
let mut remove_range_keys: Vec<RangeMapKey<String, WatcherId>> = vec![];
@@ -151,7 +145,7 @@ impl WatcherManagerCore {
151145
assert_eq!(stream.id, watcher_id);
152146
let resp = WatchResponse {
153147
event: Some(Event {
154-
key: kv.key.clone(),
148+
key: k.to_string(),
155149
current: current.clone().map(PbSeqV::from),
156150
prev: prev.clone().map(PbSeqV::from),
157151
}),
@@ -217,13 +211,9 @@ impl WatcherManagerCore {
217211
}
218212

219213
impl StateMachineSubscriber for WatcherStateMachineSubscriber {
220-
fn kv_changed(&self, key: &str, prev: Option<SeqV>, current: Option<SeqV>) {
214+
fn kv_changed(&self, change: Change<Vec<u8>, String>) {
221215
let _ = self
222216
.event_tx
223-
.send(WatcherEvent::StateMachineKvDataEvent(StateMachineKvData {
224-
key: key.to_string(),
225-
prev,
226-
current,
227-
}));
217+
.send(WatcherEvent::StateMachineKvDataEvent(change));
228218
}
229219
}

src/meta/service/tests/it/grpc/metasrv_grpc_kv_api_restart_cluster.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,9 @@ async fn test_kv_api_restart_cluster_token_expired() -> anyhow::Result<()> {
238238

239239
Ok(())
240240
}
241+
241242
// Election timeout is 8~12 sec.
242243
// A raft node waits for an interval of election timeout before starting an election
243-
// TODO: the raft should set the wait time by election_timeout.
244-
// For now, just use a large timeout.
245244
fn timeout() -> Option<Duration> {
246245
Some(Duration::from_millis(30_000))
247246
}

0 commit comments

Comments
 (0)