Skip to content

Commit da0dfaa

Browse files
authored
chore: refine meta-service (#16229)
1 parent cbb548e commit da0dfaa

File tree

3 files changed

+116
-112
lines changed

3 files changed

+116
-112
lines changed

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 35 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::BTreeMap;
1615
use std::collections::BTreeSet;
1716
use std::net::Ipv4Addr;
1817
use std::sync::atomic::AtomicI32;
@@ -32,7 +31,6 @@ use databend_common_grpc::DNSResolver;
3231
use databend_common_meta_client::reply_to_api_result;
3332
use databend_common_meta_client::RequestFor;
3433
use databend_common_meta_raft_store::config::RaftConfig;
35-
use databend_common_meta_raft_store::ondisk::DataVersion;
3634
use databend_common_meta_raft_store::ondisk::DATA_VERSION;
3735
use databend_common_meta_sled_store::openraft;
3836
use databend_common_meta_sled_store::openraft::ChangeMembers;
@@ -83,6 +81,7 @@ use crate::message::LeaveRequest;
8381
use crate::meta_service::errors::grpc_error_to_network_err;
8482
use crate::meta_service::forwarder::MetaForwarder;
8583
use crate::meta_service::meta_leader::MetaLeader;
84+
use crate::meta_service::meta_node_status::MetaNodeStatus;
8685
use crate::meta_service::RaftServiceImpl;
8786
use crate::metrics::server_metrics;
8887
use crate::network::NetworkFactory;
@@ -97,66 +96,6 @@ use crate::watcher::Watcher;
9796
use crate::watcher::WatcherSender;
9897
use crate::Opened;
9998

100-
#[derive(serde::Serialize)]
101-
pub struct MetaNodeStatus {
102-
pub id: NodeId,
103-
104-
/// The build version of meta-service binary.
105-
pub binary_version: String,
106-
107-
/// The version of the data this meta-service is serving.
108-
pub data_version: DataVersion,
109-
110-
/// The raft service endpoint for internal communication
111-
pub endpoint: String,
112-
113-
/// The size in bytes of the on disk data.
114-
pub db_size: u64,
115-
116-
/// key number of current snapshot
117-
pub key_num: u64,
118-
119-
/// Server state, one of "Follower", "Learner", "Candidate", "Leader".
120-
pub state: String,
121-
122-
/// Is this node a leader.
123-
pub is_leader: bool,
124-
125-
/// Current term.
126-
pub current_term: u64,
127-
128-
/// Last received log index
129-
pub last_log_index: u64,
130-
131-
/// Last log id that has been committed and applied to state machine.
132-
pub last_applied: LogId,
133-
134-
/// The last log id contained in the last built snapshot.
135-
pub snapshot_last_log_id: Option<LogId>,
136-
137-
/// The last log id that has been purged, inclusive.
138-
pub purged: Option<LogId>,
139-
140-
/// The last known leader node.
141-
pub leader: Option<Node>,
142-
143-
/// The replication state of all nodes.
144-
///
145-
/// Only leader node has non-None data for this field, i.e., `is_leader` is true.
146-
pub replication: Option<BTreeMap<NodeId, Option<LogId>>>,
147-
148-
/// Nodes that can vote in election can grant replication.
149-
pub voters: Vec<Node>,
150-
151-
/// Also known as `learner`s.
152-
pub non_voters: Vec<Node>,
153-
154-
/// The last `seq` used by GenericKV sub tree.
155-
///
156-
/// `seq` is a monotonically incremental integer for every value that is inserted or updated.
157-
pub last_seq: u64,
158-
}
159-
16099
pub type LogStore = RaftStore;
161100
pub type SMStore = RaftStore;
162101

@@ -184,8 +123,7 @@ pub struct MetaNodeBuilder {
184123
node_id: Option<NodeId>,
185124
raft_config: Option<Config>,
186125
sto: Option<RaftStore>,
187-
monitor_metrics: bool,
188-
endpoint: Option<Endpoint>,
126+
raft_service_endpoint: Option<Endpoint>,
189127
}
190128

191129
impl MetaNodeBuilder {
@@ -212,7 +150,6 @@ impl MetaNodeBuilder {
212150
let raft = MetaRaft::new(node_id, Arc::new(config), net, log_store, sm_store)
213151
.await
214152
.map_err(|e| MetaStartupError::MetaServiceError(e.to_string()))?;
215-
let metrics_rx = raft.metrics();
216153

217154
let (tx, rx) = watch::channel::<()>(());
218155

@@ -222,22 +159,19 @@ impl MetaNodeBuilder {
222159
.await
223160
.set_subscriber(Box::new(DispatcherSender(dispatcher_tx.clone())));
224161

225-
let mn = Arc::new(MetaNode {
162+
let meta_node = Arc::new(MetaNode {
226163
sto: sto.clone(),
227164
dispatcher_handle: EventDispatcherHandle::new(dispatcher_tx),
228-
raft,
165+
raft: raft.clone(),
229166
running_tx: tx,
230167
running_rx: rx,
231168
join_handles: Mutex::new(Vec::new()),
232169
joined_tasks: AtomicI32::new(1),
233170
});
234171

235-
if self.monitor_metrics {
236-
info!("about to subscribe raft metrics");
237-
MetaNode::subscribe_metrics(mn.clone(), metrics_rx).await;
238-
}
172+
MetaNode::subscribe_metrics(meta_node.clone(), raft.metrics()).await;
239173

240-
let endpoint = if let Some(a) = self.endpoint.take() {
174+
let endpoint = if let Some(a) = self.raft_service_endpoint.take() {
241175
a
242176
} else {
243177
sto.get_node_raft_endpoint(&node_id).await.map_err(|e| {
@@ -248,11 +182,9 @@ impl MetaNodeBuilder {
248182
})?
249183
};
250184

251-
info!("about to start raft grpc on endpoint {}", endpoint);
185+
MetaNode::start_raft_service(meta_node.clone(), &endpoint).await?;
252186

253-
MetaNode::start_grpc(mn.clone(), endpoint.addr(), endpoint.port()).await?;
254-
255-
Ok(mn)
187+
Ok(meta_node)
256188
}
257189

258190
#[must_use]
@@ -268,14 +200,8 @@ impl MetaNodeBuilder {
268200
}
269201

270202
#[must_use]
271-
pub fn endpoint(mut self, a: Endpoint) -> Self {
272-
self.endpoint = Some(a);
273-
self
274-
}
275-
276-
#[must_use]
277-
pub fn monitor_metrics(mut self, b: bool) -> Self {
278-
self.monitor_metrics = b;
203+
pub fn raft_service_endpoint(mut self, endpoint: Endpoint) -> Self {
204+
self.raft_service_endpoint = Some(endpoint);
279205
self
280206
}
281207
}
@@ -288,8 +214,7 @@ impl MetaNode {
288214
node_id: None,
289215
raft_config: Some(raft_config),
290216
sto: None,
291-
monitor_metrics: true,
292-
endpoint: None,
217+
raft_service_endpoint: None,
293218
}
294219
}
295220

@@ -315,20 +240,24 @@ impl MetaNode {
315240

316241
/// Start the grpc service for raft communication and meta operation API.
317242
#[fastrace::trace]
318-
pub async fn start_grpc(
319-
mn: Arc<MetaNode>,
320-
host: &str,
321-
port: u16,
243+
pub async fn start_raft_service(
244+
meta_node: Arc<MetaNode>,
245+
endpoint: &Endpoint,
322246
) -> Result<(), MetaNetworkError> {
323-
let mut rx = mn.running_rx.clone();
247+
info!("Start raft service listening on: {}", endpoint);
248+
249+
let host = endpoint.addr();
250+
let port = endpoint.port();
251+
252+
let mut running_rx = meta_node.running_rx.clone();
324253

325-
let meta_srv_impl = RaftServiceImpl::create(mn.clone());
326-
let meta_srv = RaftServiceServer::new(meta_srv_impl)
254+
let raft_service_impl = RaftServiceImpl::create(meta_node.clone());
255+
let raft_server = RaftServiceServer::new(raft_service_impl)
327256
.max_decoding_message_size(GrpcConfig::MAX_DECODING_SIZE)
328257
.max_encoding_message_size(GrpcConfig::MAX_ENCODING_SIZE);
329258

330259
let ipv4_addr = host.parse::<Ipv4Addr>();
331-
let addr = match ipv4_addr {
260+
let ip_port = match ipv4_addr {
332261
Ok(addr) => format!("{}:{}", addr, port),
333262
Err(_) => {
334263
let resolver = DNSResolver::instance().map_err(|e| {
@@ -347,37 +276,30 @@ impl MetaNode {
347276
}
348277
};
349278

350-
info!("about to start raft grpc on resolved addr {}", addr);
279+
info!("about to start raft grpc on: {}", ip_port);
351280

352-
let addr_str = addr.to_string();
353-
let ret = addr.parse::<std::net::SocketAddr>();
354-
let addr = match ret {
355-
Ok(addr) => addr,
356-
Err(e) => {
357-
return Err(e.into());
358-
}
359-
};
360-
let node_id = mn.sto.id;
281+
let socket_addr = ip_port.parse::<std::net::SocketAddr>()?;
282+
let node_id = meta_node.sto.id;
361283

362-
let srv = tonic::transport::Server::builder().add_service(meta_srv);
284+
let srv = tonic::transport::Server::builder().add_service(raft_server);
363285

364286
let h = databend_common_base::runtime::spawn(async move {
365-
srv.serve_with_shutdown(addr, async move {
366-
let _ = rx.changed().await;
287+
srv.serve_with_shutdown(socket_addr, async move {
288+
let _ = running_rx.changed().await;
367289
info!(
368290
"signal received, shutting down: id={} {} ",
369-
node_id, addr_str
291+
node_id, ip_port
370292
);
371293
})
372294
.await
373295
.map_err(|e| {
374-
AnyError::new(&e).add_context(|| "when serving meta-service grpc service")
296+
AnyError::new(&e).add_context(|| "when serving meta-service raft service")
375297
})?;
376298

377299
Ok::<(), AnyError>(())
378300
});
379301

380-
let mut jh = mn.join_handles.lock().await;
302+
let mut jh = meta_node.join_handles.lock().await;
381303
jh.push(h);
382304
Ok(())
383305
}
@@ -415,7 +337,7 @@ impl MetaNode {
415337
let builder = MetaNode::builder(&config)
416338
.sto(sto.clone())
417339
.node_id(self_node_id)
418-
.endpoint(config.raft_api_listen_host_endpoint());
340+
.raft_service_endpoint(config.raft_api_listen_host_endpoint());
419341
let mn = builder.build().await?;
420342

421343
info!("MetaNode started: {:?}", config);
@@ -489,6 +411,7 @@ impl MetaNode {
489411

490412
/// Spawn a monitor to watch raft state changes and report metrics changes.
491413
pub async fn subscribe_metrics(mn: Arc<Self>, mut metrics_rx: watch::Receiver<RaftMetrics>) {
414+
info!("Start a task subscribing raft metrics and forward to metrics API");
492415
let meta_node = mn.clone();
493416

494417
let fut = async move {
@@ -971,7 +894,7 @@ impl MetaNode {
971894

972895
#[fastrace::trace]
973896
pub async fn get_grpc_advertise_addrs(&self) -> Vec<String> {
974-
// inconsistent get: from local state machine
897+
// Possibly stale read: served from the local state machine
975898

976899
let nodes = {
977900
let sm = self.sto.state_machine.read().await;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BTreeMap;
16+
17+
use databend_common_meta_raft_store::ondisk::DataVersion;
18+
use databend_common_meta_types::LogId;
19+
use databend_common_meta_types::Node;
20+
use databend_common_meta_types::NodeId;
21+
22+
#[derive(serde::Serialize)]
23+
pub struct MetaNodeStatus {
24+
pub id: NodeId,
25+
26+
/// The build version of meta-service binary.
27+
pub binary_version: String,
28+
29+
/// The version of the data this meta-service is serving.
30+
pub data_version: DataVersion,
31+
32+
/// The raft service endpoint for internal communication
33+
pub endpoint: String,
34+
35+
/// The size in bytes of the on disk data.
36+
pub db_size: u64,
37+
38+
/// The number of keys in the current snapshot.
39+
pub key_num: u64,
40+
41+
/// Server state, one of "Follower", "Learner", "Candidate", "Leader".
42+
pub state: String,
43+
44+
/// Is this node a leader.
45+
pub is_leader: bool,
46+
47+
/// Current term.
48+
pub current_term: u64,
49+
50+
/// Last received log index
51+
pub last_log_index: u64,
52+
53+
/// Last log id that has been committed and applied to state machine.
54+
pub last_applied: LogId,
55+
56+
/// The last log id contained in the last built snapshot.
57+
pub snapshot_last_log_id: Option<LogId>,
58+
59+
/// The last log id that has been purged, inclusive.
60+
pub purged: Option<LogId>,
61+
62+
/// The last known leader node.
63+
pub leader: Option<Node>,
64+
65+
/// The replication state of all nodes.
66+
///
67+
/// Only leader node has non-None data for this field, i.e., `is_leader` is true.
68+
pub replication: Option<BTreeMap<NodeId, Option<LogId>>>,
69+
70+
/// Nodes that can vote in elections and can grant replication.
71+
pub voters: Vec<Node>,
72+
73+
/// Also known as `learner`s.
74+
pub non_voters: Vec<Node>,
75+
76+
/// The last `seq` used by GenericKV sub tree.
77+
///
78+
/// `seq` is a monotonically increasing integer assigned to every value that is inserted or updated.
79+
pub last_seq: u64,
80+
}

src/meta/service/src/meta_service/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub(crate) mod snapshot_receiver_v1;
2020

2121
pub mod meta_leader;
2222
pub mod meta_node;
23+
pub mod meta_node_status;
2324
pub mod raft_service_impl;
2425

2526
pub use forwarder::MetaForwarder;

0 commit comments

Comments
 (0)