Skip to content

Commit 01c592f

Browse files
authored
Merge pull request #7555 from drmingdrmer/8-grpc-req
refactor(meta-service): remove redundant ActionHandler; move logic into MetaServiceImpl
2 parents 2ede632 + 3c76ac2 commit 01c592f

File tree

6 files changed

+86
-124
lines changed

6 files changed

+86
-124
lines changed

src/meta/client/src/grpc_action.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,33 @@ pub enum MetaGrpcReadReq {
5959
ListKV(ListKVReq), // since 2022-05-23
6060
}
6161

62+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
63+
pub enum MetaGrpcReq {
64+
UpsertKV(UpsertKVReq),
65+
66+
GetKV(GetKVReq),
67+
MGetKV(MGetKVReq),
68+
ListKV(ListKVReq),
69+
}
70+
71+
impl From<MetaGrpcWriteReq> for MetaGrpcReq {
72+
fn from(r: MetaGrpcWriteReq) -> Self {
73+
match r {
74+
MetaGrpcWriteReq::UpsertKV(x) => x.into(),
75+
}
76+
}
77+
}
78+
79+
impl From<MetaGrpcReadReq> for MetaGrpcReq {
80+
fn from(r: MetaGrpcReadReq) -> Self {
81+
match r {
82+
MetaGrpcReadReq::GetKV(x) => x.into(),
83+
MetaGrpcReadReq::MGetKV(x) => x.into(),
84+
MetaGrpcReadReq::ListKV(x) => x.into(),
85+
}
86+
}
87+
}
88+
6289
/// Try convert tonic::Request<RaftRequest> to DoActionAction.
6390
impl TryInto<MetaGrpcWriteReq> for Request<RaftRequest> {
6491
type Error = tonic::Status;

src/meta/client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod kv_api_impl;
1818
mod message;
1919

2020
pub use grpc_action::MetaGrpcReadReq;
21+
pub use grpc_action::MetaGrpcReq;
2122
pub use grpc_action::MetaGrpcWriteReq;
2223
pub use grpc_action::RequestFor;
2324
pub use grpc_client::ClientHandle;

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use common_arrow::arrow_format::flight::data::BasicAuth;
2121
use common_base::base::tokio::sync::mpsc;
2222
use common_grpc::GrpcClaim;
2323
use common_grpc::GrpcToken;
24+
use common_meta_api::KVApi;
2425
use common_meta_client::MetaGrpcReadReq;
26+
use common_meta_client::MetaGrpcReq;
2527
use common_meta_client::MetaGrpcWriteReq;
2628
use common_meta_types::protobuf::meta_service_server::MetaService;
2729
use common_meta_types::protobuf::ClientInfo;
@@ -48,11 +50,11 @@ use tonic::Status;
4850
use tonic::Streaming;
4951
use tracing::info;
5052

51-
use crate::executor::ActionHandler;
5253
use crate::meta_service::meta_service_impl::GrpcStream;
5354
use crate::meta_service::MetaNode;
5455
use crate::metrics::add_meta_metrics_meta_request_inflights;
5556
use crate::metrics::incr_meta_metrics_meta_recv_bytes;
57+
use crate::metrics::incr_meta_metrics_meta_request_result;
5658
use crate::metrics::incr_meta_metrics_meta_sent_bytes;
5759
use crate::version::from_digit_ver;
5860
use crate::version::to_digit_ver;
@@ -61,14 +63,14 @@ use crate::version::MIN_METACLI_SEMVER;
6163

6264
pub struct MetaServiceImpl {
6365
token: GrpcToken,
64-
action_handler: ActionHandler,
66+
pub(crate) meta_node: Arc<MetaNode>,
6567
}
6668

6769
impl MetaServiceImpl {
6870
pub fn create(meta_node: Arc<MetaNode>) -> Self {
6971
Self {
7072
token: GrpcToken::create(),
71-
action_handler: ActionHandler::create(meta_node),
73+
meta_node,
7274
}
7375
}
7476

@@ -84,6 +86,47 @@ impl MetaServiceImpl {
8486
})?;
8587
Ok(claim)
8688
}
89+
90+
pub async fn execute_kv_req(&self, req: MetaGrpcReq) -> RaftReply {
91+
// To keep the code IDE-friendly, we manually expand the enum variants and dispatch them one by one
92+
93+
match req {
94+
MetaGrpcReq::UpsertKV(a) => {
95+
let res = self.meta_node.upsert_kv(a).await;
96+
incr_meta_metrics_meta_request_result(res.is_ok());
97+
RaftReply::from(res)
98+
}
99+
MetaGrpcReq::GetKV(a) => {
100+
let res = self.meta_node.get_kv(&a.key).await;
101+
incr_meta_metrics_meta_request_result(res.is_ok());
102+
RaftReply::from(res)
103+
}
104+
MetaGrpcReq::MGetKV(a) => {
105+
let res = self.meta_node.mget_kv(&a.keys).await;
106+
incr_meta_metrics_meta_request_result(res.is_ok());
107+
RaftReply::from(res)
108+
}
109+
MetaGrpcReq::ListKV(a) => {
110+
let res = self.meta_node.prefix_list_kv(&a.prefix).await;
111+
incr_meta_metrics_meta_request_result(res.is_ok());
112+
RaftReply::from(res)
113+
}
114+
}
115+
}
116+
117+
pub async fn execute_txn(&self, req: TxnRequest) -> TxnReply {
118+
let ret = self.meta_node.transaction(req).await;
119+
incr_meta_metrics_meta_request_result(ret.is_ok());
120+
121+
match ret {
122+
Ok(resp) => resp,
123+
Err(err) => TxnReply {
124+
success: false,
125+
error: serde_json::to_string(&err).expect("fail to serialize"),
126+
responses: vec![],
127+
},
128+
}
129+
}
87130
}
88131

89132
#[async_trait::async_trait]
@@ -154,13 +197,14 @@ impl MetaService for MetaServiceImpl {
154197

155198
incr_meta_metrics_meta_recv_bytes(request.get_ref().encoded_len() as u64);
156199

157-
let action: MetaGrpcWriteReq = request.try_into()?;
200+
let req: MetaGrpcWriteReq = request.try_into()?;
201+
let req: MetaGrpcReq = req.into();
158202

159203
add_meta_metrics_meta_request_inflights(1);
160204

161-
info!("Receive write_action: {:?}", action);
205+
info!("Receive write_action: {:?}", req);
162206

163-
let body = self.action_handler.execute_write(action).await;
207+
let body = self.execute_kv_req(req).await;
164208

165209
add_meta_metrics_meta_request_inflights(-1);
166210

@@ -175,13 +219,14 @@ impl MetaService for MetaServiceImpl {
175219

176220
incr_meta_metrics_meta_recv_bytes(request.get_ref().encoded_len() as u64);
177221

178-
let action: MetaGrpcReadReq = request.try_into()?;
222+
let req: MetaGrpcReadReq = request.try_into()?;
223+
let req: MetaGrpcReq = req.into();
179224

180225
add_meta_metrics_meta_request_inflights(1);
181226

182-
info!("Receive read_action: {:?}", action);
227+
info!("Receive read_action: {:?}", req);
183228

184-
let res = self.action_handler.execute_read(action).await;
229+
let res = self.execute_kv_req(req).await;
185230

186231
add_meta_metrics_meta_request_inflights(-1);
187232

@@ -201,7 +246,7 @@ impl MetaService for MetaServiceImpl {
201246
&self,
202247
_request: Request<common_meta_types::protobuf::Empty>,
203248
) -> Result<Response<Self::ExportStream>, Status> {
204-
let meta_node = &self.action_handler.meta_node;
249+
let meta_node = &self.meta_node;
205250

206251
let res = meta_node.sto.export().await?;
207252

@@ -222,7 +267,7 @@ impl MetaService for MetaServiceImpl {
222267
) -> Result<Response<Self::WatchStream>, Status> {
223268
let (tx, rx) = mpsc::channel(4);
224269

225-
let meta_node = &self.action_handler.meta_node;
270+
let meta_node = &self.meta_node;
226271
meta_node.create_watcher_stream(request.into_inner(), tx);
227272

228273
let output_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
@@ -243,7 +288,7 @@ impl MetaService for MetaServiceImpl {
243288

244289
info!("Receive txn_request: {:?}", request);
245290

246-
let body = self.action_handler.execute_txn(request).await;
291+
let body = self.execute_txn(request).await;
247292
add_meta_metrics_meta_request_inflights(-1);
248293

249294
incr_meta_metrics_meta_sent_bytes(body.encoded_len() as u64);
@@ -256,7 +301,7 @@ impl MetaService for MetaServiceImpl {
256301
request: Request<MemberListRequest>,
257302
) -> Result<Response<MemberListReply>, Status> {
258303
self.check_token(request.metadata())?;
259-
let meta_node = &self.action_handler.meta_node;
304+
let meta_node = &self.meta_node;
260305
let members = meta_node.get_meta_addrs().await.map_err(|e| {
261306
Status::internal(format!("Cannot get metasrv member list, error: {:?}", e))
262307
})?;

src/meta/service/src/executor/action_handler.rs

Lines changed: 0 additions & 93 deletions
This file was deleted.

src/meta/service/src/executor/mod.rs

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/meta/service/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
pub mod api;
1818
pub mod configs;
19-
pub mod executor;
2019
pub mod export;
2120
pub mod meta_service;
2221
pub mod metrics;

0 commit comments

Comments
 (0)