@@ -21,6 +21,7 @@ use common_arrow::arrow_format::flight::data::BasicAuth;
21
21
use common_base:: base:: tokio:: sync:: mpsc;
22
22
use common_grpc:: GrpcClaim ;
23
23
use common_grpc:: GrpcToken ;
24
+ use common_meta_api:: KVApi ;
24
25
use common_meta_client:: MetaGrpcReadReq ;
25
26
use common_meta_client:: MetaGrpcReq ;
26
27
use common_meta_client:: MetaGrpcWriteReq ;
@@ -49,11 +50,11 @@ use tonic::Status;
49
50
use tonic:: Streaming ;
50
51
use tracing:: info;
51
52
52
- use crate :: executor:: ActionHandler ;
53
53
use crate :: meta_service:: meta_service_impl:: GrpcStream ;
54
54
use crate :: meta_service:: MetaNode ;
55
55
use crate :: metrics:: add_meta_metrics_meta_request_inflights;
56
56
use crate :: metrics:: incr_meta_metrics_meta_recv_bytes;
57
+ use crate :: metrics:: incr_meta_metrics_meta_request_result;
57
58
use crate :: metrics:: incr_meta_metrics_meta_sent_bytes;
58
59
use crate :: version:: from_digit_ver;
59
60
use crate :: version:: to_digit_ver;
@@ -62,14 +63,14 @@ use crate::version::MIN_METACLI_SEMVER;
62
63
63
64
pub struct MetaServiceImpl {
64
65
token : GrpcToken ,
65
- action_handler : ActionHandler ,
66
+ pub ( crate ) meta_node : Arc < MetaNode > ,
66
67
}
67
68
68
69
impl MetaServiceImpl {
69
70
pub fn create ( meta_node : Arc < MetaNode > ) -> Self {
70
71
Self {
71
72
token : GrpcToken :: create ( ) ,
72
- action_handler : ActionHandler :: create ( meta_node) ,
73
+ meta_node,
73
74
}
74
75
}
75
76
@@ -85,6 +86,47 @@ impl MetaServiceImpl {
85
86
} ) ?;
86
87
Ok ( claim)
87
88
}
89
+
90
+ pub async fn execute_kv_req ( & self , req : MetaGrpcReq ) -> RaftReply {
91
+ // To keep the code IDE-friendly, we manually expand the enum variants and dispatch them one by one
92
+
93
+ match req {
94
+ MetaGrpcReq :: UpsertKV ( a) => {
95
+ let res = self . meta_node . upsert_kv ( a) . await ;
96
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
97
+ RaftReply :: from ( res)
98
+ }
99
+ MetaGrpcReq :: GetKV ( a) => {
100
+ let res = self . meta_node . get_kv ( & a. key ) . await ;
101
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
102
+ RaftReply :: from ( res)
103
+ }
104
+ MetaGrpcReq :: MGetKV ( a) => {
105
+ let res = self . meta_node . mget_kv ( & a. keys ) . await ;
106
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
107
+ RaftReply :: from ( res)
108
+ }
109
+ MetaGrpcReq :: ListKV ( a) => {
110
+ let res = self . meta_node . prefix_list_kv ( & a. prefix ) . await ;
111
+ incr_meta_metrics_meta_request_result ( res. is_ok ( ) ) ;
112
+ RaftReply :: from ( res)
113
+ }
114
+ }
115
+ }
116
+
117
+ pub async fn execute_txn ( & self , req : TxnRequest ) -> TxnReply {
118
+ let ret = self . meta_node . transaction ( req) . await ;
119
+ incr_meta_metrics_meta_request_result ( ret. is_ok ( ) ) ;
120
+
121
+ match ret {
122
+ Ok ( resp) => resp,
123
+ Err ( err) => TxnReply {
124
+ success : false ,
125
+ error : serde_json:: to_string ( & err) . expect ( "fail to serialize" ) ,
126
+ responses : vec ! [ ] ,
127
+ } ,
128
+ }
129
+ }
88
130
}
89
131
90
132
#[ async_trait:: async_trait]
@@ -162,7 +204,7 @@ impl MetaService for MetaServiceImpl {
162
204
163
205
info ! ( "Receive write_action: {:?}" , req) ;
164
206
165
- let body = self . action_handler . execute_kv_req ( req) . await ;
207
+ let body = self . execute_kv_req ( req) . await ;
166
208
167
209
add_meta_metrics_meta_request_inflights ( -1 ) ;
168
210
@@ -184,7 +226,7 @@ impl MetaService for MetaServiceImpl {
184
226
185
227
info ! ( "Receive read_action: {:?}" , req) ;
186
228
187
- let res = self . action_handler . execute_kv_req ( req) . await ;
229
+ let res = self . execute_kv_req ( req) . await ;
188
230
189
231
add_meta_metrics_meta_request_inflights ( -1 ) ;
190
232
@@ -204,7 +246,7 @@ impl MetaService for MetaServiceImpl {
204
246
& self ,
205
247
_request : Request < common_meta_types:: protobuf:: Empty > ,
206
248
) -> Result < Response < Self :: ExportStream > , Status > {
207
- let meta_node = & self . action_handler . meta_node ;
249
+ let meta_node = & self . meta_node ;
208
250
209
251
let res = meta_node. sto . export ( ) . await ?;
210
252
@@ -225,7 +267,7 @@ impl MetaService for MetaServiceImpl {
225
267
) -> Result < Response < Self :: WatchStream > , Status > {
226
268
let ( tx, rx) = mpsc:: channel ( 4 ) ;
227
269
228
- let meta_node = & self . action_handler . meta_node ;
270
+ let meta_node = & self . meta_node ;
229
271
meta_node. create_watcher_stream ( request. into_inner ( ) , tx) ;
230
272
231
273
let output_stream = tokio_stream:: wrappers:: ReceiverStream :: new ( rx) ;
@@ -246,7 +288,7 @@ impl MetaService for MetaServiceImpl {
246
288
247
289
info ! ( "Receive txn_request: {:?}" , request) ;
248
290
249
- let body = self . action_handler . execute_txn ( request) . await ;
291
+ let body = self . execute_txn ( request) . await ;
250
292
add_meta_metrics_meta_request_inflights ( -1 ) ;
251
293
252
294
incr_meta_metrics_meta_sent_bytes ( body. encoded_len ( ) as u64 ) ;
@@ -259,7 +301,7 @@ impl MetaService for MetaServiceImpl {
259
301
request : Request < MemberListRequest > ,
260
302
) -> Result < Response < MemberListReply > , Status > {
261
303
self . check_token ( request. metadata ( ) ) ?;
262
- let meta_node = & self . action_handler . meta_node ;
304
+ let meta_node = & self . meta_node ;
263
305
let members = meta_node. get_meta_addrs ( ) . await . map_err ( |e| {
264
306
Status :: internal ( format ! ( "Cannot get metasrv member list, error: {:?}" , e) )
265
307
} ) ?;
0 commit comments