Skip to content

Commit 67ee61b

Browse files
authored
feat: add stream api for get, mget and list (#13299)
* feat: add stream api for get, mget and list In this commit, meta grpc service provides new api `kv_read_v1() -> Stream<StreamItem>`. Service and client are both ready but not yet used by databend-query yet. This new API support `get` single kv, `mget` multiple kv and `list` kv by a prefix. The result is returned in a stream. Raft service also provides a `kv_read_v1()` API for internal forwarding such a request to the leader. Other changes: rename internal term `PrefixList -> List`. * chore: fix testing impl of MetaService * chore: fix clippy
1 parent 51289d7 commit 67ee61b

File tree

18 files changed

+652
-37
lines changed

18 files changed

+652
-37
lines changed

src/meta/client/src/grpc_action.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use common_meta_kvapi::kvapi::UpsertKVReq;
2626
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
2727
use common_meta_types::protobuf::ClientInfo;
2828
use common_meta_types::protobuf::RaftRequest;
29+
use common_meta_types::protobuf::StreamItem;
2930
use common_meta_types::protobuf::WatchRequest;
3031
use common_meta_types::protobuf::WatchResponse;
3132
use common_meta_types::InvalidArgument;
@@ -36,12 +37,14 @@ use log::debug;
3637
use tonic::codegen::InterceptedService;
3738
use tonic::transport::Channel;
3839
use tonic::Request;
40+
use tonic::Streaming;
3941

4042
use crate::grpc_client::AuthInterceptor;
4143
use crate::message::ExportReq;
4244
use crate::message::GetClientInfo;
4345
use crate::message::GetEndpoints;
4446
use crate::message::MakeClient;
47+
use crate::message::Streamed;
4548

4649
/// Bind a request type to its corresponding response type.
4750
pub trait RequestFor {
@@ -86,18 +89,62 @@ impl MetaGrpcReq {
8689
}
8790
}
8891

92+
#[derive(
93+
serde::Serialize,
94+
serde::Deserialize,
95+
Debug,
96+
Clone,
97+
PartialEq,
98+
Eq,
99+
derive_more::From,
100+
derive_more::TryInto,
101+
)]
102+
pub enum MetaGrpcReadReq {
103+
GetKV(GetKVReq),
104+
MGetKV(MGetKVReq),
105+
ListKV(ListKVReq),
106+
}
107+
108+
impl MetaGrpcReadReq {
109+
pub fn to_raft_request(&self) -> Result<RaftRequest, InvalidArgument> {
110+
let raft_request = RaftRequest {
111+
data: serde_json::to_string(self)
112+
.map_err(|e| InvalidArgument::new(e, "fail to encode request"))?,
113+
};
114+
115+
debug!(
116+
req = as_debug!(&raft_request);
117+
"build raft_request"
118+
);
119+
120+
Ok(raft_request)
121+
}
122+
}
123+
89124
impl RequestFor for GetKVReq {
90125
type Reply = GetKVReply;
91126
}
92127

128+
impl RequestFor for Streamed<GetKVReq> {
129+
type Reply = Streaming<StreamItem>;
130+
}
131+
93132
impl RequestFor for MGetKVReq {
94133
type Reply = MGetKVReply;
95134
}
96135

136+
impl RequestFor for Streamed<MGetKVReq> {
137+
type Reply = Streaming<StreamItem>;
138+
}
139+
97140
impl RequestFor for ListKVReq {
98141
type Reply = ListKVReply;
99142
}
100143

144+
impl RequestFor for Streamed<ListKVReq> {
145+
type Reply = Streaming<StreamItem>;
146+
}
147+
101148
impl RequestFor for UpsertKVReq {
102149
type Reply = UpsertKVReply;
103150
}

src/meta/client/src/grpc_client.rs

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use common_grpc::RpcClientConf;
4343
use common_grpc::RpcClientTlsConfig;
4444
use common_meta_api::reply::reply_to_api_result;
4545
use common_meta_types::anyerror::AnyError;
46+
use common_meta_types::protobuf as pb;
4647
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
4748
use common_meta_types::protobuf::ClientInfo;
4849
use common_meta_types::protobuf::Empty;
@@ -81,16 +82,19 @@ use tonic::transport::Channel;
8182
use tonic::Code;
8283
use tonic::Request;
8384
use tonic::Status;
85+
use tonic::Streaming;
8486

8587
use crate::from_digit_ver;
8688
use crate::grpc_action::RequestFor;
8789
use crate::grpc_metrics;
8890
use crate::message;
8991
use crate::to_digit_ver;
92+
use crate::MetaGrpcReadReq;
9093
use crate::MetaGrpcReq;
9194
use crate::METACLI_COMMIT_SEMVER;
9295
use crate::MIN_METASRV_SEMVER;
9396

97+
const RPC_RETRIES: usize = 2;
9498
const AUTH_TOKEN_KEY: &str = "auth-token-bin";
9599

96100
#[derive(Debug)]
@@ -403,19 +407,46 @@ impl MetaGrpcClient {
403407
.await;
404408
message::Response::Get(resp)
405409
}
410+
message::Request::StreamGet(r) => {
411+
let strm = self
412+
.kv_read_v1(MetaGrpcReadReq::GetKV(r.into_inner()))
413+
.timed_ge(threshold(), info_spent("MetaGrpcClient::kv_read_v1(GetKV)"))
414+
.await;
415+
message::Response::StreamGet(strm)
416+
}
406417
message::Request::MGet(r) => {
407418
let resp = self
408419
.kv_api(r)
409420
.timed_ge(threshold(), info_spent("MetaGrpcClient::kv_api"))
410421
.await;
411422
message::Response::MGet(resp)
412423
}
413-
message::Request::PrefixList(r) => {
424+
message::Request::StreamMGet(r) => {
425+
let strm = self
426+
.kv_read_v1(MetaGrpcReadReq::MGetKV(r.into_inner()))
427+
.timed_ge(
428+
threshold(),
429+
info_spent("MetaGrpcClient::kv_read_v1(MGetKV)"),
430+
)
431+
.await;
432+
message::Response::StreamMGet(strm)
433+
}
434+
message::Request::List(r) => {
414435
let resp = self
415436
.kv_api(r)
416437
.timed_ge(threshold(), info_spent("MetaGrpcClient::kv_api"))
417438
.await;
418-
message::Response::PrefixList(resp)
439+
message::Response::List(resp)
440+
}
441+
message::Request::StreamList(r) => {
442+
let strm = self
443+
.kv_read_v1(MetaGrpcReadReq::ListKV(r.into_inner()))
444+
.timed_ge(
445+
threshold(),
446+
info_spent("MetaGrpcClient::kv_read_v1(ListKV)"),
447+
)
448+
.await;
449+
message::Response::StreamMGet(strm)
419450
}
420451
message::Request::Upsert(r) => {
421452
let resp = self
@@ -898,7 +929,7 @@ impl MetaGrpcClient {
898929
.to_raft_request()
899930
.map_err(MetaNetworkError::InvalidArgument)?;
900931

901-
for i in 0..2 {
932+
for i in 0..RPC_RETRIES {
902933
let req = common_tracing::inject_span_to_tonic_request(Request::new(raft_req.clone()));
903934

904935
let mut client = self
@@ -929,7 +960,54 @@ impl MetaGrpcClient {
929960
return Ok(resp);
930961
}
931962

932-
unreachable!("impossible to reach here");
963+
unreachable!("impossible to quit loop without error or success");
964+
}
965+
966+
#[minitrace::trace]
967+
pub(crate) async fn kv_read_v1(
968+
&self,
969+
grpc_req: MetaGrpcReadReq,
970+
) -> Result<Streaming<pb::StreamItem>, MetaError> {
971+
debug!(
972+
req = as_debug!(&grpc_req);
973+
"MetaGrpcClient::kv_api request"
974+
);
975+
976+
let raft_req: RaftRequest = grpc_req
977+
.to_raft_request()
978+
.map_err(MetaNetworkError::InvalidArgument)?;
979+
980+
for i in 0..RPC_RETRIES {
981+
let req = common_tracing::inject_span_to_tonic_request(Request::new(raft_req.clone()));
982+
983+
let mut client = self
984+
.make_client()
985+
.timed_ge(threshold(), info_spent("MetaGrpcClient::make_client"))
986+
.await?;
987+
988+
let result = client
989+
.kv_read_v1(req)
990+
.timed_ge(threshold(), info_spent("client::kv_read_v1"))
991+
.await;
992+
993+
debug!(
994+
result = as_debug!(&result);
995+
"MetaGrpcClient::kv_read_v1 result, {}-th try", i
996+
);
997+
998+
if let Err(ref e) = result {
999+
if status_is_retryable(e) {
1000+
self.mark_as_unhealthy();
1001+
continue;
1002+
}
1003+
}
1004+
1005+
let strm = result?.into_inner();
1006+
1007+
return Ok(strm);
1008+
}
1009+
1010+
unreachable!("impossible to quit loop without error or success");
9331011
}
9341012

9351013
#[minitrace::trace]

src/meta/client/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ mod message;
2222

2323
pub use common_meta_api::reply::reply_to_api_result;
2424
pub use common_meta_api::reply::reply_to_meta_result;
25+
pub use grpc_action::MetaGrpcReadReq;
2526
pub use grpc_action::MetaGrpcReq;
2627
pub use grpc_action::RequestFor;
2728
pub use grpc_client::ClientHandle;
2829
pub use grpc_client::MetaGrpcClient;
2930
pub use message::ClientWorkerRequest;
31+
pub use message::Streamed;
3032
use once_cell::sync::Lazy;
3133
use semver::BuildMetadata;
3234
use semver::Prerelease;
@@ -67,6 +69,9 @@ pub static METACLI_COMMIT_SEMVER: Lazy<Version> = Lazy::new(|| {
6769
///
6870
/// - 2023-10-11: since 1.2.153:
6971
/// Meta service: add: pb::SeqV.meta field to support record expiration.
72+
///
73+
/// - 2023-10-17: since TODO(fill in when merged):
74+
/// Meta service: add: stream api: kv_read_v1().
7075
pub static MIN_METASRV_SEMVER: Version = Version {
7176
major: 1,
7277
minor: 1,

src/meta/client/src/message.rs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use common_meta_kvapi::kvapi::UpsertKVReq;
2626
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
2727
use common_meta_types::protobuf::ClientInfo;
2828
use common_meta_types::protobuf::ExportedChunk;
29+
use common_meta_types::protobuf::StreamItem;
2930
use common_meta_types::protobuf::WatchRequest;
3031
use common_meta_types::protobuf::WatchResponse;
3132
use common_meta_types::MetaClientError;
@@ -34,6 +35,7 @@ use common_meta_types::TxnReply;
3435
use common_meta_types::TxnRequest;
3536
use tonic::codegen::InterceptedService;
3637
use tonic::transport::Channel;
38+
use tonic::Streaming;
3739

3840
use crate::grpc_client::AuthInterceptor;
3941

@@ -57,6 +59,16 @@ impl fmt::Debug for ClientWorkerRequest {
5759
}
5860
}
5961

62+
/// Mark an RPC to return a stream.
63+
#[derive(Debug, Clone)]
64+
pub struct Streamed<T>(pub T);
65+
66+
impl<T> Streamed<T> {
67+
pub fn into_inner(self) -> T {
68+
self.0
69+
}
70+
}
71+
6072
/// Meta-client handle-to-worker request body
6173
#[derive(Debug, Clone, derive_more::From)]
6274
pub enum Request {
@@ -67,7 +79,16 @@ pub enum Request {
6779
MGet(MGetKVReq),
6880

6981
/// List KVs by key prefix
70-
PrefixList(ListKVReq),
82+
List(ListKVReq),
83+
84+
/// Get KV, returning a stream
85+
StreamGet(Streamed<GetKVReq>),
86+
87+
/// Get multiple KV, returning a stream.
88+
StreamMGet(Streamed<MGetKVReq>),
89+
90+
/// List KVs by key prefix, returning a stream.
91+
StreamList(Streamed<ListKVReq>),
7192

7293
/// Update or insert KV
7394
Upsert(UpsertKVReq),
@@ -96,7 +117,10 @@ impl Request {
96117
match self {
97118
Request::Get(_) => "Get",
98119
Request::MGet(_) => "MGet",
99-
Request::PrefixList(_) => "PrefixList",
120+
Request::List(_) => "PrefixList",
121+
Request::StreamGet(_) => "StreamGet",
122+
Request::StreamMGet(_) => "StreamMGet",
123+
Request::StreamList(_) => "StreamPrefixList",
100124
Request::Upsert(_) => "Upsert",
101125
Request::Txn(_) => "Txn",
102126
Request::Watch(_) => "Watch",
@@ -113,7 +137,10 @@ impl Request {
113137
pub enum Response {
114138
Get(Result<GetKVReply, MetaError>),
115139
MGet(Result<MGetKVReply, MetaError>),
116-
PrefixList(Result<ListKVReply, MetaError>),
140+
List(Result<ListKVReply, MetaError>),
141+
StreamGet(Result<Streaming<StreamItem>, MetaError>),
142+
StreamMGet(Result<Streaming<StreamItem>, MetaError>),
143+
StreamList(Result<Streaming<StreamItem>, MetaError>),
117144
Upsert(Result<UpsertKVReply, MetaError>),
118145
Txn(Result<TxnReply, MetaError>),
119146
Watch(Result<tonic::codec::Streaming<WatchResponse>, MetaError>),
@@ -126,21 +153,6 @@ pub enum Response {
126153
}
127154

128155
impl Response {
129-
pub fn is_err(&self) -> bool {
130-
match self {
131-
Response::Get(res) => res.is_err(),
132-
Response::MGet(res) => res.is_err(),
133-
Response::PrefixList(res) => res.is_err(),
134-
Response::Upsert(res) => res.is_err(),
135-
Response::Txn(res) => res.is_err(),
136-
Response::Watch(res) => res.is_err(),
137-
Response::Export(res) => res.is_err(),
138-
Response::MakeClient(res) => res.is_err(),
139-
Response::GetEndpoints(res) => res.is_err(),
140-
Response::GetClientInfo(res) => res.is_err(),
141-
}
142-
}
143-
144156
pub fn err(&self) -> Option<&(dyn std::error::Error + 'static)> {
145157
let e = match self {
146158
Response::Get(res) => res
@@ -151,7 +163,19 @@ impl Response {
151163
.as_ref()
152164
.err()
153165
.map(|x| x as &(dyn std::error::Error + 'static)),
154-
Response::PrefixList(res) => res
166+
Response::List(res) => res
167+
.as_ref()
168+
.err()
169+
.map(|x| x as &(dyn std::error::Error + 'static)),
170+
Response::StreamGet(res) => res
171+
.as_ref()
172+
.err()
173+
.map(|x| x as &(dyn std::error::Error + 'static)),
174+
Response::StreamMGet(res) => res
175+
.as_ref()
176+
.err()
177+
.map(|x| x as &(dyn std::error::Error + 'static)),
178+
Response::StreamList(res) => res
155179
.as_ref()
156180
.err()
157181
.map(|x| x as &(dyn std::error::Error + 'static)),

0 commit comments

Comments
 (0)