@@ -21,7 +21,9 @@ use common_arrow::arrow_format::flight::data::BasicAuth;
21
21
use common_base:: base:: tokio:: sync:: mpsc;
22
22
use common_grpc:: GrpcClaim ;
23
23
use common_grpc:: GrpcToken ;
24
+ use common_meta_api:: KVApi ;
24
25
use common_meta_client:: MetaGrpcReadReq ;
26
+ use common_meta_client:: MetaGrpcReq ;
25
27
use common_meta_client:: MetaGrpcWriteReq ;
26
28
use common_meta_types:: protobuf:: meta_service_server:: MetaService ;
27
29
use common_meta_types:: protobuf:: ClientInfo ;
@@ -48,11 +50,11 @@ use tonic::Status;
48
50
use tonic:: Streaming ;
49
51
use tracing:: info;
50
52
51
- use crate :: executor:: ActionHandler ;
52
53
use crate :: meta_service:: meta_service_impl:: GrpcStream ;
53
54
use crate :: meta_service:: MetaNode ;
54
55
use crate :: metrics:: add_meta_metrics_meta_request_inflights;
55
56
use crate :: metrics:: incr_meta_metrics_meta_recv_bytes;
57
+ use crate :: metrics:: incr_meta_metrics_meta_request_result;
56
58
use crate :: metrics:: incr_meta_metrics_meta_sent_bytes;
57
59
use crate :: version:: from_digit_ver;
58
60
use crate :: version:: to_digit_ver;
@@ -61,14 +63,14 @@ use crate::version::MIN_METACLI_SEMVER;
61
63
62
64
pub struct MetaServiceImpl {
63
65
token : GrpcToken ,
64
- action_handler : ActionHandler ,
66
+ pub ( crate ) meta_node : Arc < MetaNode > ,
65
67
}
66
68
67
69
impl MetaServiceImpl {
68
70
pub fn create ( meta_node : Arc < MetaNode > ) -> Self {
69
71
Self {
70
72
token : GrpcToken :: create ( ) ,
71
- action_handler : ActionHandler :: create ( meta_node) ,
73
+ meta_node,
72
74
}
73
75
}
74
76
@@ -84,6 +86,47 @@ impl MetaServiceImpl {
84
86
} ) ?;
85
87
Ok ( claim)
86
88
}
89
+
90
+ pub async fn execute_kv_req ( & self , req : MetaGrpcReq ) -> RaftReply {
91
+ // To keep the code IDE-friendly, we manually expand the enum variants and dispatch them one by one
92
+
93
+ match req {
94
+ MetaGrpcReq :: UpsertKV ( a) => {
95
+ let res = self . meta_node . upsert_kv ( a) . await ;
96
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
97
+ RaftReply :: from ( res)
98
+ }
99
+ MetaGrpcReq :: GetKV ( a) => {
100
+ let res = self . meta_node . get_kv ( & a. key ) . await ;
101
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
102
+ RaftReply :: from ( res)
103
+ }
104
+ MetaGrpcReq :: MGetKV ( a) => {
105
+ let res = self . meta_node . mget_kv ( & a. keys ) . await ;
106
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
107
+ RaftReply :: from ( res)
108
+ }
109
+ MetaGrpcReq :: ListKV ( a) => {
110
+ let res = self . meta_node . prefix_list_kv ( & a. prefix ) . await ;
111
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
112
+ RaftReply :: from ( res)
113
+ }
114
+ }
115
+ }
116
+
117
+ pub async fn execute_txn ( & self , req : TxnRequest ) -> TxnReply {
118
+ let ret = self . meta_node . transaction ( req) . await ;
119
+ incr_meta_metrics_meta_request_result ( ret. is_ok ( ) ) ;
120
+
121
+ match ret {
122
+ Ok ( resp) => resp,
123
+ Err ( err) => TxnReply {
124
+ success : false ,
125
+ error : serde_json:: to_string ( & err) . expect ( "fail to serialize" ) ,
126
+ responses : vec ! [ ] ,
127
+ } ,
128
+ }
129
+ }
87
130
}
88
131
89
132
#[ async_trait:: async_trait]
@@ -154,13 +197,14 @@ impl MetaService for MetaServiceImpl {
154
197
155
198
incr_meta_metrics_meta_recv_bytes ( request. get_ref ( ) . encoded_len ( ) as u64 ) ;
156
199
157
- let action: MetaGrpcWriteReq = request. try_into ( ) ?;
200
+ let req: MetaGrpcWriteReq = request. try_into ( ) ?;
201
+ let req: MetaGrpcReq = req. into ( ) ;
158
202
159
203
add_meta_metrics_meta_request_inflights ( 1 ) ;
160
204
161
- info ! ( "Receive write_action: {:?}" , action ) ;
205
+ info ! ( "Receive write_action: {:?}" , req ) ;
162
206
163
- let body = self . action_handler . execute_write ( action ) . await ;
207
+ let body = self . execute_kv_req ( req ) . await ;
164
208
165
209
add_meta_metrics_meta_request_inflights ( -1 ) ;
166
210
@@ -175,13 +219,14 @@ impl MetaService for MetaServiceImpl {
175
219
176
220
incr_meta_metrics_meta_recv_bytes ( request. get_ref ( ) . encoded_len ( ) as u64 ) ;
177
221
178
- let action: MetaGrpcReadReq = request. try_into ( ) ?;
222
+ let req: MetaGrpcReadReq = request. try_into ( ) ?;
223
+ let req: MetaGrpcReq = req. into ( ) ;
179
224
180
225
add_meta_metrics_meta_request_inflights ( 1 ) ;
181
226
182
- info ! ( "Receive read_action: {:?}" , action ) ;
227
+ info ! ( "Receive read_action: {:?}" , req ) ;
183
228
184
- let res = self . action_handler . execute_read ( action ) . await ;
229
+ let res = self . execute_kv_req ( req ) . await ;
185
230
186
231
add_meta_metrics_meta_request_inflights ( -1 ) ;
187
232
@@ -201,7 +246,7 @@ impl MetaService for MetaServiceImpl {
201
246
& self ,
202
247
_request : Request < common_meta_types:: protobuf:: Empty > ,
203
248
) -> Result < Response < Self :: ExportStream > , Status > {
204
- let meta_node = & self . action_handler . meta_node ;
249
+ let meta_node = & self . meta_node ;
205
250
206
251
let res = meta_node. sto . export ( ) . await ?;
207
252
@@ -222,7 +267,7 @@ impl MetaService for MetaServiceImpl {
222
267
) -> Result < Response < Self :: WatchStream > , Status > {
223
268
let ( tx, rx) = mpsc:: channel ( 4 ) ;
224
269
225
- let meta_node = & self . action_handler . meta_node ;
270
+ let meta_node = & self . meta_node ;
226
271
meta_node. create_watcher_stream ( request. into_inner ( ) , tx) ;
227
272
228
273
let output_stream = tokio_stream:: wrappers:: ReceiverStream :: new ( rx) ;
@@ -243,7 +288,7 @@ impl MetaService for MetaServiceImpl {
243
288
244
289
info ! ( "Receive txn_request: {:?}" , request) ;
245
290
246
- let body = self . action_handler . execute_txn ( request) . await ;
291
+ let body = self . execute_txn ( request) . await ;
247
292
add_meta_metrics_meta_request_inflights ( -1 ) ;
248
293
249
294
incr_meta_metrics_meta_sent_bytes ( body. encoded_len ( ) as u64 ) ;
@@ -256,7 +301,7 @@ impl MetaService for MetaServiceImpl {
256
301
request : Request < MemberListRequest > ,
257
302
) -> Result < Response < MemberListReply > , Status > {
258
303
self . check_token ( request. metadata ( ) ) ?;
259
- let meta_node = & self . action_handler . meta_node ;
304
+ let meta_node = & self . meta_node ;
260
305
let members = meta_node. get_meta_addrs ( ) . await . map_err ( |e| {
261
306
Status :: internal ( format ! ( "Cannot get metasrv member list, error: {:?}" , e) )
262
307
} ) ?;
0 commit comments