Skip to content

Commit 0bd9509

Browse files
authored
refactor: MetaGrpcClient needs to specify timeout to auto-reconnect to restarted server (#16733)
Before c44ea20, MetaGrpcClient just auto reconnect if the target meta-service server is restarted. After c44ea20, the client behavior changed, if no timeout is set, the client reconnect to the restarted server but it hangs forever. In this commit, in the test: test_kv_api_restart_cluster_token_expired, give the client a fixed timeout to survive a server restart. - Fix: #16704 (comment)
1 parent 4381967 commit 0bd9509

File tree

3 files changed

+126
-105
lines changed

3 files changed

+126
-105
lines changed

โ€Žsrc/meta/service/src/api/grpc_server.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,19 @@ impl GrpcServer {
100100
.add_service(grpc_srv)
101101
.serve_with_shutdown(addr, async move {
102102
let _ = started_tx.send(());
103-
info!("metasrv starts to wait for stop signal: {}", addr);
103+
info!(
104+
"meta-service gRPC(on {}) starts to wait for stop signal",
105+
addr
106+
);
104107
let _ = stop_rx.await;
105-
info!("metasrv receives stop signal: {}", addr);
108+
info!("meta-service gRPC(on {}) receives stop signal", addr);
106109
})
107110
.await;
108111

109-
info!("grpc task returned res: {:?}", res);
112+
info!(
113+
"meta-service gRPC(on {}) task returned res: {:?}",
114+
addr, res
115+
);
110116
}
111117
.in_span(Span::enter_with_local_parent("spawn-grpc")),
112118
);

โ€Žsrc/meta/service/src/meta_service/meta_node.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ impl MetaNode {
287287
srv.serve_with_shutdown(socket_addr, async move {
288288
let _ = running_rx.changed().await;
289289
info!(
290-
"signal received, shutting down: id={} {} ",
290+
"running_rx for Raft server received, shutting down: id={} {} ",
291291
node_id, ip_port
292292
);
293293
})
@@ -422,7 +422,10 @@ impl MetaNode {
422422

423423
if let Err(changed_err) = changed {
424424
// Shutting down.
425-
error!("{} when watching metrics_rx", changed_err);
425+
info!(
426+
"{}; when:(watching metrics_rx); quit subscribe_metrics() loop",
427+
changed_err
428+
);
426429
break;
427430
}
428431

โ€Žsrc/meta/service/tests/it/grpc/metasrv_grpc_kv_api_restart_cluster.rs

Lines changed: 112 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use std::time::Duration;
1919

2020
use databend_common_base::base::Stoppable;
21-
// use databend_common_meta_client::ClientHandle;
22-
// use databend_common_meta_client::MetaGrpcClient;
21+
use databend_common_meta_client::ClientHandle;
22+
use databend_common_meta_client::MetaGrpcClient;
2323
use databend_common_meta_kvapi::kvapi::KVApi;
2424
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
2525
use log::info;
@@ -116,108 +116,120 @@ async fn test_kv_api_restart_cluster_write_read() -> anyhow::Result<()> {
116116
Ok(())
117117
}
118118

119-
// FIXME: Disable this test until https://github.com/databendlabs/databend/pull/16704/#issuecomment-2442094481 addressed.
120119
/// - Start a cluster of 3.
121120
/// - Test upsert kv and read on different nodes.
122121
/// - Stop and restart the cluster.
123122
/// - Test read kv using same grpc client.
124-
// #[test(harness = meta_service_test_harness)]
125-
// #[fastrace::trace]
126-
// async fn test_kv_api_restart_cluster_token_expired() -> anyhow::Result<()> {
127-
// fn make_key(tc: &MetaSrvTestContext, k: impl std::fmt::Display) -> String {
128-
// let x = &tc.config.raft_config;
129-
// format!("t-restart-cluster-{}-{}-{}", x.config_id, x.id, k)
130-
// }
131-
//
132-
// async fn test_write_read_on_every_node(
133-
// tcs: &[MetaSrvTestContext],
134-
// client: &ClientHandle,
135-
// key_suffix: &str,
136-
// ) -> anyhow::Result<()> {
137-
// info!("--- test write on every node: {}", key_suffix);
138-
//
139-
// for (i, tc) in tcs.iter().enumerate() {
140-
// let k = make_key(tc, key_suffix);
141-
// if i == 0 {
142-
// let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
143-
// info!("--- upsert res: {:?}", res);
144-
// } else {
145-
// let client = tc.grpc_client().await.unwrap();
146-
// let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
147-
// info!("--- upsert res: {:?}", res);
148-
// }
149-
//
150-
// let res = client.get_kv(&k).await?;
151-
// let res = res.unwrap();
152-
//
153-
// assert_eq!(k.into_bytes(), res.data);
154-
// }
155-
//
156-
// Ok(())
157-
// }
158-
//
159-
// let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;
160-
// let client = MetaGrpcClient::try_create(
161-
// vec![tcs[0].config.grpc_api_address.clone()],
162-
// "root",
163-
// "xxx",
164-
// None,
165-
// Some(Duration::from_secs(10)),
166-
// None,
167-
// )?;
168-
//
169-
// info!("--- test write on a fresh cluster");
170-
// let key_suffix = "1st";
171-
// test_write_read_on_every_node(&tcs, &client, key_suffix).await?;
172-
//
173-
// info!("--- shutdown the cluster");
174-
// let stopped_tcs = {
175-
// let mut stopped_tcs = vec![];
176-
// for mut tc in tcs {
177-
// assert!(tc.meta_node.is_none());
178-
//
179-
// let mut srv = tc.grpc_srv.take().unwrap();
180-
// srv.stop(None).await?;
181-
//
182-
// stopped_tcs.push(tc);
183-
// }
184-
// stopped_tcs
185-
// };
186-
//
187-
// info!("--- restart the cluster");
188-
// let tcs = {
189-
// let mut tcs = vec![];
190-
// for mut tc in stopped_tcs {
191-
// start_metasrv_with_context(&mut tc).await?;
192-
// tcs.push(tc);
193-
// }
194-
//
195-
// for tc in &tcs {
196-
// info!("--- wait until a leader is observed");
197-
// // Every tcs[i] contains one meta node in this context.
198-
// let g = tc.grpc_srv.as_ref().unwrap();
199-
// let meta_node = g.get_meta_node();
200-
// let metrics = meta_node
201-
// .raft
202-
// .wait(timeout())
203-
// .metrics(|m| m.current_leader.is_some(), "a leader is observed")
204-
// .await?;
205-
//
206-
// info!("got leader, metrics: {:?}", metrics);
207-
// }
208-
// tcs
209-
// };
210-
//
211-
// info!("--- read use old client");
212-
// let tc = &tcs[0];
213-
// let k = make_key(tc, key_suffix);
214-
// let res = client.get_kv(&k).await?;
215-
// let res = res.unwrap();
216-
//
217-
// assert_eq!(b(k), res.data);
218-
//
219-
// Ok(())
220-
// }
123+
#[test(harness = meta_service_test_harness)]
124+
#[fastrace::trace]
125+
async fn test_kv_api_restart_cluster_token_expired() -> anyhow::Result<()> {
126+
fn make_key(tc: &MetaSrvTestContext, k: impl std::fmt::Display) -> String {
127+
let x = &tc.config.raft_config;
128+
format!("t-restart-cluster-{}-{}-{}", x.config_id, x.id, k)
129+
}
130+
131+
async fn test_write_read_on_every_node(
132+
tcs: &[MetaSrvTestContext],
133+
client: &ClientHandle,
134+
key_suffix: &str,
135+
) -> anyhow::Result<()> {
136+
info!("--- test write on every node: {}", key_suffix);
137+
138+
for (i, tc) in tcs.iter().enumerate() {
139+
let k = make_key(tc, key_suffix);
140+
if i == 0 {
141+
let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
142+
info!("--- upsert res: {:?}", res);
143+
} else {
144+
let client = tc.grpc_client().await.unwrap();
145+
let res = client.upsert_kv(UpsertKVReq::update(&k, &b(&k))).await?;
146+
info!("--- upsert res: {:?}", res);
147+
}
148+
149+
let res = client.get_kv(&k).await?;
150+
let res = res.unwrap();
151+
152+
assert_eq!(k.into_bytes(), res.data);
153+
}
154+
155+
Ok(())
156+
}
157+
158+
let tcs = start_metasrv_cluster(&[0, 1, 2]).await?;
159+
let client = MetaGrpcClient::try_create(
160+
vec![tcs[0].config.grpc_api_address.clone()],
161+
"root",
162+
"xxx",
163+
// Without timeout, the client will not be able to reconnect.
164+
// This is an issue of the http client.
165+
Some(Duration::from_secs(1)),
166+
Some(Duration::from_secs(10)),
167+
None,
168+
)?;
169+
170+
info!("--- test write on a fresh cluster");
171+
let key_suffix = "1st";
172+
test_write_read_on_every_node(&tcs, &client, key_suffix).await?;
173+
174+
info!("--- shutdown the cluster");
175+
let stopped_tcs = {
176+
let mut stopped_tcs = vec![];
177+
for mut tc in tcs {
178+
assert!(tc.meta_node.is_none());
179+
180+
let mut srv = tc.grpc_srv.take().unwrap();
181+
srv.stop(None).await?;
182+
183+
stopped_tcs.push(tc);
184+
}
185+
stopped_tcs
186+
};
187+
188+
info!("--- restart the cluster");
189+
let tcs = {
190+
let mut tcs = vec![];
191+
for mut tc in stopped_tcs {
192+
info!(
193+
"--- starting metasrv: {:?}",
194+
tc.config.raft_config.raft_api_addr().await?
195+
);
196+
start_metasrv_with_context(&mut tc).await?;
197+
198+
info!(
199+
"--- started metasrv: {:?}",
200+
tc.config.raft_config.raft_api_addr().await?
201+
);
202+
203+
// sleep(Duration::from_secs(3)).await;
204+
tcs.push(tc);
205+
}
206+
207+
for tc in &tcs {
208+
info!("--- wait until a leader is observed");
209+
// Every tcs[i] contains one meta node in this context.
210+
let g = tc.grpc_srv.as_ref().unwrap();
211+
let meta_node = g.get_meta_node();
212+
let metrics = meta_node
213+
.raft
214+
.wait(timeout())
215+
.metrics(|m| m.current_leader.is_some(), "a leader is observed")
216+
.await?;
217+
218+
info!("got leader, metrics: {:?}", metrics);
219+
}
220+
tcs
221+
};
222+
223+
info!("--- read use old client");
224+
let tc = &tcs[0];
225+
let k = make_key(tc, key_suffix);
226+
let res = client.get_kv(&k).await?;
227+
let res = res.unwrap();
228+
229+
assert_eq!(b(k), res.data);
230+
231+
Ok(())
232+
}
221233

222234
// Election timeout is 8~12 sec.
223235
// A raft node waits for a interval of election timeout before starting election

0 commit comments

Comments
ย (0)