Skip to content

Commit 83fc47b

Browse files
committed
refactor(meta/watcher): just use Change as event type
1 parent dbb84cf commit 83fc47b

File tree

3 files changed

+21
-40
lines changed

3 files changed

+21
-40
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -987,7 +987,6 @@ impl<KV: KVApi> SchemaApi for KV {
987987
if_then: vec![
988988
// Changing a table in a db has to update the seq of db_meta,
989989
// to block the batch-delete-tables when deleting a db.
990-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
991990
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
992991
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
993992
txn_op_put(&tbid, serialize_struct(&req.table_meta)?), /* (tenant, db_id, tb_id) -> tb_meta */
@@ -1149,7 +1148,6 @@ impl<KV: KVApi> SchemaApi for KV {
11491148
if_then: vec![
11501149
// Changing a table in a db has to update the seq of db_meta,
11511150
// to block the batch-delete-tables when deleting a db.
1152-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
11531151
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
11541152
txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id
11551153
txn_op_put(&tbid, serialize_struct(&tb_meta)?), /* (tenant, db_id, tb_id) -> tb_meta */
@@ -1307,7 +1305,6 @@ impl<KV: KVApi> SchemaApi for KV {
13071305
if_then: vec![
13081306
// Changing a table in a db has to update the seq of db_meta,
13091307
// to block the batch-delete-tables when deleting a db.
1310-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
13111308
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
13121309
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
13131310
// txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?)?, // _fd_table_id_list/db_id/table_name -> tb_id_list
@@ -1498,7 +1495,6 @@ impl<KV: KVApi> SchemaApi for KV {
14981495
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
14991496
// Changing a table in a db has to update the seq of db_meta,
15001497
// to block the batch-delete-tables when deleting a db.
1501-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
15021498
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
15031499
txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/old_table_name -> tb_id_list */
15041500
txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */
@@ -1507,7 +1503,6 @@ impl<KV: KVApi> SchemaApi for KV {
15071503

15081504
if db_id != new_db_id {
15091505
then_ops.push(
1510-
// TODO: test this when old metasrv is replaced with kv-txn based SchemaApi.
15111506
txn_op_put(
15121507
&DatabaseId { db_id: new_db_id },
15131508
serialize_struct(&new_db_meta)?,

src/meta/raft-store/src/state_machine/sm.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ const TREE_STATE_MACHINE: &str = "state_machine";
9595

9696
/// StateMachine subscriber trait
9797
pub trait StateMachineSubscriber: Debug + Sync + Send {
98-
fn kv_changed(&self, key: &str, prev: Option<SeqV>, current: Option<SeqV>);
98+
fn kv_changed(&self, change: Change<Vec<u8>, String>);
9999
}
100100

101101
/// The state machine of the `MemStore`.
102-
/// It includes user data and two raft-related informations:
102+
/// It includes user data and two raft-related information:
103103
/// `last_applied_logs` and `client_serial_responses` to achieve idempotence.
104104
#[derive(Debug)]
105105
pub struct StateMachine {
@@ -342,9 +342,7 @@ impl StateMachine {
342342
// Send queued change events to subscriber
343343
if let Some(subscriber) = &self.subscriber {
344344
for event in changes {
345-
// TODO: use Change as the event data type.
346-
// safe unwrap
347-
subscriber.kv_changed(&event.ident.unwrap(), event.prev, event.result);
345+
subscriber.kv_changed(event);
348346
}
349347
}
350348

@@ -638,17 +636,15 @@ impl StateMachine {
638636
if let Some(kv_pairs) = kv_pairs {
639637
if let Some(kv_pairs) = kv_pairs.get(delete_by_prefix) {
640638
for (key, _seq) in kv_pairs.iter() {
641-
// TODO: return StorageError
642-
let ret = Self::txn_upsert_kv(txn_tree, &UpsertKV::delete(key), log_time_ms);
639+
let (expired, prev, res) =
640+
Self::txn_upsert_kv(txn_tree, &UpsertKV::delete(key), log_time_ms)?;
643641

644-
if let Ok(ret) = ret {
645-
count += 1;
642+
count += 1;
646643

647-
if ret.0.is_some() {
648-
txn_tree.push_change(key, ret.0.clone(), None);
649-
}
650-
txn_tree.push_change(key, ret.1.clone(), ret.2);
644+
if expired.is_some() {
645+
txn_tree.push_change(key, expired, None);
651646
}
647+
txn_tree.push_change(key, prev, res);
652648
}
653649
}
654650
}

src/meta/service/src/watcher/watcher_manager.rs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use common_meta_types::protobuf::watch_request::FilterType;
2323
use common_meta_types::protobuf::Event;
2424
use common_meta_types::protobuf::WatchRequest;
2525
use common_meta_types::protobuf::WatchResponse;
26+
use common_meta_types::Change;
2627
use common_meta_types::PbSeqV;
27-
use common_meta_types::SeqV;
2828
use prost::Message;
2929
use tonic::Status;
3030
use tracing::info;
@@ -39,13 +39,6 @@ pub type WatcherStreamSender = Sender<Result<WatchResponse, Status>>;
3939

4040
type CreateWatcherEvent = (WatchRequest, WatcherStreamSender);
4141

42-
#[derive(Clone, Debug)]
43-
pub struct StateMachineKvData {
44-
pub key: String,
45-
pub prev: Option<SeqV>,
46-
pub current: Option<SeqV>,
47-
}
48-
4942
#[derive(Clone, Debug)]
5043
pub struct WatcherStateMachineSubscriber {
5144
event_tx: mpsc::UnboundedSender<WatcherEvent>,
@@ -54,7 +47,7 @@ pub struct WatcherStateMachineSubscriber {
5447
#[derive(Clone)]
5548
pub enum WatcherEvent {
5649
CreateWatcherEvent(CreateWatcherEvent),
57-
StateMachineKvDataEvent(StateMachineKvData),
50+
StateMachineKvDataEvent(Change<Vec<u8>, String>),
5851
}
5952

6053
#[derive(Debug)]
@@ -106,8 +99,8 @@ impl WatcherManagerCore {
10699
WatcherEvent::CreateWatcherEvent((req, tx)) => {
107100
self.create_watcher_stream(req, tx).await;
108101
}
109-
WatcherEvent::StateMachineKvDataEvent(kv) => {
110-
self.notify_event(kv).await;
102+
WatcherEvent::StateMachineKvDataEvent(kv_change) => {
103+
self.notify_event(kv_change).await;
111104
}
112105
}
113106
} else {
@@ -124,14 +117,15 @@ impl WatcherManagerCore {
124117
server_metrics::incr_watchers(-1);
125118
}
126119

127-
async fn notify_event(&mut self, kv: StateMachineKvData) {
128-
let set = self.watcher_range_map.get_by_point(&kv.key);
120+
async fn notify_event(&mut self, change: Change<Vec<u8>, String>) {
121+
let k = change.ident.as_ref().unwrap();
122+
let set = self.watcher_range_map.get_by_point(k);
129123
if set.is_empty() {
130124
return;
131125
}
132126

133-
let current = kv.current;
134-
let prev = kv.prev;
127+
let current = change.result;
128+
let prev = change.prev;
135129

136130
let is_delete_event = current.is_none();
137131
let mut remove_range_keys: Vec<RangeMapKey<String, WatcherId>> = vec![];
@@ -151,7 +145,7 @@ impl WatcherManagerCore {
151145
assert_eq!(stream.id, watcher_id);
152146
let resp = WatchResponse {
153147
event: Some(Event {
154-
key: kv.key.clone(),
148+
key: k.to_string(),
155149
current: current.clone().map(PbSeqV::from),
156150
prev: prev.clone().map(PbSeqV::from),
157151
}),
@@ -217,13 +211,9 @@ impl WatcherManagerCore {
217211
}
218212

219213
impl StateMachineSubscriber for WatcherStateMachineSubscriber {
220-
fn kv_changed(&self, key: &str, prev: Option<SeqV>, current: Option<SeqV>) {
214+
fn kv_changed(&self, change: Change<Vec<u8>, String>) {
221215
let _ = self
222216
.event_tx
223-
.send(WatcherEvent::StateMachineKvDataEvent(StateMachineKvData {
224-
key: key.to_string(),
225-
prev,
226-
current,
227-
}));
217+
.send(WatcherEvent::StateMachineKvDataEvent(change));
228218
}
229219
}

0 commit comments

Comments
 (0)