Skip to content

Commit 4ee2dc2

Browse files
authored
fix: Respond with SnapshotMismatch error instead of internal error (#15391)
This fix updates OpenRaft to version 0.9.8 to address an issue where, upon receiving a `SnapshotMismatch` error, the snapshot should be resent from the beginning. This ensures that the leader can correctly reset and reinitiate the snapshot transmission process. In addition, meta-service now returns a `SnapshotMismatch` error instead of an internal gRPC error.
1 parent 61293e2 commit 4ee2dc2

File tree

6 files changed

+89
-25
lines changed

6 files changed

+89
-25
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
123123

124124
# openraft for debugging
125125
#openraft = { git = "https://github.com/drmingdrmer/openraft", branch = "release-0.9", features = [
126-
openraft = { version = "0.9.7", features = [
126+
openraft = { version = "0.9.8", features = [
127127
"serde",
128128
"tracing-log",
129129
"generic-snapshot-data",

src/meta/service/src/meta_service/raft_service_impl.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -82,18 +82,14 @@ impl RaftServiceImpl {
8282

8383
let is_req = GrpcHelper::parse_req(request)?;
8484

85-
let resp = self
85+
let res = self
8686
.receive_chunked_snapshot(is_req)
8787
.timed(observe_snapshot_recv_spent(&addr))
88-
.await
89-
.map_err(GrpcHelper::internal_err);
88+
.await;
9089

91-
raft_metrics::network::incr_snapshot_recvfrom_result(addr.clone(), resp.is_ok());
90+
raft_metrics::network::incr_snapshot_recvfrom_result(addr.clone(), res.is_ok());
9291

93-
match resp {
94-
Ok(resp) => GrpcHelper::ok_response(resp),
95-
Err(e) => Err(e),
96-
}
92+
GrpcHelper::make_grpc_result(res)
9793
}
9894

9995
async fn do_install_snapshot_v1(
@@ -122,18 +118,14 @@ impl RaftServiceImpl {
122118
done: chunk.done,
123119
};
124120

125-
let resp = self
121+
let res = self
126122
.receive_chunked_snapshot(install_snapshot_req)
127123
.timed(observe_snapshot_recv_spent(&addr))
128-
.await
129-
.map_err(GrpcHelper::internal_err);
124+
.await;
130125

131-
raft_metrics::network::incr_snapshot_recvfrom_result(addr.clone(), resp.is_ok());
126+
raft_metrics::network::incr_snapshot_recvfrom_result(addr.clone(), res.is_ok());
132127

133-
match resp {
134-
Ok(resp) => GrpcHelper::ok_response(resp),
135-
Err(e) => Err(e),
136-
}
128+
GrpcHelper::make_grpc_result(res)
137129
}
138130

139131
/// Receive a chunk based snapshot from the leader.
@@ -227,7 +219,7 @@ impl RaftService for RaftServiceImpl {
227219
.await
228220
.map_err(GrpcHelper::internal_err)?;
229221

230-
GrpcHelper::ok_response(resp)
222+
GrpcHelper::ok_response(&resp)
231223
}
232224
.in_span(root)
233225
.await
@@ -260,7 +252,7 @@ impl RaftService for RaftServiceImpl {
260252

261253
let resp = raft.vote(v_req).await.map_err(GrpcHelper::internal_err)?;
262254

263-
GrpcHelper::ok_response(resp)
255+
GrpcHelper::ok_response(&resp)
264256
}
265257
.in_span(root)
266258
.await

src/meta/service/tests/it/meta_node/meta_node_replication.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,25 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_meta_raft_store::state_machine::MetaSnapshotId;
1516
use databend_common_meta_sled_store::openraft::LogIdOptionExt;
1617
use databend_common_meta_sled_store::openraft::ServerState;
18+
use databend_common_meta_types::protobuf::SnapshotChunkRequest;
1719
use databend_common_meta_types::Cmd;
20+
use databend_common_meta_types::InstallSnapshotRequest;
1821
use databend_common_meta_types::LogEntry;
1922
use databend_common_meta_types::SeqV;
23+
use databend_common_meta_types::SnapshotMeta;
24+
use databend_common_meta_types::StoredMembership;
2025
use databend_common_meta_types::UpsertKV;
26+
use databend_common_meta_types::Vote;
2127
use databend_meta::meta_service::MetaNode;
2228
use log::info;
29+
use maplit::btreeset;
2330
use test_harness::test;
2431

2532
use crate::testing::meta_service_test_harness;
33+
use crate::tests::meta_node::start_meta_node_cluster;
2634
use crate::tests::meta_node::start_meta_node_non_voter;
2735
use crate::tests::meta_node::timeout;
2836
use crate::tests::service::MetaSrvTestContext;
@@ -135,3 +143,36 @@ async fn test_meta_node_snapshot_replication() -> anyhow::Result<()> {
135143

136144
Ok(())
137145
}
146+
147+
#[test(harness = meta_service_test_harness)]
148+
#[minitrace::trace]
149+
async fn test_raft_service_snapshot_id_mismatch() -> anyhow::Result<()> {
150+
// Test SnapshotIdMismatch error should be responded.
151+
152+
let (mut _nlog, mut tcs) = start_meta_node_cluster(btreeset![0], btreeset![]).await?;
153+
let tc0 = tcs.remove(0);
154+
155+
let mut client0 = tc0.raft_client().await?;
156+
let mut r1 = InstallSnapshotRequest {
157+
vote: Vote::new_committed(10, 2),
158+
meta: SnapshotMeta {
159+
last_log_id: None,
160+
last_membership: StoredMembership::default(),
161+
snapshot_id: MetaSnapshotId::new(None, 1).to_string(),
162+
},
163+
offset: 0,
164+
data: vec![1, 2, 3],
165+
done: false,
166+
};
167+
168+
let req = SnapshotChunkRequest::new_v1(r1.clone());
169+
client0.install_snapshot_v1(req).await?;
170+
171+
r1.meta.snapshot_id = MetaSnapshotId::new(None, 2).to_string();
172+
r1.offset = 3;
173+
let req = SnapshotChunkRequest::new_v1(r1);
174+
let res = client0.install_snapshot_v1(req).await;
175+
println!("res: {:?}", res);
176+
177+
Ok(())
178+
}

src/meta/service/tests/it/tests/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ impl MetaSrvTestContext {
219219
let addr = self.config.raft_config.raft_api_addr().await?;
220220

221221
// retry 3 times until server starts listening.
222-
for _ in 0..4 {
222+
for _ in 0..3 {
223223
let client = RaftServiceClient::connect(format!("http://{}", addr)).await;
224224
match client {
225225
Ok(x) => return Ok(x),

src/meta/types/src/grpc_helper.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,17 +144,48 @@ impl GrpcHelper {
144144
Self::parse(&raft_req.data)
145145
}
146146

147+
/// Make a gRPC response result with Ok, ApiError or internal error.
148+
pub fn make_grpc_result<D, E>(
149+
res: Result<D, RaftError<E>>,
150+
) -> Result<tonic::Response<RaftReply>, tonic::Status>
151+
where
152+
D: serde::Serialize + 'static,
153+
E: serde::Serialize + Error + 'static,
154+
{
155+
match res {
156+
Ok(resp) => GrpcHelper::ok_response(&resp),
157+
Err(e) => {
158+
if e.api_error().is_some() {
159+
GrpcHelper::err_response(&e)
160+
} else {
161+
Err(GrpcHelper::internal_err(e))
162+
}
163+
}
164+
}
165+
}
166+
147167
/// Create an Ok response for raft API.
148-
pub fn ok_response<D>(d: D) -> Result<tonic::Response<RaftReply>, tonic::Status>
149-
where D: serde::Serialize {
150-
let data = serde_json::to_string(&d).expect("fail to serialize resp");
168+
pub fn ok_response<D>(d: &D) -> Result<tonic::Response<RaftReply>, tonic::Status>
169+
where D: serde::Serialize + 'static {
170+
let data = serde_json::to_string(d).expect("fail to serialize resp");
151171
let reply = RaftReply {
152172
data,
153173
error: "".to_string(),
154174
};
155175
Ok(tonic::Response::new(reply))
156176
}
157177

178+
/// Create an Ok response contains API error.
179+
pub fn err_response<E>(e: &E) -> Result<tonic::Response<RaftReply>, tonic::Status>
180+
where E: serde::Serialize + 'static {
181+
let error = serde_json::to_string(e).expect("fail to serialize response error");
182+
let reply = RaftReply {
183+
data: "".to_string(),
184+
error,
185+
};
186+
Ok(tonic::Response::new(reply))
187+
}
188+
158189
/// Parse string and decode it into required type.
159190
pub fn parse<T>(s: &str) -> Result<T, tonic::Status>
160191
where T: serde::de::DeserializeOwned {

0 commit comments

Comments
(0)