Skip to content

Commit d9d2ad7

Browse files
authored
refactor: in snapshot-v2 protocol, move head to end to inform explicitly streaming end (#15399)
1 parent 95eb6df commit d9d2ad7

File tree

5 files changed

+205
-91
lines changed

5 files changed

+205
-91
lines changed

src/meta/service/src/meta_service/mod.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod errors;
16+
mod forwarder;
17+
mod meta_node_kv_api_impl;
18+
19+
pub(crate) mod snapshot_receiver;
20+
21+
pub mod meta_leader;
22+
pub mod meta_node;
23+
pub mod raft_service_impl;
24+
1525
pub use forwarder::MetaForwarder;
1626
pub use meta_node::MetaNode;
1727
pub use raft_service_impl::RaftServiceImpl;
@@ -20,10 +30,3 @@ pub use crate::message::ForwardRequest;
2030
pub use crate::message::ForwardRequestBody;
2131
pub use crate::message::JoinRequest;
2232
pub use crate::message::LeaveRequest;
23-
24-
mod errors;
25-
mod forwarder;
26-
pub mod meta_leader;
27-
pub mod meta_node;
28-
mod meta_node_kv_api_impl;
29-
pub mod raft_service_impl;

src/meta/service/src/meta_service/raft_service_impl.rs

Lines changed: 23 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use std::sync::Arc;
1919
use std::time::Duration;
2020

21-
use databend_common_base::base::tokio::io::AsyncWriteExt;
2221
use databend_common_base::base::tokio::sync::Mutex;
2322
use databend_common_base::future::TimingFutureExt;
2423
use databend_common_meta_client::MetaGrpcReadReq;
@@ -35,14 +34,11 @@ use databend_common_meta_types::InstallSnapshotError;
3534
use databend_common_meta_types::InstallSnapshotRequest;
3635
use databend_common_meta_types::InstallSnapshotResponse;
3736
use databend_common_meta_types::RaftError;
38-
use databend_common_meta_types::Snapshot;
3937
use databend_common_meta_types::SnapshotMeta;
4038
use databend_common_meta_types::TypeConfig;
4139
use databend_common_meta_types::Vote;
4240
use databend_common_metrics::count::Count;
4341
use futures::TryStreamExt;
44-
use log::debug;
45-
use log::info;
4642
use minitrace::full_name;
4743
use minitrace::prelude::*;
4844
use tonic::codegen::BoxStream;
@@ -53,6 +49,7 @@ use tonic::Streaming;
5349

5450
use crate::message::ForwardRequest;
5551
use crate::message::ForwardRequestBody;
52+
use crate::meta_service::snapshot_receiver::Receiver;
5653
use crate::meta_service::MetaNode;
5754
use crate::metrics::raft_metrics;
5855

@@ -168,93 +165,42 @@ impl RaftServiceImpl {
168165
) -> Result<Response<RaftReply>, Status> {
169166
let addr = remote_addr(&request);
170167

171-
let _g = snapshot_recv_inflight(&addr).counter_guard();
172-
173-
let mut strm = request.into_inner();
174-
175-
// Extract the first chunk to get the rpc_meta.
176-
let (format, req_vote, snapshot_meta) = {
177-
let Some(chunk) = strm.try_next().await? else {
178-
return Err(GrpcHelper::invalid_arg("empty snapshot stream"));
179-
};
180-
181-
let Some(meta) = &chunk.rpc_meta else {
182-
return Err(GrpcHelper::invalid_arg(
183-
"first SnapshotChunkRequestV2.rpc_meta is None",
184-
));
185-
};
186-
187-
if !chunk.chunk.is_empty() {
188-
return Err(GrpcHelper::invalid_arg(
189-
"first SnapshotChunkRequestV2.chunk should not contain any data",
190-
));
191-
}
192-
193-
let rpc_mta: (String, Vote, SnapshotMeta) = GrpcHelper::parse(meta)?;
194-
rpc_mta
195-
};
168+
let _guard = snapshot_recv_inflight(&addr).counter_guard();
196169

197-
// Snapshot format is not used for now.
198-
let _ = format;
199-
200-
info!(
201-
format :% = &format,
202-
req_vote :% = &req_vote,
203-
snapshot_meta :% = &snapshot_meta;
204-
"Begin receiving snapshot v2 stream from: {}",
205-
addr
206-
);
207-
208-
let mut snapshot_data = self
170+
let snapshot_data = self
209171
.meta_node
210172
.raft
211173
.begin_receiving_snapshot()
212174
.await
213175
.map_err(GrpcHelper::internal_err)?;
214176

215-
let mut ith = 0;
216-
let mut total_len = 0;
217-
while let Some(chunk) = strm.try_next().await? {
218-
let data_len = chunk.chunk.len() as u64;
219-
total_len += data_len;
220-
debug!(
221-
len = data_len,
222-
total_len = total_len;
223-
"received {ith}-th snapshot chunk from {addr}"
224-
);
225-
226-
ith += 1;
227-
if ith % 100 == 0 {
228-
info!(
229-
total_len = total_len;
230-
"received {ith}-th snapshot chunk from {addr}"
231-
);
232-
}
233-
234-
raft_metrics::network::incr_recvfrom_bytes(addr.clone(), data_len);
177+
let mut receiver = Receiver::new(&addr, snapshot_data);
235178

236-
snapshot_data.write_all(&chunk.chunk).await?;
237-
}
179+
let mut strm = request.into_inner();
238180

239-
snapshot_data.shutdown().await?;
181+
while let Some(chunk) = strm.try_next().await? {
182+
let snapshot = receiver.receive(chunk).await?;
240183

241-
let snapshot = Snapshot {
242-
meta: snapshot_meta,
243-
snapshot: snapshot_data,
244-
};
184+
if let Some((_format, req_vote, snapshot)) = snapshot {
185+
let res = self
186+
.meta_node
187+
.raft
188+
.install_full_snapshot(req_vote, snapshot)
189+
.await
190+
.map_err(GrpcHelper::internal_err);
245191

246-
let res = self
247-
.meta_node
248-
.raft
249-
.install_full_snapshot(req_vote, snapshot)
250-
.await
251-
.map_err(GrpcHelper::internal_err);
192+
raft_metrics::network::incr_snapshot_recvfrom_result(addr.clone(), res.is_ok());
252193

253-
raft_metrics::network::incr_snapshot_recvfrom_result(addr.clone(), res.is_ok());
194+
let resp = res?;
254195

255-
let resp = res?;
196+
return GrpcHelper::ok_response(&resp);
197+
}
198+
}
256199

257-
GrpcHelper::ok_response(&resp)
200+
Err(Status::invalid_argument(format!(
201+
"snapshot stream is closed without finishing: {}",
202+
receiver.stat_str()
203+
)))
258204
}
259205
}
260206

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
17+
use databend_common_base::base::tokio::io::AsyncWriteExt;
18+
use databend_common_meta_types::protobuf::SnapshotChunkRequestV2;
19+
use databend_common_meta_types::Snapshot;
20+
use databend_common_meta_types::SnapshotData;
21+
use databend_common_meta_types::SnapshotMeta;
22+
use databend_common_meta_types::Vote;
23+
use log::debug;
24+
use log::info;
25+
use tonic::Status;
26+
27+
use crate::metrics::raft_metrics;
28+
29+
pub(crate) struct Receiver {
30+
remote_addr: String,
31+
32+
snapshot_data: Option<Box<SnapshotData>>,
33+
34+
/// number of bytes received.
35+
n_received: usize,
36+
37+
/// number of bytes received.
38+
size_received: usize,
39+
}
40+
41+
impl Receiver {
42+
/// Create a new snapshot receiver with an empty snapshot.
43+
pub(crate) fn new(remote_addr: impl ToString, snapshot_data: Box<SnapshotData>) -> Self {
44+
let remote_addr = remote_addr.to_string();
45+
info!("Begin receiving snapshot v2 stream from: {}", remote_addr);
46+
47+
Receiver {
48+
remote_addr,
49+
snapshot_data: Some(snapshot_data),
50+
n_received: 0,
51+
size_received: 0,
52+
}
53+
}
54+
55+
pub(crate) fn stat_str(&self) -> impl fmt::Display {
56+
format!(
57+
"received {} chunks, {} bytes from {}",
58+
self.n_received, self.size_received, self.remote_addr
59+
)
60+
}
61+
62+
pub(crate) async fn receive(
63+
&mut self,
64+
chunk: SnapshotChunkRequestV2,
65+
) -> Result<Option<(String, Vote, Snapshot)>, Status> {
66+
// 1. update stat
67+
self.update_stat(&chunk);
68+
69+
// 2. write chunk to local snapshot_data
70+
{
71+
let snapshot_data = self.snapshot_data.as_mut().ok_or_else(|| {
72+
Status::internal("snapshot_data is already shutdown when receiving snapshot chunk")
73+
})?;
74+
75+
snapshot_data.write_all(&chunk.chunk).await.map_err(|e| {
76+
Status::internal(format!(
77+
"{} when writing chunk to local temp snapshot_data",
78+
e
79+
))
80+
})?;
81+
}
82+
83+
// 3. if it is the last chunk, finish and return the snapshot.
84+
{
85+
let end = self.load_end(&chunk).map_err(|e| {
86+
Status::invalid_argument(format!("{} when loading last chunk rpc_meta", e))
87+
})?;
88+
89+
let Some((format, vote, snapshot_meta)) = end else {
90+
return Ok(None);
91+
};
92+
93+
info!(
94+
"snapshot from {} is completely received, format: {}, vote: {:?}, meta: {:?}, size: {}",
95+
self.remote_addr, format, vote, snapshot_meta, self.size_received
96+
);
97+
98+
// Safe unwrap: snapshot_data is guaranteed to be Some in the above code.
99+
let mut snapshot_data = self.snapshot_data.take().unwrap();
100+
101+
snapshot_data.shutdown().await.map_err(|e| {
102+
Status::internal(format!("{} when shutdown local temp snapshot_data", e))
103+
})?;
104+
105+
Ok(Some((format, vote, Snapshot {
106+
meta: snapshot_meta,
107+
snapshot: snapshot_data,
108+
})))
109+
}
110+
}
111+
112+
fn update_stat(&mut self, chunk: &SnapshotChunkRequestV2) {
113+
let data_len = chunk.chunk.len();
114+
self.n_received += 1;
115+
self.size_received += data_len;
116+
117+
debug!(
118+
len = data_len,
119+
total_len = self.size_received;
120+
"received {}-th snapshot chunk from {}",
121+
self.n_received, self.remote_addr
122+
);
123+
124+
if self.n_received % 100 == 0 {
125+
info!(
126+
total_len = self.size_received;
127+
"received {}-th snapshot chunk from {}",
128+
self.n_received, self.remote_addr
129+
);
130+
}
131+
132+
raft_metrics::network::incr_recvfrom_bytes(self.remote_addr.clone(), data_len as u64);
133+
}
134+
135+
/// Load meta data from the last chunk.
136+
fn load_end(
137+
&self,
138+
chunk: &SnapshotChunkRequestV2,
139+
) -> Result<Option<(String, Vote, SnapshotMeta)>, serde_json::Error> {
140+
let Some(meta) = &chunk.rpc_meta else {
141+
return Ok(None);
142+
};
143+
144+
let (format, vote, snapshot_meta): (String, Vote, SnapshotMeta) =
145+
serde_json::from_str(meta)?;
146+
147+
Ok(Some((format, vote, snapshot_meta)))
148+
}
149+
}

src/meta/service/tests/it/meta_node/meta_node_replication.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,12 @@ async fn test_raft_service_install_snapshot_v2() -> anyhow::Result<()> {
206206

207207
let mut client0 = tc0.raft_client().await?;
208208

209-
let last = log_id(10, 2, 4);
209+
let last_log_id = log_id(10, 2, 4);
210210

211211
let snapshot_meta = SnapshotMeta {
212-
last_log_id: Some(last),
212+
last_log_id: Some(last_log_id),
213213
last_membership: StoredMembership::default(),
214-
snapshot_id: MetaSnapshotId::new(Some(last), 1).to_string(),
214+
snapshot_id: MetaSnapshotId::new(Some(last_log_id), 1).to_string(),
215215
};
216216

217217
let data = [
@@ -220,13 +220,15 @@ async fn test_raft_service_install_snapshot_v2() -> anyhow::Result<()> {
220220
r#"{"StateMachineMeta":{"key":"LastMembership","value":{"Membership":{"log_id":{"leader_id":{"term":3,"node_id":3},"index":3},"membership":{"configs":[],"nodes":{}}}}}}"#,
221221
];
222222

223+
// Complete transmit
224+
223225
let strm_data = [
224-
SnapshotChunkRequestV2::new_head(Vote::new_committed(10, 2), snapshot_meta),
225226
SnapshotChunkRequestV2::new_chunk(data[0].to_bytes().to_vec()),
226227
SnapshotChunkRequestV2::new_chunk("\n".as_bytes().to_vec()),
227228
SnapshotChunkRequestV2::new_chunk(data[1].to_bytes().to_vec()),
228229
SnapshotChunkRequestV2::new_chunk("\n".as_bytes().to_vec()),
229230
SnapshotChunkRequestV2::new_chunk(data[2].to_bytes().to_vec()),
231+
SnapshotChunkRequestV2::new_end_chunk(Vote::new_committed(10, 2), snapshot_meta),
230232
];
231233

232234
let resp = client0.install_snapshot_v2(stream::iter(strm_data)).await?;
@@ -239,7 +241,21 @@ async fn test_raft_service_install_snapshot_v2() -> anyhow::Result<()> {
239241
let meta_node = tc0.meta_node.as_ref().unwrap();
240242
let m = meta_node.raft.metrics().borrow().clone();
241243

242-
assert_eq!(Some(last), m.snapshot);
244+
assert_eq!(Some(last_log_id), m.snapshot);
245+
246+
// Incomplete
247+
248+
let strm_data = [
249+
SnapshotChunkRequestV2::new_chunk(data[0].to_bytes().to_vec()),
250+
SnapshotChunkRequestV2::new_chunk("\n".as_bytes().to_vec()),
251+
];
252+
253+
let err = client0
254+
.install_snapshot_v2(stream::iter(strm_data))
255+
.await
256+
.unwrap_err();
257+
258+
assert_eq!(err.code(), tonic::Code::InvalidArgument);
243259

244260
Ok(())
245261
}

src/meta/types/src/proto_ext/snapshot_chunk_request_ext.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ impl SnapshotChunkRequest {
4444
}
4545

4646
impl SnapshotChunkRequestV2 {
47-
/// Build the first chunk of a snapshot stream, which contains vote and snapshot meta, without data.
48-
pub fn new_head(vote: Vote, snapshot_meta: SnapshotMeta) -> Self {
47+
/// Build the last chunk of a snapshot stream, which contains vote and snapshot meta, without data.
48+
pub fn new_end_chunk(vote: Vote, snapshot_meta: SnapshotMeta) -> Self {
4949
let meta = ("ndjson".to_string(), vote, snapshot_meta);
5050
let rpc_meta = serde_json::to_string(&meta).unwrap();
5151

0 commit comments

Comments
 (0)