Skip to content

Commit d01fa36

Browse files
authored
feat: Add: stream based snapshot API install_snapshot_v2() to meta-service (#15395)
feature: Add: stream based snapshot API install_snapshot_v2() to meta-service
1 parent 9a6e320 commit d01fa36

File tree

9 files changed

+233
-7
lines changed

9 files changed

+233
-7
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
123123

124124
# openraft for debugging
125125
#openraft = { git = "https://github.com/drmingdrmer/openraft", branch = "release-0.9", features = [
126-
openraft = { version = "0.9.8", features = [
126+
openraft = { version = "0.9.9", features = [
127127
"serde",
128128
"tracing-log",
129129
"generic-snapshot-data",

src/meta/service/src/meta_service/raft_service_impl.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use std::sync::Arc;
1919
use std::time::Duration;
2020

21+
use databend_common_base::base::tokio::io::AsyncWriteExt;
2122
use databend_common_base::base::tokio::sync::Mutex;
2223
use databend_common_base::future::TimingFutureExt;
2324
use databend_common_meta_client::MetaGrpcReadReq;
@@ -27,22 +28,28 @@ use databend_common_meta_types::protobuf::raft_service_server::RaftService;
2728
use databend_common_meta_types::protobuf::RaftReply;
2829
use databend_common_meta_types::protobuf::RaftRequest;
2930
use databend_common_meta_types::protobuf::SnapshotChunkRequest;
31+
use databend_common_meta_types::protobuf::SnapshotChunkRequestV2;
3032
use databend_common_meta_types::protobuf::StreamItem;
3133
use databend_common_meta_types::GrpcHelper;
3234
use databend_common_meta_types::InstallSnapshotError;
3335
use databend_common_meta_types::InstallSnapshotRequest;
3436
use databend_common_meta_types::InstallSnapshotResponse;
3537
use databend_common_meta_types::RaftError;
38+
use databend_common_meta_types::Snapshot;
3639
use databend_common_meta_types::SnapshotMeta;
3740
use databend_common_meta_types::TypeConfig;
3841
use databend_common_meta_types::Vote;
3942
use databend_common_metrics::count::Count;
43+
use futures::TryStreamExt;
44+
use log::debug;
45+
use log::info;
4046
use minitrace::full_name;
4147
use minitrace::prelude::*;
4248
use tonic::codegen::BoxStream;
4349
use tonic::Request;
4450
use tonic::Response;
4551
use tonic::Status;
52+
use tonic::Streaming;
4653

4754
use crate::message::ForwardRequest;
4855
use crate::message::ForwardRequestBody;
@@ -154,6 +161,101 @@ impl RaftServiceImpl {
154161
Ok(resp)
155162
}
156163
}
164+
165+
/// Receive a snapshot sent as a gRPC stream and install it into the local raft node.
///
/// Stream layout, as enforced below:
/// - The first item must carry `rpc_meta` and an empty `chunk`; `rpc_meta` is parsed into
///   `(format, vote, snapshot_meta)`.
/// - Every following item contributes its `chunk` bytes, which are appended in arrival order
///   to the snapshot data returned by `raft.begin_receiving_snapshot()`.
///
/// Once the stream ends, the writer is shut down and the collected data is handed to
/// `raft.install_full_snapshot()` together with the vote from the first item.
///
/// # Errors
///
/// - `GrpcHelper::invalid_arg(..)` if the stream is empty, the first item has no `rpc_meta`,
///   or the first item contains chunk data.
/// - Errors from reading the stream, parsing `rpc_meta`, writing or shutting down the
///   snapshot data are propagated with `?`.
/// - Errors from `begin_receiving_snapshot()` and `install_full_snapshot()` are converted
///   with `GrpcHelper::internal_err`.
async fn do_install_snapshot_v2(
166+
&self,
167+
request: Request<Streaming<SnapshotChunkRequestV2>>,
168+
) -> Result<Response<RaftReply>, Status> {
169+
let addr = remote_addr(&request);
170+
171+
// The guard is bound to `_g` so it lives until this function returns.
// Presumably it tracks the number of in-flight snapshot receives per address -- TODO confirm.
let _g = snapshot_recv_inflight(&addr).counter_guard();
172+
173+
let mut strm = request.into_inner();
174+
175+
// Extract the first chunk to get the rpc_meta.
176+
let (format, req_vote, snapshot_meta) = {
177+
let Some(chunk) = strm.try_next().await? else {
178+
return Err(GrpcHelper::invalid_arg("empty snapshot stream"));
179+
};
180+
181+
let Some(meta) = &chunk.rpc_meta else {
182+
return Err(GrpcHelper::invalid_arg(
183+
"first SnapshotChunkRequestV2.rpc_meta is None",
184+
));
185+
};
186+
187+
// The head item is metadata-only: reject it if it also carries snapshot bytes.
if !chunk.chunk.is_empty() {
188+
return Err(GrpcHelper::invalid_arg(
189+
"first SnapshotChunkRequestV2.chunk should not contain any data",
190+
));
191+
}
192+
193+
// NOTE(review): `rpc_mta` looks like a typo for `rpc_meta`.
let rpc_mta: (String, Vote, SnapshotMeta) = GrpcHelper::parse(meta)?;
194+
rpc_mta
195+
};
196+
197+
// Snapshot format is not used for now.
// NOTE(review): `format` is read by the `info!` call right below, so this
// `let _ = format;` appears to be redundant.
198+
let _ = format;
199+
200+
info!(
201+
format :% = &format,
202+
req_vote :% = &req_vote,
203+
snapshot_meta :% = &snapshot_meta;
204+
"Begin receiving snapshot v2 stream from: {}",
205+
addr
206+
);
207+
208+
// Obtain the writer that the incoming chunks are appended to.
let mut snapshot_data = self
209+
.meta_node
210+
.raft
211+
.begin_receiving_snapshot()
212+
.await
213+
.map_err(GrpcHelper::internal_err)?;
214+
215+
// `ith` counts data chunks (the head item is not counted); `total_len` sums their bytes.
let mut ith = 0;
216+
let mut total_len = 0;
217+
while let Some(chunk) = strm.try_next().await? {
218+
let data_len = chunk.chunk.len() as u64;
219+
total_len += data_len;
220+
debug!(
221+
len = data_len,
222+
total_len = total_len;
223+
"received {ith}-th snapshot chunk from {addr}"
224+
);
225+
226+
// `ith` is incremented after the debug log and before the info log:
// the debug line shows a 0-based index, the info line (every 100 chunks) a 1-based count.
ith += 1;
227+
if ith % 100 == 0 {
228+
info!(
229+
total_len = total_len;
230+
"received {ith}-th snapshot chunk from {addr}"
231+
);
232+
}
233+
234+
raft_metrics::network::incr_recvfrom_bytes(addr.clone(), data_len);
235+
236+
snapshot_data.write_all(&chunk.chunk).await?;
237+
}
238+
239+
// End of stream: shut the writer down before handing the data to raft.
snapshot_data.shutdown().await?;
240+
241+
let snapshot = Snapshot {
242+
meta: snapshot_meta,
243+
snapshot: snapshot_data,
244+
};
245+
246+
// Keep the `Result` instead of using `?` here so the outcome can be recorded in metrics first.
let res = self
247+
.meta_node
248+
.raft
249+
.install_full_snapshot(req_vote, snapshot)
250+
.await
251+
.map_err(GrpcHelper::internal_err);
252+
253+
// NOTE(review): only the result of `install_full_snapshot()` is recorded; the earlier
// `?` / `return Err(..)` paths leave this function without touching this metric.
raft_metrics::network::incr_snapshot_recvfrom_result(addr.clone(), res.is_ok());
254+
255+
let resp = res?;
256+
257+
GrpcHelper::ok_response(&resp)
258+
}
157259
}
158260

159261
#[async_trait::async_trait]
@@ -241,6 +343,14 @@ impl RaftService for RaftServiceImpl {
241343
self.do_install_snapshot_v1(request).in_span(root).await
242344
}
243345

346+
/// gRPC entry point for the stream based snapshot API.
///
/// Starts a trace span for the remote request and runs `do_install_snapshot_v2()` inside it;
/// the result of that call is returned unchanged.
async fn install_snapshot_v2(
347+
&self,
348+
request: Request<Streaming<SnapshotChunkRequestV2>>,
349+
) -> Result<Response<RaftReply>, Status> {
350+
// The span is created from the request before the request is moved into the handler.
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
351+
self.do_install_snapshot_v2(request).in_span(root).await
352+
}
353+
244354
async fn vote(&self, request: Request<RaftRequest>) -> Result<Response<RaftReply>, Status> {
245355
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
246356

src/meta/service/tests/it/meta_node/meta_node_replication.rs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,27 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_arrow::arrow::array::ViewType;
1516
use databend_common_meta_raft_store::state_machine::MetaSnapshotId;
17+
use databend_common_meta_sled_store::openraft::error::SnapshotMismatch;
18+
use databend_common_meta_sled_store::openraft::testing::log_id;
1619
use databend_common_meta_sled_store::openraft::LogIdOptionExt;
1720
use databend_common_meta_sled_store::openraft::ServerState;
1821
use databend_common_meta_types::protobuf::SnapshotChunkRequest;
22+
use databend_common_meta_types::protobuf::SnapshotChunkRequestV2;
1923
use databend_common_meta_types::Cmd;
24+
use databend_common_meta_types::InstallSnapshotError;
2025
use databend_common_meta_types::InstallSnapshotRequest;
2126
use databend_common_meta_types::LogEntry;
27+
use databend_common_meta_types::RaftError;
2228
use databend_common_meta_types::SeqV;
2329
use databend_common_meta_types::SnapshotMeta;
30+
use databend_common_meta_types::SnapshotResponse;
2431
use databend_common_meta_types::StoredMembership;
2532
use databend_common_meta_types::UpsertKV;
2633
use databend_common_meta_types::Vote;
2734
use databend_meta::meta_service::MetaNode;
35+
use futures::stream;
2836
use log::info;
2937
use maplit::btreeset;
3038
use test_harness::test;
@@ -171,8 +179,67 @@ async fn test_raft_service_snapshot_id_mismatch() -> anyhow::Result<()> {
171179
r1.meta.snapshot_id = MetaSnapshotId::new(None, 2).to_string();
172180
r1.offset = 3;
173181
let req = SnapshotChunkRequest::new_v1(r1);
174-
let res = client0.install_snapshot_v1(req).await;
175-
println!("res: {:?}", res);
182+
let resp = client0.install_snapshot_v1(req).await?;
183+
184+
let reply = resp.into_inner();
185+
186+
let err: RaftError<InstallSnapshotError> = serde_json::from_str(&reply.error)?;
187+
188+
assert_eq!(
189+
err.api_error().unwrap(),
190+
&InstallSnapshotError::SnapshotMismatch(SnapshotMismatch {
191+
expect: (MetaSnapshotId::new(None, 2), 0).into(),
192+
got: (MetaSnapshotId::new(None, 2), 3).into(),
193+
})
194+
);
195+
196+
Ok(())
197+
}
198+
199+
/// Send one complete snapshot to a single-node cluster through `install_snapshot_v2` and
/// check both the reply and the snapshot reported by the raft metrics.
#[test(harness = meta_service_test_harness)]
200+
#[minitrace::trace]
201+
async fn test_raft_service_install_snapshot_v2() -> anyhow::Result<()> {
202+
// Transmit a whole snapshot in a single stream via API install_snapshot_v2:
// one head item carrying the vote and snapshot meta, followed by data chunks.
203+
204+
let (_nlog, mut tcs) = start_meta_node_cluster(btreeset![0], btreeset![]).await?;
205+
let tc0 = tcs.remove(0);
206+
207+
let mut client0 = tc0.raft_client().await?;
208+
209+
// The `LastApplied` record in `data` below carries the same numbers: term 10, node_id 2, index 4.
let last = log_id(10, 2, 4);
210+
211+
let snapshot_meta = SnapshotMeta {
212+
last_log_id: Some(last),
213+
last_membership: StoredMembership::default(),
214+
snapshot_id: MetaSnapshotId::new(Some(last), 1).to_string(),
215+
};
216+
217+
// Snapshot payload: one JSON record per entry; the records are joined with "\n" chunks below.
let data = [
218+
r#"{"DataHeader":{"key":"header","value":{"version":"V002","upgrading":null}}}"#,
219+
r#"{"StateMachineMeta":{"key":"LastApplied","value":{"LogId":{"leader_id":{"term":10,"node_id":2},"index":4}}}}"#,
220+
r#"{"StateMachineMeta":{"key":"LastMembership","value":{"Membership":{"log_id":{"leader_id":{"term":3,"node_id":3},"index":3},"membership":{"configs":[],"nodes":{}}}}}}"#,
221+
];
222+
223+
// NOTE(review): `to_bytes()` on `&str` presumably comes from the `ViewType` trait imported
// from databend_common_arrow in this file; `str::as_bytes()` would likely do the same
// without that import -- confirm.
let strm_data = [
224+
SnapshotChunkRequestV2::new_head(Vote::new_committed(10, 2), snapshot_meta),
225+
SnapshotChunkRequestV2::new_chunk(data[0].to_bytes().to_vec()),
226+
SnapshotChunkRequestV2::new_chunk("\n".as_bytes().to_vec()),
227+
SnapshotChunkRequestV2::new_chunk(data[1].to_bytes().to_vec()),
228+
SnapshotChunkRequestV2::new_chunk("\n".as_bytes().to_vec()),
229+
SnapshotChunkRequestV2::new_chunk(data[2].to_bytes().to_vec()),
230+
];
231+
232+
let resp = client0.install_snapshot_v2(stream::iter(strm_data)).await?;
233+
let reply = resp.into_inner();
234+
235+
// A successful reply carries the JSON encoded `SnapshotResponse` in `reply.data`.
let resp: SnapshotResponse = serde_json::from_str(&reply.data)?;
236+
237+
// The vote in the response equals the vote sent in the head item.
assert_eq!(resp.vote, Vote::new_committed(10, 2));
238+
239+
let meta_node = tc0.meta_node.as_ref().unwrap();
240+
let m = meta_node.raft.metrics().borrow().clone();
241+
242+
// The raft metrics must report the installed snapshot's last log id.
assert_eq!(Some(last), m.snapshot);
176243

177244
Ok(())
178245
}

src/meta/service/tests/it/testing.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ fn setup_test() {
5858
let t = tempfile::tempdir().expect("create temp dir to sled db");
5959
databend_common_meta_sled_store::init_temp_sled_db(t);
6060

61-
let guards = init_logging("meta_unittests", &Config::new_testing(), BTreeMap::new());
61+
let mut config = Config::new_testing();
62+
config.file.prefix_filter = "".to_string();
63+
64+
let guards = init_logging("meta_unittests", &config, BTreeMap::new());
6265
Box::leak(Box::new(guards));
6366
});
6467
}

src/meta/types/proto/meta.proto

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ message SnapshotChunkRequest {
198198

199199
// json serialized meta data, including vote and snapshot_meta.
200200
// ```text
201-
// { vote: Vote, meta: SnapshotMeta }
201+
// (Vote, SnapshotMeta)
202202
// ```
203203
string rpc_meta = 10;
204204

@@ -211,6 +211,25 @@ message SnapshotChunkV1 {
211211
bytes data = 12;
212212
}
213213

214+
// An item of the snapshot chunk stream.
215+
//
216+
// The first item contains `rpc_meta`,
217+
// including the application defined format of this snapshot data,
218+
// the leader vote and snapshot-meta.
219+
//
220+
// From the second item onward, `rpc_meta` should be empty; the receiving end ignores it.
221+
message SnapshotChunkRequestV2 {
222+
223+
// JSON serialized meta data: the snapshot format, the vote and the snapshot_meta.
224+
// ```text
225+
// (SnapshotFormat, Vote, SnapshotMeta)
226+
// ```
227+
optional string rpc_meta = 10;
228+
229+
// Snapshot data chunk
230+
bytes chunk = 20;
231+
}
232+
214233
service RaftService {
215234

216235
// Forward a request to another node.
@@ -225,6 +244,7 @@ service RaftService {
225244
rpc AppendEntries(RaftRequest) returns (RaftReply);
226245
rpc InstallSnapshot(RaftRequest) returns (RaftReply);
227246
rpc InstallSnapshotV1(SnapshotChunkRequest) returns (RaftReply);
247+
rpc InstallSnapshotV2(stream SnapshotChunkRequestV2) returns (RaftReply);
228248
rpc Vote(RaftRequest) returns (RaftReply);
229249
}
230250

src/meta/types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pub use crate::raft_types::RaftMetrics;
147147
pub use crate::raft_types::RemoteError;
148148
pub use crate::raft_types::Snapshot;
149149
pub use crate::raft_types::SnapshotMeta;
150+
pub use crate::raft_types::SnapshotMismatch;
150151
pub use crate::raft_types::SnapshotResponse;
151152
pub use crate::raft_types::StorageError;
152153
pub use crate::raft_types::StorageIOError;

src/meta/types/src/proto_ext/snapshot_chunk_request_ext.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
// limitations under the License.
1414

1515
use crate::protobuf::SnapshotChunkRequest;
16+
use crate::protobuf::SnapshotChunkRequestV2;
1617
use crate::protobuf::SnapshotChunkV1;
1718
use crate::InstallSnapshotRequest;
19+
use crate::SnapshotMeta;
20+
use crate::Vote;
1821

1922
impl SnapshotChunkRequest {
2023
/// Return the length of the data in the chunk.
@@ -39,3 +42,24 @@ impl SnapshotChunkRequest {
3942
}
4043
}
4144
}
45+
46+
// Constructors for the two kinds of items in a snapshot stream: the head item and data chunks.
impl SnapshotChunkRequestV2 {
47+
/// Build the first chunk of a snapshot stream, which contains vote and snapshot meta, without data.
///
/// `rpc_meta` is the JSON encoding of the tuple `(format, vote, snapshot_meta)`, where
/// `format` is always the string `"ndjson"`.
48+
pub fn new_head(vote: Vote, snapshot_meta: SnapshotMeta) -> Self {
49+
let meta = ("ndjson".to_string(), vote, snapshot_meta);
50+
// NOTE(review): `unwrap()` panics if serialization fails; presumably that cannot
// happen for these types -- confirm.
let rpc_meta = serde_json::to_string(&meta).unwrap();
51+
52+
SnapshotChunkRequestV2 {
53+
rpc_meta: Some(rpc_meta),
54+
chunk: vec![],
55+
}
56+
}
57+
58+
/// Build a chunk item with data.
///
/// `rpc_meta` is left as `None`; only the head item carries it.
59+
pub fn new_chunk(chunk: Vec<u8>) -> Self {
60+
SnapshotChunkRequestV2 {
61+
rpc_meta: None,
62+
chunk,
63+
}
64+
}
65+
}

src/meta/types/src/raft_types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ pub type InstallSnapshotRequest = openraft::raft::InstallSnapshotRequest<TypeCon
7777
pub type InstallSnapshotResponse = openraft::raft::InstallSnapshotResponse<NodeId>;
7878
pub type SnapshotResponse = openraft::raft::SnapshotResponse<NodeId>;
7979
pub type InstallSnapshotError = openraft::error::InstallSnapshotError;
80+
pub type SnapshotMismatch = openraft::error::SnapshotMismatch;
8081
pub type VoteRequest = openraft::raft::VoteRequest<NodeId>;
8182
pub type VoteResponse = openraft::raft::VoteResponse<NodeId>;
8283

0 commit comments

Comments
 (0)