@@ -22,9 +22,7 @@ use common_base::base::tokio::sync::mpsc;
22
22
use common_grpc:: GrpcClaim ;
23
23
use common_grpc:: GrpcToken ;
24
24
use common_meta_api:: KVApi ;
25
- use common_meta_client:: MetaGrpcReadReq ;
26
25
use common_meta_client:: MetaGrpcReq ;
27
- use common_meta_client:: MetaGrpcWriteReq ;
28
26
use common_meta_types:: protobuf:: meta_service_server:: MetaService ;
29
27
use common_meta_types:: protobuf:: ClientInfo ;
30
28
use common_meta_types:: protobuf:: Empty ;
@@ -39,6 +37,7 @@ use common_meta_types::protobuf::WatchRequest;
39
37
use common_meta_types:: protobuf:: WatchResponse ;
40
38
use common_meta_types:: TxnReply ;
41
39
use common_meta_types:: TxnRequest ;
40
+ use common_metrics:: counter:: WithCount ;
42
41
use futures:: StreamExt ;
43
42
use prost:: Message ;
44
43
use tokio_stream;
@@ -52,10 +51,10 @@ use tracing::info;
52
51
53
52
use crate :: meta_service:: meta_service_impl:: GrpcStream ;
54
53
use crate :: meta_service:: MetaNode ;
55
- use crate :: metrics:: add_meta_metrics_meta_request_inflights;
56
54
use crate :: metrics:: incr_meta_metrics_meta_recv_bytes;
57
55
use crate :: metrics:: incr_meta_metrics_meta_request_result;
58
56
use crate :: metrics:: incr_meta_metrics_meta_sent_bytes;
57
+ use crate :: metrics:: RequestInFlight ;
59
58
use crate :: version:: from_digit_ver;
60
59
use crate :: version:: to_digit_ver;
61
60
use crate :: version:: METASRV_SEMVER ;
@@ -87,34 +86,7 @@ impl MetaServiceImpl {
87
86
Ok ( claim)
88
87
}
89
88
90
- pub async fn execute_kv_req ( & self , req : MetaGrpcReq ) -> RaftReply {
91
- // To keep the code IDE-friendly, we manually expand the enum variants and dispatch them one by one
92
-
93
- match req {
94
- MetaGrpcReq :: UpsertKV ( a) => {
95
- let res = self . meta_node . upsert_kv ( a) . await ;
96
- incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
97
- RaftReply :: from ( res)
98
- }
99
- MetaGrpcReq :: GetKV ( a) => {
100
- let res = self . meta_node . get_kv ( & a. key ) . await ;
101
- incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
102
- RaftReply :: from ( res)
103
- }
104
- MetaGrpcReq :: MGetKV ( a) => {
105
- let res = self . meta_node . mget_kv ( & a. keys ) . await ;
106
- incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
107
- RaftReply :: from ( res)
108
- }
109
- MetaGrpcReq :: ListKV ( a) => {
110
- let res = self . meta_node . prefix_list_kv ( & a. prefix ) . await ;
111
- incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
112
- RaftReply :: from ( res)
113
- }
114
- }
115
- }
116
-
117
- pub async fn execute_txn ( & self , req : TxnRequest ) -> TxnReply {
89
+ async fn execute_txn ( & self , req : TxnRequest ) -> TxnReply {
118
90
let ret = self . meta_node . transaction ( req) . await ;
119
91
incr_meta_metrics_meta_request_result ( ret. is_ok ( ) ) ;
120
92
@@ -188,51 +160,48 @@ impl MetaService for MetaServiceImpl {
188
160
}
189
161
}
190
162
191
- async fn write_msg (
192
- & self ,
193
- request : Request < RaftRequest > ,
194
- ) -> Result < Response < RaftReply > , Status > {
195
- self . check_token ( request. metadata ( ) ) ?;
196
- common_tracing:: extract_remote_span_as_parent ( & request) ;
197
-
198
- incr_meta_metrics_meta_recv_bytes ( request. get_ref ( ) . encoded_len ( ) as u64 ) ;
199
-
200
- let req: MetaGrpcWriteReq = request. try_into ( ) ?;
201
- let req: MetaGrpcReq = req. into ( ) ;
202
-
203
- add_meta_metrics_meta_request_inflights ( 1 ) ;
204
-
205
- info ! ( "Receive write_action: {:?}" , req) ;
206
-
207
- let body = self . execute_kv_req ( req) . await ;
208
-
209
- add_meta_metrics_meta_request_inflights ( -1 ) ;
210
-
211
- incr_meta_metrics_meta_sent_bytes ( body. encoded_len ( ) as u64 ) ;
212
-
213
- Ok ( Response :: new ( body) )
163
+ async fn write_msg ( & self , r : Request < RaftRequest > ) -> Result < Response < RaftReply > , Status > {
164
+ self . kv_api ( r) . await
214
165
}
215
166
216
- async fn read_msg ( & self , request : Request < RaftRequest > ) -> Result < Response < RaftReply > , Status > {
217
- self . check_token ( request. metadata ( ) ) ?;
218
- common_tracing:: extract_remote_span_as_parent ( & request) ;
219
-
220
- incr_meta_metrics_meta_recv_bytes ( request. get_ref ( ) . encoded_len ( ) as u64 ) ;
221
-
222
- let req: MetaGrpcReadReq = request. try_into ( ) ?;
223
- let req: MetaGrpcReq = req. into ( ) ;
167
+ async fn read_msg ( & self , r : Request < RaftRequest > ) -> Result < Response < RaftReply > , Status > {
168
+ self . kv_api ( r) . await
169
+ }
224
170
225
- add_meta_metrics_meta_request_inflights ( 1 ) ;
171
+ async fn kv_api ( & self , r : Request < RaftRequest > ) -> Result < Response < RaftReply > , Status > {
172
+ let _guard = WithCount :: new ( ( ) , RequestInFlight ) ;
226
173
227
- info ! ( "Receive read_action: {:?}" , req) ;
174
+ self . check_token ( r. metadata ( ) ) ?;
175
+ common_tracing:: extract_remote_span_as_parent ( & r) ;
176
+ incr_meta_metrics_meta_recv_bytes ( r. get_ref ( ) . encoded_len ( ) as u64 ) ;
228
177
229
- let res = self . execute_kv_req ( req) . await ;
178
+ let req: MetaGrpcReq = r. try_into ( ) ?;
179
+ info ! ( "Received MetaGrpcReq: {:?}" , req) ;
230
180
231
- add_meta_metrics_meta_request_inflights ( -1 ) ;
181
+ let m = & self . meta_node ;
182
+ let reply = match req {
183
+ MetaGrpcReq :: UpsertKV ( a) => {
184
+ let res = m. upsert_kv ( a) . await ;
185
+ RaftReply :: from ( res)
186
+ }
187
+ MetaGrpcReq :: GetKV ( a) => {
188
+ let res = m. get_kv ( & a. key ) . await ;
189
+ RaftReply :: from ( res)
190
+ }
191
+ MetaGrpcReq :: MGetKV ( a) => {
192
+ let res = m. mget_kv ( & a. keys ) . await ;
193
+ RaftReply :: from ( res)
194
+ }
195
+ MetaGrpcReq :: ListKV ( a) => {
196
+ let res = m. prefix_list_kv ( & a. prefix ) . await ;
197
+ RaftReply :: from ( res)
198
+ }
199
+ } ;
232
200
233
- incr_meta_metrics_meta_sent_bytes ( res. encoded_len ( ) as u64 ) ;
201
+ incr_meta_metrics_meta_request_result ( reply. error . is_empty ( ) ) ;
202
+ incr_meta_metrics_meta_sent_bytes ( reply. encoded_len ( ) as u64 ) ;
234
203
235
- Ok ( Response :: new ( res ) )
204
+ Ok ( Response :: new ( reply ) )
236
205
}
237
206
238
207
type ExportStream =
@@ -246,12 +215,12 @@ impl MetaService for MetaServiceImpl {
246
215
& self ,
247
216
_request : Request < common_meta_types:: protobuf:: Empty > ,
248
217
) -> Result < Response < Self :: ExportStream > , Status > {
249
- let meta_node = & self . meta_node ;
218
+ let _guard = WithCount :: new ( ( ) , RequestInFlight ) ;
250
219
220
+ let meta_node = & self . meta_node ;
251
221
let res = meta_node. sto . export ( ) . await ?;
252
222
253
223
let stream = ExportStream { data : res } ;
254
-
255
224
let s = stream. map ( |strings| Ok ( ExportedChunk { data : strings } ) ) ;
256
225
257
226
Ok ( Response :: new ( Box :: pin ( s) ) )
@@ -280,7 +249,7 @@ impl MetaService for MetaServiceImpl {
280
249
) -> Result < Response < TxnReply > , Status > {
281
250
self . check_token ( request. metadata ( ) ) ?;
282
251
incr_meta_metrics_meta_recv_bytes ( request. get_ref ( ) . encoded_len ( ) as u64 ) ;
283
- add_meta_metrics_meta_request_inflights ( 1 ) ;
252
+ let _guard = WithCount :: new ( ( ) , RequestInFlight ) ;
284
253
285
254
common_tracing:: extract_remote_span_as_parent ( & request) ;
286
255
@@ -289,8 +258,6 @@ impl MetaService for MetaServiceImpl {
289
258
info ! ( "Receive txn_request: {:?}" , request) ;
290
259
291
260
let body = self . execute_txn ( request) . await ;
292
- add_meta_metrics_meta_request_inflights ( -1 ) ;
293
-
294
261
incr_meta_metrics_meta_sent_bytes ( body. encoded_len ( ) as u64 ) ;
295
262
296
263
Ok ( Response :: new ( body) )
@@ -301,6 +268,8 @@ impl MetaService for MetaServiceImpl {
301
268
request : Request < MemberListRequest > ,
302
269
) -> Result < Response < MemberListReply > , Status > {
303
270
self . check_token ( request. metadata ( ) ) ?;
271
+ let _guard = WithCount :: new ( ( ) , RequestInFlight ) ;
272
+
304
273
let meta_node = & self . meta_node ;
305
274
let members = meta_node. get_meta_addrs ( ) . await . map_err ( |e| {
306
275
Status :: internal ( format ! ( "Cannot get metasrv member list, error: {:?}" , e) )
@@ -316,6 +285,8 @@ impl MetaService for MetaServiceImpl {
316
285
& self ,
317
286
request : Request < Empty > ,
318
287
) -> Result < Response < ClientInfo > , Status > {
288
+ let _guard = WithCount :: new ( ( ) , RequestInFlight ) ;
289
+
319
290
let r = request. remote_addr ( ) ;
320
291
if let Some ( addr) = r {
321
292
let resp = ClientInfo {
0 commit comments