Skip to content

Commit e849dde

Browse files
authored
Merge pull request #7538 from drmingdrmer/4-echo-ip
feat(meta-service): new RPC to echo client ip
2 parents 75d7ca6 + 3b1d69b commit e849dde

File tree

8 files changed

+119
-0
lines changed

8 files changed

+119
-0
lines changed

src/meta/grpc/src/grpc_action.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::convert::TryInto;
1616
use std::fmt::Debug;
1717

1818
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
19+
use common_meta_types::protobuf::ClientInfo;
1920
use common_meta_types::protobuf::RaftRequest;
2021
use common_meta_types::protobuf::WatchRequest;
2122
use common_meta_types::protobuf::WatchResponse;
@@ -35,6 +36,7 @@ use tonic::Request;
3536

3637
use crate::grpc_client::AuthInterceptor;
3738
use crate::message::ExportReq;
39+
use crate::message::GetClientInfo;
3840
use crate::message::GetEndpoints;
3941
use crate::message::MakeClient;
4042

@@ -161,3 +163,7 @@ impl RequestFor for GetEndpoints {
161163
impl RequestFor for TxnRequest {
162164
type Reply = TxnReply;
163165
}
166+
167+
impl RequestFor for GetClientInfo {
168+
type Reply = ClientInfo;
169+
}

src/meta/grpc/src/grpc_client.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use common_grpc::RpcClientTlsConfig;
4141
use common_meta_api::KVApi;
4242
use common_meta_types::anyerror::AnyError;
4343
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
44+
use common_meta_types::protobuf::ClientInfo;
4445
use common_meta_types::protobuf::Empty;
4546
use common_meta_types::protobuf::ExportedChunk;
4647
use common_meta_types::protobuf::HandshakeRequest;
@@ -213,6 +214,10 @@ impl ClientHandle {
213214
Ok(r)
214215
}
215216

217+
pub async fn get_client_info(&self) -> std::result::Result<ClientInfo, MetaError> {
218+
self.request(message::GetClientInfo {}).await
219+
}
220+
216221
pub async fn make_client(
217222
&self,
218223
) -> std::result::Result<
@@ -379,6 +384,10 @@ impl MetaGrpcClient {
379384
let resp = self.get_endpoints().await;
380385
Ok(message::Response::GetEndpoints(resp))
381386
}
387+
message::Request::GetClientInfo(_) => {
388+
let resp = self.get_client_info().await;
389+
resp.map(message::Response::GetClientInfo)
390+
}
382391
};
383392

384393
debug!(
@@ -752,6 +761,16 @@ impl MetaGrpcClient {
752761
Ok(res.into_inner())
753762
}
754763

764+
/// Export all data in json from metasrv.
765+
#[tracing::instrument(level = "debug", skip_all)]
766+
pub(crate) async fn get_client_info(&self) -> std::result::Result<ClientInfo, MetaError> {
767+
debug!("MetaGrpcClient::get_client_info");
768+
769+
let mut client = self.make_client().await?;
770+
let res = client.get_client_info(Empty {}).await?;
771+
Ok(res.into_inner())
772+
}
773+
755774
#[tracing::instrument(level = "debug", skip(self, v))]
756775
pub(crate) async fn do_write<T, R>(&self, v: T) -> std::result::Result<R, MetaError>
757776
where

src/meta/grpc/src/message.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use common_base::base::tokio::sync::oneshot::Sender;
1616
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
17+
use common_meta_types::protobuf::ClientInfo;
1718
use common_meta_types::protobuf::ExportedChunk;
1819
use common_meta_types::protobuf::WatchRequest;
1920
use common_meta_types::protobuf::WatchResponse;
@@ -72,6 +73,9 @@ pub enum Request {
7273

7374
/// Get endpoints, for test
7475
GetEndpoints(GetEndpoints),
76+
77+
/// Get info about the client
78+
GetClientInfo(GetClientInfo),
7579
}
7680

7781
/// Meta-client worker-to-handle response body
@@ -86,6 +90,7 @@ pub enum Response {
8690
Export(tonic::codec::Streaming<ExportedChunk>),
8791
MakeClient(MetaServiceClient<InterceptedService<Channel, AuthInterceptor>>),
8892
GetEndpoints(Vec<String>),
93+
GetClientInfo(ClientInfo),
8994
}
9095

9196
/// Export all data stored in metasrv
@@ -101,3 +106,7 @@ pub struct MakeClient {}
101106
/// Get all meta server endpoints
102107
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
103108
pub struct GetEndpoints {}
109+
110+
/// Get info about client
111+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
112+
pub struct GetClientInfo {}

src/meta/grpc/tests/it/grpc_server.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::time::Duration;
1919
use common_base::base::tokio;
2020
use common_meta_types::protobuf::meta_service_server::MetaService;
2121
use common_meta_types::protobuf::meta_service_server::MetaServiceServer;
22+
use common_meta_types::protobuf::ClientInfo;
23+
use common_meta_types::protobuf::Empty;
2224
use common_meta_types::protobuf::ExportedChunk;
2325
use common_meta_types::protobuf::HandshakeResponse;
2426
use common_meta_types::protobuf::MemberListReply;
@@ -102,6 +104,13 @@ impl MetaService for GrpcServiceForTestImpl {
102104
) -> Result<Response<MemberListReply>, Status> {
103105
todo!()
104106
}
107+
108+
async fn get_client_info(
109+
&self,
110+
_request: Request<Empty>,
111+
) -> Result<Response<ClientInfo>, Status> {
112+
todo!()
113+
}
105114
}
106115

107116
pub fn start_grpc_server() -> String {

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use common_grpc::GrpcToken;
2424
use common_meta_grpc::MetaGrpcReadReq;
2525
use common_meta_grpc::MetaGrpcWriteReq;
2626
use common_meta_types::protobuf::meta_service_server::MetaService;
27+
use common_meta_types::protobuf::ClientInfo;
28+
use common_meta_types::protobuf::Empty;
2729
use common_meta_types::protobuf::ExportedChunk;
2830
use common_meta_types::protobuf::HandshakeRequest;
2931
use common_meta_types::protobuf::HandshakeResponse;
@@ -264,6 +266,20 @@ impl MetaService for MetaServiceImpl {
264266

265267
Ok(Response::new(resp))
266268
}
269+
270+
async fn get_client_info(
271+
&self,
272+
request: Request<Empty>,
273+
) -> Result<Response<ClientInfo>, Status> {
274+
let r = request.remote_addr();
275+
if let Some(addr) = r {
276+
let resp = ClientInfo {
277+
client_addr: addr.to_string(),
278+
};
279+
return Ok(Response::new(resp));
280+
}
281+
Err(Status::unavailable("can not get client ip address"))
282+
}
267283
}
268284

269285
pub struct ExportStream {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::Duration;
16+
17+
use common_base::base::tokio;
18+
use common_meta_grpc::MetaGrpcClient;
19+
use pretty_assertions::assert_eq;
20+
use regex::Regex;
21+
22+
use crate::init_meta_ut;
23+
24+
#[async_entry::test(worker_threads = 3, init = "init_meta_ut!()", tracing_span = "debug")]
25+
async fn test_get_client_info() -> anyhow::Result<()> {
26+
// - Start a metasrv server.
27+
// - Get client ip
28+
29+
let (_tc, addr) = crate::tests::start_metasrv().await?;
30+
31+
let client = MetaGrpcClient::try_create(
32+
vec![addr],
33+
"root",
34+
"xxx",
35+
None,
36+
Some(Duration::from_secs(10)),
37+
None,
38+
)?;
39+
40+
let resp = client.get_client_info().await?;
41+
42+
let client_addr = resp.client_addr;
43+
44+
let masked_addr = Regex::new(r"\d+")
45+
.unwrap()
46+
.replace_all(&client_addr, "1")
47+
.to_string();
48+
49+
assert_eq!("1.1.1.1:1", masked_addr);
50+
Ok(())
51+
}

src/meta/service/tests/it/grpc/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
pub mod metasrv_grpc_api;
1616
mod metasrv_grpc_export;
17+
pub mod metasrv_grpc_get_client_info;
1718
pub mod metasrv_grpc_handshake;
1819
pub mod metasrv_grpc_kv_api;
1920
pub mod metasrv_grpc_kv_api_restart_cluster;

src/meta/types/proto/meta.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ message TxnReply {
143143
string error = 3;
144144
}
145145

146+
message ClientInfo {
147+
// The address of the connected in form of "<ip>:<port>"
148+
string client_addr = 10;
149+
}
150+
146151
service RaftService {
147152

148153
rpc Write(RaftRequest) returns (RaftReply) {}
@@ -181,4 +186,7 @@ service MetaService {
181186

182187
// Get MetaSrv member list endpoints
183188
rpc MemberList(MemberListRequest) returns (MemberListReply);
189+
190+
// Respond with the information about the client.
191+
rpc GetClientInfo(Empty) returns (ClientInfo);
184192
}

0 commit comments

Comments
 (0)