Skip to content

Commit a8105f5

Browse files
authored
feat(meta): add VoteV001 RPC with native protobuf types (#18396)
* feat(meta): add VoteV001 RPC with native protobuf types - Add VoteRequest and VoteResponse protobuf messages to raft.proto - Implement VoteV001 RPC endpoint alongside existing Vote RPC - Add conversion traits between protobuf and internal types - Implement backward compatibility with fallback to legacy Vote RPC - Update version compatibility matrix for new RPC method * chore: update change log of meta-service server * chore: fix lint * chore: fix lint
1 parent 26c007f commit a8105f5

File tree

9 files changed

+154
-8
lines changed

9 files changed

+154
-8
lines changed

src/meta/client/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
159159
/// - 2025-07-01: since TODO: add when enables sequence storage v1
160160
/// 👥 client: new sequence API v1: depends on `FetchAddU64`.
161161
///
162-
/// - 2025-07-03: since TODO: add when merged
162+
/// - 2025-07-03: since 1.2.770
163163
/// 🖥 server: adaptive `expire_at` support both seconds and milliseconds.
164164
///
165-
/// - 2025-07-04: since TODO: add when merged
165+
/// - 2025-07-04: since 1.2.770
166166
/// 🖥 server: add `PutSequential`.
167167
///
168168
/// Server feature set:

src/meta/service/src/meta_service/raft_service_impl.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,41 @@ impl RaftService for RaftServiceImpl {
291291
.await
292292
}
293293

294+
async fn vote_v001(
295+
&self,
296+
request: Request<pb::VoteRequest>,
297+
) -> Result<Response<pb::VoteResponse>, Status> {
298+
let root = databend_common_tracing::start_trace_for_remote_request(func_path!(), &request);
299+
let remote_addr = remote_addr(&request);
300+
301+
async {
302+
let v_req_pb = request.into_inner();
303+
304+
let v_req: VoteRequest = v_req_pb.into();
305+
306+
let v_req_summary = v_req.summary();
307+
308+
info!(
309+
"RaftServiceImpl::vote_v001: from:{remote_addr} start: {}",
310+
v_req_summary
311+
);
312+
313+
let raft = &self.meta_node.raft;
314+
315+
let resp = raft.vote(v_req).await.map_err(GrpcHelper::internal_err)?;
316+
317+
info!(
318+
"RaftServiceImpl::vote_v001: from:{remote_addr} done: {}",
319+
v_req_summary
320+
);
321+
322+
let resp_pb = pb::VoteResponse::from(resp);
323+
Ok(Response::new(resp_pb))
324+
}
325+
.in_span(root)
326+
.await
327+
}
328+
294329
async fn transfer_leader(
295330
&self,
296331
request: Request<pb::TransferLeaderRequest>,

src/meta/service/src/network.rs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -601,18 +601,47 @@ impl RaftNetworkV2<TypeConfig> for Network {
601601
) -> Result<VoteResponse, RPCError> {
602602
info!(id = self.id, target = self.target, rpc = rpc.summary(); "send_vote");
603603

604-
let raft_req = GrpcHelper::encode_raft_request(&rpc).map_err(|e| Unreachable::new(&e))?;
604+
let mut client = self
605+
.take_client()
606+
.log_elapsed_debug("Raft NetworkConnection vote take_client()")
607+
.await?;
608+
609+
// First, try VoteV001 with native protobuf types
610+
let vote_req_pb = pb::VoteRequest::from(rpc.clone());
611+
let req_v001 = GrpcHelper::traced_req(vote_req_pb);
612+
613+
let grpc_res_v001 = client.vote_v001(req_v001).await;
614+
info!(
615+
"vote_v001: resp from target={} {:?}",
616+
self.target, grpc_res_v001
617+
);
618+
619+
match grpc_res_v001 {
620+
Ok(response) => {
621+
// VoteV001 succeeded, parse the VoteResponse directly
622+
self.client.lock().await.replace(client);
623+
let vote_response = response.into_inner();
624+
let vote_resp: VoteResponse = vote_response.into();
625+
return Ok(vote_resp);
626+
}
627+
Err(e) => {
628+
// Only fall back for specific status codes indicating method not implemented
629+
if matches!(e.code(), tonic::Code::Unimplemented | tonic::Code::NotFound) {
630+
warn!(target = self.target, rpc = rpc.summary(); "vote_v001 not implemented, falling back to vote: {}", e);
631+
} else {
632+
// For other errors, don't fall back - return the error
633+
return Err(RPCError::Unreachable(self.status_to_unreachable(e.clone())));
634+
}
635+
}
636+
}
605637

638+
// Fallback to old Vote RPC using RaftRequest
639+
let raft_req = GrpcHelper::encode_raft_request(&rpc).map_err(|e| Unreachable::new(&e))?;
606640
let req = GrpcHelper::traced_req(raft_req);
607641

608642
let bytes = req.get_ref().data.len() as u64;
609643
raft_metrics::network::incr_sendto_bytes(&self.target, bytes);
610644

611-
let mut client = self
612-
.take_client()
613-
.log_elapsed_debug("Raft NetworkConnection vote take_client()")
614-
.await?;
615-
616645
let grpc_res = client.vote(req).await;
617646
info!("vote: resp from target={} {:?}", self.target, grpc_res);
618647

src/meta/service/src/version.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ pub(crate) mod raft {
118118
del_provide(("install_snapshot", 2), "2024-07-02", (1, 2, 552)),
119119
add_provide(("install_snapshot", 3), "2024-07-02", (1, 2, 552)),
120120
del_provide(("install_snapshot", 1), "2025-07-02", (1, 2, 769)),
121+
add_provide(("vote", 1), "2025-07-20", (1, 0, 0)), // TODO: fix the version
121122
];
122123

123124
/// The client features that raft server depends on.

src/meta/types/proto/meta.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ service RaftService {
353353
// Added in 1.2.547, 2024-06-27
354354
rpc InstallSnapshotV003(stream SnapshotChunkRequestV003) returns (SnapshotResponseV003);
355355
rpc Vote(RaftRequest) returns (RaftReply);
356+
rpc VoteV001(VoteRequest) returns (VoteResponse);
356357
rpc TransferLeader(TransferLeaderRequest) returns (Empty);
357358
}
358359

src/meta/types/proto/raft.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,15 @@ message LogId {
3434
uint64 term = 1;
3535
uint64 node_id = 2;
3636
uint64 index = 3;
37+
}
38+
39+
message VoteRequest {
40+
Vote vote = 1;
41+
LogId last_log_id = 2;
42+
}
43+
44+
message VoteResponse {
45+
Vote vote = 1;
46+
bool vote_granted = 2;
47+
LogId last_log_id = 3;
3748
}

src/meta/types/src/proto_ext/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,5 @@ mod txn_delete_request_ext;
3939
mod txn_delete_response_ext;
4040
mod txn_put_request_ext;
4141
mod txn_put_response_ext;
42+
mod vote_request_ext;
43+
mod vote_response_ext;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::protobuf as pb;
16+
use crate::raft_types;
17+
18+
impl From<raft_types::VoteRequest> for pb::VoteRequest {
19+
fn from(req: raft_types::VoteRequest) -> Self {
20+
pb::VoteRequest {
21+
vote: Some(req.vote.into()),
22+
last_log_id: req.last_log_id.map(|log_id| log_id.into()),
23+
}
24+
}
25+
}
26+
27+
impl From<pb::VoteRequest> for raft_types::VoteRequest {
28+
fn from(req: pb::VoteRequest) -> Self {
29+
let vote: raft_types::Vote = req.vote.unwrap_or_default().into();
30+
let last_log_id = req.last_log_id.map(|log_id| log_id.into());
31+
raft_types::VoteRequest::new(vote, last_log_id)
32+
}
33+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::protobuf as pb;
16+
use crate::raft_types;
17+
18+
impl From<raft_types::VoteResponse> for pb::VoteResponse {
19+
fn from(resp: raft_types::VoteResponse) -> Self {
20+
pb::VoteResponse {
21+
vote: Some(resp.vote.into()),
22+
vote_granted: resp.vote_granted,
23+
last_log_id: resp.last_log_id.map(|log_id| log_id.into()),
24+
}
25+
}
26+
}
27+
28+
impl From<pb::VoteResponse> for raft_types::VoteResponse {
29+
fn from(resp: pb::VoteResponse) -> Self {
30+
let vote: raft_types::Vote = resp.vote.unwrap_or_default().into();
31+
let last_log_id = resp.last_log_id.map(|log_id| log_id.into());
32+
raft_types::VoteResponse::new(vote, last_log_id, resp.vote_granted)
33+
}
34+
}

0 commit comments

Comments
 (0)