Skip to content

Commit 4dd4102

Browse files
committed
refactor(meta/watcher): remove WatcherManager, adjust type names
1 parent 7028adb commit 4dd4102

File tree

5 files changed

+52
-62
lines changed

5 files changed

+52
-62
lines changed

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,7 @@ impl MetaService for MetaServiceImpl {
239239
) -> Result<Response<Self::WatchStream>, Status> {
240240
let (tx, rx) = mpsc::channel(4);
241241

242-
let meta_node = &self.meta_node;
243-
meta_node.create_watcher_stream(request.into_inner(), tx);
242+
self.meta_node.add_watcher(request.into_inner(), tx);
244243

245244
let output_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
246245
Ok(Response::new(Box::pin(output_stream) as Self::WatchStream))

src/meta/service/src/meta_service/raftmeta.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::time::Duration;
2121

2222
use anyerror::AnyError;
2323
use common_base::base::tokio;
24+
use common_base::base::tokio::sync::mpsc;
2425
use common_base::base::tokio::sync::watch;
2526
use common_base::base::tokio::sync::watch::error::RecvError;
2627
use common_base::base::tokio::sync::Mutex;
@@ -83,8 +84,10 @@ use crate::metrics::server_metrics;
8384
use crate::network::Network;
8485
use crate::store::RaftStore;
8586
use crate::store::RaftStoreBare;
86-
use crate::watcher::WatcherManager;
87-
use crate::watcher::WatcherStreamSender;
87+
use crate::watcher::DispatcherSender;
88+
use crate::watcher::EventDispatcher;
89+
use crate::watcher::WatchEvent;
90+
use crate::watcher::WatcherSender;
8891
use crate::Opened;
8992

9093
#[derive(serde::Serialize)]
@@ -123,7 +126,7 @@ pub type MetaRaft = Raft<LogEntry, AppliedState, Network, RaftStore>;
123126
// MetaNode is the container of meta data related components and threads, such as storage, the raft node and a raft-state monitor.
124127
pub struct MetaNode {
125128
pub sto: Arc<RaftStore>,
126-
pub watcher: WatcherManager,
129+
pub dispatcher_tx: mpsc::UnboundedSender<WatchEvent>,
127130
pub raft: MetaRaft,
128131
pub running_tx: watch::Sender<()>,
129132
pub running_rx: watch::Receiver<()>,
@@ -168,15 +171,15 @@ impl MetaNodeBuilder {
168171

169172
let (tx, rx) = watch::channel::<()>(());
170173

171-
let watcher = WatcherManager::create();
174+
let dispatcher_tx = EventDispatcher::spawn();
172175

173176
sto.get_state_machine()
174177
.await
175-
.set_subscriber(Box::new(watcher.subscriber.clone()));
178+
.set_subscriber(Box::new(DispatcherSender(dispatcher_tx.clone())));
176179

177180
let mn = Arc::new(MetaNode {
178181
sto: sto.clone(),
179-
watcher,
182+
dispatcher_tx,
180183
raft,
181184
running_tx: tx,
182185
running_rx: rx,
@@ -1082,7 +1085,10 @@ impl MetaNode {
10821085
Ok(resp)
10831086
}
10841087

1085-
pub fn create_watcher_stream(&self, request: WatchRequest, tx: WatcherStreamSender) {
1086-
self.watcher.create_watcher_stream(request, tx)
1088+
pub fn add_watcher(&self, request: WatchRequest, tx: WatcherSender) {
1089+
// TODO: handle error?
1090+
let _ = self
1091+
.dispatcher_tx
1092+
.send(WatchEvent::AddWatcher((request, tx)));
10871093
}
10881094
}

src/meta/service/src/watcher/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
mod watcher_manager;
1616
mod watcher_stream;
1717

18-
pub use watcher_manager::WatcherEvent;
18+
pub(crate) use watcher_manager::DispatcherSender;
19+
pub(crate) use watcher_manager::EventDispatcher;
20+
pub use watcher_manager::WatchEvent;
1921
pub use watcher_manager::WatcherId;
20-
pub use watcher_manager::WatcherManager;
21-
pub use watcher_manager::WatcherStreamSender;
22+
pub use watcher_manager::WatcherSender;
2223
pub use watcher_stream::WatcherStream;

src/meta/service/src/watcher/watcher_manager.rs

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -35,76 +35,60 @@ use crate::metrics::network_metrics;
3535
use crate::metrics::server_metrics;
3636

3737
pub type WatcherId = i64;
38-
pub type WatcherStreamSender = Sender<Result<WatchResponse, Status>>;
3938

40-
type CreateWatcherEvent = (WatchRequest, WatcherStreamSender);
39+
/// A sender for dispatcher to send event to interested watchers.
40+
pub type WatcherSender = Sender<Result<WatchResponse, Status>>;
4141

42+
/// A sender for event source, such as raft state machine, to send event to [`EventDispatcher`].
4243
#[derive(Clone, Debug)]
43-
pub struct WatcherStateMachineSubscriber {
44-
event_tx: mpsc::UnboundedSender<WatcherEvent>,
45-
}
44+
pub(crate) struct DispatcherSender(pub(crate) mpsc::UnboundedSender<WatchEvent>);
4645

4746
#[derive(Clone)]
48-
pub enum WatcherEvent {
49-
CreateWatcherEvent(CreateWatcherEvent),
50-
StateMachineKvDataEvent(Change<Vec<u8>, String>),
51-
}
52-
53-
#[derive(Debug)]
54-
pub struct WatcherManager {
55-
event_tx: mpsc::UnboundedSender<WatcherEvent>,
56-
57-
pub subscriber: WatcherStateMachineSubscriber,
47+
pub enum WatchEvent {
48+
AddWatcher((WatchRequest, WatcherSender)),
49+
KVChange(Change<Vec<u8>, String>),
5850
}
5951

60-
struct WatcherManagerCore {
61-
event_rx: mpsc::UnboundedReceiver<WatcherEvent>,
52+
/// Receives events from event sources, dispatches them to interested watchers.
53+
pub(crate) struct EventDispatcher {
54+
event_rx: mpsc::UnboundedReceiver<WatchEvent>,
6255

6356
/// map range to WatcherId
6457
watcher_range_map: RangeMap<String, WatcherId, WatcherStream>,
6558

6659
current_watcher_id: WatcherId,
6760
}
6861

69-
impl WatcherManager {
70-
pub fn create() -> Self {
62+
impl EventDispatcher {
63+
/// Spawn a dispatcher loop task.
64+
pub(crate) fn spawn() -> mpsc::UnboundedSender<WatchEvent> {
7165
let (event_tx, event_rx) = mpsc::unbounded_channel();
7266

73-
let core = WatcherManagerCore {
67+
let dispatcher = EventDispatcher {
7468
event_rx,
7569
watcher_range_map: RangeMap::new(),
7670
current_watcher_id: 1,
7771
};
7872

79-
let _h = tokio::spawn(core.watcher_manager_main());
73+
let _h = tokio::spawn(dispatcher.main());
8074

81-
WatcherManager {
82-
event_tx: event_tx.clone(),
83-
subscriber: WatcherStateMachineSubscriber { event_tx },
84-
}
85-
}
86-
87-
pub fn create_watcher_stream(&self, request: WatchRequest, tx: WatcherStreamSender) {
88-
let create: CreateWatcherEvent = (request, tx);
89-
let _ = self.event_tx.send(WatcherEvent::CreateWatcherEvent(create));
75+
event_tx
9076
}
91-
}
9277

93-
impl WatcherManagerCore {
9478
#[tracing::instrument(level = "trace", skip(self))]
95-
async fn watcher_manager_main(mut self) {
79+
async fn main(mut self) {
9680
loop {
9781
if let Some(event) = self.event_rx.recv().await {
9882
match event {
99-
WatcherEvent::CreateWatcherEvent((req, tx)) => {
100-
self.create_watcher_stream(req, tx).await;
83+
WatchEvent::AddWatcher((req, tx)) => {
84+
self.add_watcher(req, tx).await;
10185
}
102-
WatcherEvent::StateMachineKvDataEvent(kv_change) => {
103-
self.notify_event(kv_change).await;
86+
WatchEvent::KVChange(kv_change) => {
87+
self.dispatch_event(kv_change).await;
10488
}
10589
}
10690
} else {
107-
info!("watcher manager has been shutdown");
91+
info!("all event senders are closed. quit.");
10892
break;
10993
}
11094
}
@@ -117,7 +101,8 @@ impl WatcherManagerCore {
117101
server_metrics::incr_watchers(-1);
118102
}
119103

120-
async fn notify_event(&mut self, change: Change<Vec<u8>, String>) {
104+
/// Dispatch a kv change event to interested watchers.
105+
async fn dispatch_event(&mut self, change: Change<Vec<u8>, String>) {
121106
let k = change.ident.as_ref().unwrap();
122107
let set = self.watcher_range_map.get_by_point(k);
123108
if set.is_empty() {
@@ -165,23 +150,24 @@ impl WatcherManagerCore {
165150
};
166151
}
167152

153+
// TODO: when a watcher stream is dropped, send a event to remove the watcher explicitly
168154
for range_key in remove_range_keys {
169155
self.close_stream(range_key);
170156
}
171157
}
172158

173159
#[tracing::instrument(level = "debug", skip(self))]
174-
pub async fn create_watcher_stream(&mut self, create: WatchRequest, tx: WatcherStreamSender) {
160+
pub async fn add_watcher(&mut self, create: WatchRequest, tx: WatcherSender) {
175161
info!("create_watcher_stream: {:?}", create);
176162

177-
let range = match WatcherManagerCore::get_range_key(create.key.clone(), &create.key_end) {
163+
let range = match EventDispatcher::get_range_key(create.key.clone(), &create.key_end) {
178164
Ok(range) => range,
179165
Err(_) => return,
180166
};
181167

182168
self.current_watcher_id += 1;
183169
let watcher_id = self.current_watcher_id;
184-
let filter = create.filter_type();
170+
let filter: FilterType = create.filter_type();
185171

186172
let watcher_stream = WatcherStream::new(
187173
watcher_id,
@@ -210,10 +196,8 @@ impl WatcherManagerCore {
210196
}
211197
}
212198

213-
impl StateMachineSubscriber for WatcherStateMachineSubscriber {
199+
impl StateMachineSubscriber for DispatcherSender {
214200
fn kv_changed(&self, change: Change<Vec<u8>, String>) {
215-
let _ = self
216-
.event_tx
217-
.send(WatcherEvent::StateMachineKvDataEvent(change));
201+
let _ = self.0.send(WatchEvent::KVChange(change));
218202
}
219203
}

src/meta/service/src/watcher/watcher_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ use common_meta_types::protobuf::WatchResponse;
1818
use tonic::Status;
1919

2020
use super::WatcherId;
21-
use super::WatcherStreamSender;
21+
use super::WatcherSender;
2222

2323
pub struct WatcherStream {
2424
pub id: WatcherId,
2525

2626
pub filter_type: FilterType,
2727

28-
tx: WatcherStreamSender,
28+
tx: WatcherSender,
2929

3030
pub key: String,
3131

@@ -36,7 +36,7 @@ impl WatcherStream {
3636
pub fn new(
3737
id: WatcherId,
3838
filter_type: FilterType,
39-
tx: WatcherStreamSender,
39+
tx: WatcherSender,
4040
key: String,
4141
key_end: String,
4242
) -> Self {

0 commit comments

Comments
 (0)