Skip to content

Commit 5013671

Browse files
committed
refactor(meta-service): add MetaGrpcReq, as a replacement of MetaGrpcReadReq and MetaGrpcWriteReq.
There is no need to distinguish read/write at the request level.
1 parent 2f8945a commit 5013671

File tree

4 files changed

+56
-42
lines changed

4 files changed

+56
-42
lines changed

src/meta/client/src/grpc_action.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,33 @@ pub enum MetaGrpcReadReq {
5959
ListKV(ListKVReq), // since 2022-05-23
6060
}
6161

62+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
63+
pub enum MetaGrpcReq {
64+
UpsertKV(UpsertKVReq),
65+
66+
GetKV(GetKVReq),
67+
MGetKV(MGetKVReq),
68+
ListKV(ListKVReq),
69+
}
70+
71+
impl From<MetaGrpcWriteReq> for MetaGrpcReq {
72+
fn from(r: MetaGrpcWriteReq) -> Self {
73+
match r {
74+
MetaGrpcWriteReq::UpsertKV(x) => x.into(),
75+
}
76+
}
77+
}
78+
79+
impl From<MetaGrpcReadReq> for MetaGrpcReq {
80+
fn from(r: MetaGrpcReadReq) -> Self {
81+
match r {
82+
MetaGrpcReadReq::GetKV(x) => x.into(),
83+
MetaGrpcReadReq::MGetKV(x) => x.into(),
84+
MetaGrpcReadReq::ListKV(x) => x.into(),
85+
}
86+
}
87+
}
88+
6289
/// Try convert tonic::Request<RaftRequest> to DoActionAction.
6390
impl TryInto<MetaGrpcWriteReq> for Request<RaftRequest> {
6491
type Error = tonic::Status;

src/meta/client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod kv_api_impl;
1818
mod message;
1919

2020
pub use grpc_action::MetaGrpcReadReq;
21+
pub use grpc_action::MetaGrpcReq;
2122
pub use grpc_action::MetaGrpcWriteReq;
2223
pub use grpc_action::RequestFor;
2324
pub use grpc_client::ClientHandle;

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use common_base::base::tokio::sync::mpsc;
2222
use common_grpc::GrpcClaim;
2323
use common_grpc::GrpcToken;
2424
use common_meta_client::MetaGrpcReadReq;
25+
use common_meta_client::MetaGrpcReq;
2526
use common_meta_client::MetaGrpcWriteReq;
2627
use common_meta_types::protobuf::meta_service_server::MetaService;
2728
use common_meta_types::protobuf::ClientInfo;
@@ -154,13 +155,14 @@ impl MetaService for MetaServiceImpl {
154155

155156
incr_meta_metrics_meta_recv_bytes(request.get_ref().encoded_len() as u64);
156157

157-
let action: MetaGrpcWriteReq = request.try_into()?;
158+
let req: MetaGrpcWriteReq = request.try_into()?;
159+
let req: MetaGrpcReq = req.into();
158160

159161
add_meta_metrics_meta_request_inflights(1);
160162

161-
info!("Receive write_action: {:?}", action);
163+
info!("Receive write_action: {:?}", req);
162164

163-
let body = self.action_handler.execute_write(action).await;
165+
let body = self.action_handler.execute_kv_req(req).await;
164166

165167
add_meta_metrics_meta_request_inflights(-1);
166168

@@ -175,13 +177,14 @@ impl MetaService for MetaServiceImpl {
175177

176178
incr_meta_metrics_meta_recv_bytes(request.get_ref().encoded_len() as u64);
177179

178-
let action: MetaGrpcReadReq = request.try_into()?;
180+
let req: MetaGrpcReadReq = request.try_into()?;
181+
let req: MetaGrpcReq = req.into();
179182

180183
add_meta_metrics_meta_request_inflights(1);
181184

182-
info!("Receive read_action: {:?}", action);
185+
info!("Receive read_action: {:?}", req);
183186

184-
let res = self.action_handler.execute_read(action).await;
187+
let res = self.action_handler.execute_kv_req(req).await;
185188

186189
add_meta_metrics_meta_request_inflights(-1);
187190

src/meta/service/src/executor/action_handler.rs

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
use std::sync::Arc;
1616

1717
use common_meta_api::KVApi;
18-
use common_meta_client::MetaGrpcReadReq;
19-
use common_meta_client::MetaGrpcWriteReq;
20-
use common_meta_client::RequestFor;
18+
use common_meta_client::MetaGrpcReq;
2119
use common_meta_types::protobuf::RaftReply;
22-
use common_meta_types::MetaError;
2320
use common_meta_types::TxnReply;
2421
use common_meta_types::TxnRequest;
2522

@@ -31,48 +28,34 @@ pub struct ActionHandler {
3128
pub(crate) meta_node: Arc<MetaNode>,
3229
}
3330

34-
#[async_trait::async_trait]
35-
pub trait RequestHandler<T>: Sync + Send
36-
where T: RequestFor
37-
{
38-
async fn handle(&self, req: T) -> Result<T::Reply, MetaError>;
39-
}
40-
4131
impl ActionHandler {
4232
pub fn create(meta_node: Arc<MetaNode>) -> Self {
4333
ActionHandler { meta_node }
4434
}
4535

46-
pub async fn execute_write(&self, action: MetaGrpcWriteReq) -> RaftReply {
36+
pub async fn execute_kv_req(&self, req: MetaGrpcReq) -> RaftReply {
4737
// To keep the code IDE-friendly, we manually expand the enum variants and dispatch them one by one
4838

49-
match action {
50-
MetaGrpcWriteReq::UpsertKV(a) => {
51-
let r = self.meta_node.upsert_kv(a).await;
52-
incr_meta_metrics_meta_request_result(r.is_ok());
53-
RaftReply::from(r)
39+
match req {
40+
MetaGrpcReq::UpsertKV(a) => {
41+
let res = self.meta_node.upsert_kv(a).await;
42+
incr_meta_metrics_meta_request_result(res.is_ok());
43+
RaftReply::from(res)
5444
}
55-
}
56-
}
57-
58-
pub async fn execute_read(&self, action: MetaGrpcReadReq) -> RaftReply {
59-
// To keep the code IDE-friendly, we manually expand the enum variants and dispatch them one by one
60-
61-
match action {
62-
MetaGrpcReadReq::GetKV(a) => {
63-
let r = self.meta_node.get_kv(&a.key).await;
64-
incr_meta_metrics_meta_request_result(r.is_ok());
65-
RaftReply::from(r)
45+
MetaGrpcReq::GetKV(a) => {
46+
let res = self.meta_node.get_kv(&a.key).await;
47+
incr_meta_metrics_meta_request_result(res.is_ok());
48+
RaftReply::from(res)
6649
}
67-
MetaGrpcReadReq::MGetKV(a) => {
68-
let r = self.meta_node.mget_kv(&a.keys).await;
69-
incr_meta_metrics_meta_request_result(r.is_ok());
70-
RaftReply::from(r)
50+
MetaGrpcReq::MGetKV(a) => {
51+
let res = self.meta_node.mget_kv(&a.keys).await;
52+
incr_meta_metrics_meta_request_result(res.is_ok());
53+
RaftReply::from(res)
7154
}
72-
MetaGrpcReadReq::ListKV(a) => {
73-
let r = self.meta_node.prefix_list_kv(&a.prefix).await;
74-
incr_meta_metrics_meta_request_result(r.is_ok());
75-
RaftReply::from(r)
55+
MetaGrpcReq::ListKV(a) => {
56+
let res = self.meta_node.prefix_list_kv(&a.prefix).await;
57+
incr_meta_metrics_meta_request_result(res.is_ok());
58+
RaftReply::from(res)
7659
}
7760
}
7861
}

0 commit comments

Comments
 (0)