Skip to content

Commit ed55dce

Browse files
authored
Merge branch 'main' into remove-meta-index
2 parents ff0805f + 70cef24 commit ed55dce

File tree

9 files changed

+209
-62
lines changed

9 files changed

+209
-62
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/metrics/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ prometheus-parse = "0.2.3"
2323
serde = { version = "1.0.137", features = ["derive"] }
2424
tracing = "0.1.35"
2525

26+
[dev-dependencies]
27+
anyhow = "1.0.58"
28+
2629
[dev-dependencies.tokio]
2730
default-features = false
2831
features = ["io-util", "net", "sync", "rt-multi-thread", "macros"]

src/common/metrics/src/counter.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! This mod provides mechanism to track the count of active instances of some type `T`.
16+
//! The count is maintained by a `Count` implementation and will be increased or decreased when a wrapper of `T` `WithCounter` is created or dropped.
17+
//!
18+
//! Example:
19+
//!
20+
//! ```ignore
21+
//! struct Connection{}
22+
//! impl Connection {
23+
//! fn ping() {}
24+
//! }
25+
//!
26+
//! struct MyCounter{ identifier: String, }
27+
//! impl Count for MyCounter {/*...*/}
28+
//!
29+
//! {
30+
//! let conn = WithCounter::new(Connection{}, MyCounter{}); // increase count with `MyCounter`
31+
//! conn.ping();
32+
//! } // decrease count with `MyCounter`
33+
//! ```
34+
35+
use std::ops::Deref;
36+
use std::ops::DerefMut;
37+
38+
/// Defines how to report counter metrics.
39+
pub trait Count {
40+
fn incr_count(&mut self, n: i64);
41+
}
42+
43+
/// Binds a counter to a `T`.
44+
///
45+
/// It counts the number of instances of `T` with the provided counter `Count`.
46+
pub struct WithCount<C, T>
47+
where C: Count
48+
{
49+
counter: C,
50+
inner: T,
51+
}
52+
53+
impl<C, T> WithCount<C, T>
54+
where C: Count
55+
{
56+
pub fn new(t: T, counter: C) -> Self {
57+
let mut s = Self { counter, inner: t };
58+
s.counter.incr_count(1);
59+
s
60+
}
61+
62+
pub fn counter(&self) -> &C {
63+
&self.counter
64+
}
65+
}
66+
67+
/// When being dropped, decreases the count.
68+
impl<C, T> Drop for WithCount<C, T>
69+
where C: Count
70+
{
71+
fn drop(&mut self) {
72+
self.counter.incr_count(-1);
73+
}
74+
}
75+
76+
/// Let an app use `WithCount` the same as using `T`.
77+
impl<C, T> Deref for WithCount<C, T>
78+
where C: Count
79+
{
80+
type Target = T;
81+
82+
fn deref(&self) -> &Self::Target {
83+
&self.inner
84+
}
85+
}
86+
87+
/// Let an app use `WithCount` the same as using `T`.
88+
impl<C, T> DerefMut for WithCount<C, T>
89+
where C: Count
90+
{
91+
fn deref_mut(&mut self) -> &mut Self::Target {
92+
&mut self.inner
93+
}
94+
}
95+
96+
#[cfg(test)]
97+
mod tests {
98+
use std::sync::atomic::AtomicI64;
99+
use std::sync::atomic::Ordering;
100+
use std::sync::Arc;
101+
102+
use crate::counter::Count;
103+
use crate::counter::WithCount;
104+
105+
struct Foo {}
106+
struct Counter {
107+
n: Arc<AtomicI64>,
108+
}
109+
impl Count for Counter {
110+
fn incr_count(&mut self, n: i64) {
111+
self.n.fetch_add(n, Ordering::Relaxed);
112+
}
113+
}
114+
115+
#[test]
116+
fn test_with_count() -> anyhow::Result<()> {
117+
let count = Arc::new(AtomicI64::new(0));
118+
assert_eq!(0, count.load(Ordering::Relaxed));
119+
120+
{
121+
let _a = WithCount::new(Foo {}, Counter { n: count.clone() });
122+
assert_eq!(1, count.load(Ordering::Relaxed));
123+
{
124+
let _b = WithCount::new(Foo {}, Counter { n: count.clone() });
125+
assert_eq!(2, count.load(Ordering::Relaxed));
126+
}
127+
assert_eq!(1, count.load(Ordering::Relaxed));
128+
}
129+
assert_eq!(0, count.load(Ordering::Relaxed));
130+
Ok(())
131+
}
132+
}

src/common/metrics/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub mod counter;
1516
mod dump;
1617
mod recorder;
1718

src/meta/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ common-meta-client = { path = "../client" }
3434
common-meta-raft-store = { path = "../raft-store" }
3535
common-meta-sled-store = { path = "../sled-store" }
3636
common-meta-types = { path = "../types" }
37+
common-metrics = { path = "../../common/metrics" }
3738
common-tracing = { path = "../../common/tracing" }
3839

3940
# Github dependencies

src/meta/service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub mod export;
2121
pub mod meta_service;
2222
pub mod metrics;
2323
pub mod network;
24+
pub mod raft_client;
2425
pub mod store;
2526
pub mod version;
2627
pub mod watcher;

src/meta/service/src/network.rs

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::ops::Deref;
16-
use std::ops::DerefMut;
1715
use std::sync::Arc;
1816
use std::time::Duration;
1917
use std::time::Instant;
@@ -22,9 +20,7 @@ use common_base::containers::ItemManager;
2220
use common_base::containers::Pool;
2321
use common_meta_sled_store::openraft;
2422
use common_meta_sled_store::openraft::MessageSummary;
25-
use common_meta_types::protobuf::raft_service_client::RaftServiceClient;
2623
use common_meta_types::protobuf::RaftRequest;
27-
use common_meta_types::Endpoint;
2824
use common_meta_types::LogEntry;
2925
use common_meta_types::NodeId;
3026
use openraft::async_trait::async_trait;
@@ -40,14 +36,15 @@ use tonic::transport::channel::Channel;
4036
use tracing::debug;
4137
use tracing::info;
4238

43-
use crate::metrics::incr_meta_metrics_active_peers;
4439
use crate::metrics::incr_meta_metrics_fail_connections_to_peer;
4540
use crate::metrics::incr_meta_metrics_sent_bytes_to_peer;
4641
use crate::metrics::incr_meta_metrics_sent_failure_to_peer;
4742
use crate::metrics::incr_meta_metrics_snapshot_send_failures_to_peer;
4843
use crate::metrics::incr_meta_metrics_snapshot_send_inflights_to_peer;
4944
use crate::metrics::incr_meta_metrics_snapshot_send_success_to_peer;
5045
use crate::metrics::sample_meta_metrics_snapshot_sent;
46+
use crate::raft_client::RaftClient;
47+
use crate::raft_client::RaftClientApi;
5148
use crate::store::RaftStore;
5249

5350
struct ChannelManager {}
@@ -70,62 +67,6 @@ impl ItemManager for ChannelManager {
7067
}
7168
}
7269

73-
/// Client for raft protocol communication
74-
pub struct RaftClient {
75-
target: NodeId,
76-
endpoint: Endpoint,
77-
endpoint_str: String,
78-
inner: RaftServiceClient<Channel>,
79-
}
80-
81-
impl Deref for RaftClient {
82-
type Target = RaftServiceClient<Channel>;
83-
84-
fn deref(&self) -> &Self::Target {
85-
&self.inner
86-
}
87-
}
88-
89-
impl DerefMut for RaftClient {
90-
fn deref_mut(&mut self) -> &mut Self::Target {
91-
&mut self.inner
92-
}
93-
}
94-
95-
impl Drop for RaftClient {
96-
fn drop(&mut self) {
97-
incr_meta_metrics_active_peers(&self.target, &self.endpoint_str, -1);
98-
}
99-
}
100-
101-
impl RaftClient {
102-
pub fn new(target: NodeId, endpoint: Endpoint, channel: Channel) -> Self {
103-
let endpoint_str = endpoint.to_string();
104-
105-
debug!(
106-
"RaftClient::new: target: {} endpoint: {}",
107-
target, endpoint_str
108-
);
109-
110-
incr_meta_metrics_active_peers(&target, &endpoint_str, 1);
111-
112-
Self {
113-
target,
114-
endpoint,
115-
endpoint_str,
116-
inner: RaftServiceClient::new(channel),
117-
}
118-
}
119-
120-
pub fn endpoint(&self) -> &Endpoint {
121-
&self.endpoint
122-
}
123-
124-
pub fn endpoint_str(&self) -> &str {
125-
&self.endpoint_str
126-
}
127-
}
128-
12970
pub struct Network {
13071
sto: Arc<RaftStore>,
13172

@@ -150,7 +91,7 @@ impl Network {
15091

15192
match self.conn_pool.get(&addr).await {
15293
Ok(channel) => {
153-
let client = RaftClient::new(*target, endpoint, channel);
94+
let client = RaftClientApi::new(*target, endpoint, channel);
15495
debug!("connected: target={}: {}", target, addr);
15596

15697
Ok(client)

src/meta/service/src/raft_client.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_meta_types::protobuf::raft_service_client::RaftServiceClient;
16+
use common_meta_types::Endpoint;
17+
use common_meta_types::NodeId;
18+
use common_metrics::counter;
19+
use tonic::transport::channel::Channel;
20+
use tracing::debug;
21+
22+
use crate::metrics::incr_meta_metrics_active_peers;
23+
24+
/// A metrics reporter of active raft peers.
25+
pub struct PeerCounter {
26+
target: NodeId,
27+
endpoint: Endpoint,
28+
endpoint_str: String,
29+
}
30+
31+
impl counter::Count for PeerCounter {
32+
fn incr_count(&mut self, n: i64) {
33+
incr_meta_metrics_active_peers(&self.target, &self.endpoint_str, n)
34+
}
35+
}
36+
37+
/// RaftClient is a grpc client bound with a metrics reporter..
38+
pub type RaftClient = counter::WithCount<PeerCounter, RaftServiceClient<Channel>>;
39+
40+
/// Defines the API of the client to a raft node.
41+
pub trait RaftClientApi {
42+
fn new(target: NodeId, endpoint: Endpoint, channel: Channel) -> Self;
43+
fn endpoint(&self) -> &Endpoint;
44+
}
45+
46+
impl RaftClientApi for RaftClient {
47+
fn new(target: NodeId, endpoint: Endpoint, channel: Channel) -> Self {
48+
let endpoint_str = endpoint.to_string();
49+
50+
debug!(
51+
"RaftClient::new: target: {} endpoint: {}",
52+
target, endpoint_str
53+
);
54+
55+
counter::WithCount::new(RaftServiceClient::new(channel), PeerCounter {
56+
target,
57+
endpoint,
58+
endpoint_str,
59+
})
60+
}
61+
62+
fn endpoint(&self) -> &Endpoint {
63+
&self.counter().endpoint
64+
}
65+
}

src/meta/types/proto/meta.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,5 +188,6 @@ service MetaService {
188188
rpc MemberList(MemberListRequest) returns (MemberListReply);
189189

190190
// Respond with the information about the client.
191+
// Since: 2022-09-09 0.8.30
191192
rpc GetClientInfo(Empty) returns (ClientInfo);
192193
}

0 commit comments

Comments
 (0)