Skip to content

Commit 0c3c0e1

Browse files
authored
refactor: make test_watch_expired_events() less sensitive to inaccurate time (#16966)
* refactor: remove unused sled related config from databend-meta * chore: adjust gRPC logging level * refactor: make test_watch_expired_events() less sensitive to inaccurate time - Fix: #16942 * refactor: add short sleep to test_meta_node_join_with_state to ensure server quits completely
1 parent 7cff135 commit 0c3c0e1

File tree

11 files changed

+44
-93
lines changed

11 files changed

+44
-93
lines changed

src/meta/binaries/metactl/import.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ async fn import_v003(
134134
raft_config: RaftConfig,
135135
lines: impl IntoIterator<Item = Result<String, io::Error>>,
136136
) -> anyhow::Result<Option<LogId>> {
137-
let db = init_get_sled_db(raft_config.raft_dir.clone(), raft_config.sled_cache_size());
137+
let db = init_get_sled_db(raft_config.raft_dir.clone(), 1024 * 1024 * 1024);
138138

139139
let mut n = 0;
140140
let mut max_log_id: Option<LogId> = None;
@@ -386,7 +386,7 @@ async fn init_new_cluster(
386386
fn clear(args: &ImportArgs) -> anyhow::Result<()> {
387387
eprintln!();
388388
eprintln!("Clear All Sled Trees Before Import:");
389-
let db = init_get_sled_db(args.raft_dir.clone().unwrap(), 64 * 1024 * 1024 * 1024);
389+
let db = init_get_sled_db(args.raft_dir.clone().unwrap(), 1024 * 1024 * 1024);
390390

391391
let tree_names = db.tree_names();
392392
for n in tree_names.iter() {

src/meta/process/src/process_meta_dir.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub fn process_sled_db<F>(config: &Config, convert: F) -> anyhow::Result<()>
2525
where F: Fn(RaftStoreEntry) -> Result<Option<RaftStoreEntry>, anyhow::Error> {
2626
let raft_config = &config.raft_config;
2727

28-
let db = init_get_sled_db(raft_config.raft_dir.clone(), 64 * 1024 * 1024 * 1024);
28+
let db = init_get_sled_db(raft_config.raft_dir.clone(), 1024 * 1024 * 1024);
2929

3030
let mut tree_names = db.tree_names();
3131
tree_names.sort();

src/meta/raft-store/src/config.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,6 @@ pub struct RaftConfig {
135135
/// Otherwise this argument is ignored.
136136
pub id: NodeId,
137137

138-
/// For test only: specifies the tree name prefix
139-
pub sled_tree_prefix: String,
140-
141-
/// The maximum memory in MB that sled can use for caching.
142-
pub sled_max_cache_size_mb: u64,
143-
144138
/// The node name. If the user specifies a name,
145139
/// the user-supplied name is used, if not, the default name is used.
146140
pub cluster_name: String,
@@ -190,8 +184,6 @@ impl Default for RaftConfig {
190184
leave_via: vec![],
191185
leave_id: None,
192186
id: 0,
193-
sled_tree_prefix: "".to_string(),
194-
sled_max_cache_size_mb: 10 * 1024,
195187
cluster_name: "foo_cluster".to_string(),
196188
wait_leader_timeout: 70000,
197189
}
@@ -298,16 +290,4 @@ impl RaftConfig {
298290
}
299291
Ok(())
300292
}
301-
302-
/// Create a unique sled::Tree name by prepending a unique prefix.
303-
/// So that multiple instance that depends on a sled::Tree can be used in one process.
304-
/// sled does not allow to open multiple `sled::Db` in one process.
305-
pub fn tree_name(&self, name: impl std::fmt::Display) -> String {
306-
format!("{}{}", self.sled_tree_prefix, name)
307-
}
308-
309-
/// Return the size of sled cache in bytes.
310-
pub fn sled_cache_size(&self) -> u64 {
311-
self.sled_max_cache_size_mb * 1024 * 1024
312-
}
313293
}

src/meta/raft-store/src/ondisk/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,9 @@ impl OnDisk {
159159
return Ok(());
160160
}
161161

162-
let db = init_get_sled_db(config.raft_dir.clone(), config.sled_cache_size());
162+
let db = init_get_sled_db(config.raft_dir.clone(), 1024 * 1024 * 1024);
163163

164-
let tree_name = config.tree_name(TREE_HEADER);
165-
let tree = SledTree::open(&db, &tree_name, config.is_sync())?;
164+
let tree = SledTree::open(&db, TREE_HEADER, config.is_sync())?;
166165
let ks = tree.key_space::<DataHeader>();
167166

168167
let header = ks.get(&Self::KEY_HEADER.to_string()).map_err(|e| {

src/meta/raft-store/src/ondisk/upgrade_to_v004.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl OnDisk {
5959
let raft_log = RaftLogV004::open(raft_log_config)?;
6060
let mut importer = importer::Importer::new(raft_log);
6161

62-
let db = init_get_sled_db(self.config.raft_dir.clone(), self.config.sled_cache_size());
62+
let db = init_get_sled_db(self.config.raft_dir.clone(), 1024 * 1024 * 1024);
6363

6464
// Read the purged index
6565
let first_log_index = {
@@ -213,7 +213,7 @@ impl OnDisk {
213213

214214
self.progress(format_args!(" Remove V003 log from sled db",));
215215

216-
let db = init_get_sled_db(self.config.raft_dir.clone(), self.config.sled_cache_size());
216+
let db = init_get_sled_db(self.config.raft_dir.clone(), 1024 * 1024 * 1024);
217217
for tree_name in db.tree_names() {
218218
if tree_name == "__sled__default" {
219219
continue;

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ use futures::stream::TryChunksError;
5757
use futures::StreamExt;
5858
use futures::TryStreamExt;
5959
use log::debug;
60-
use log::info;
6160
use prost::Message;
6261
use tokio_stream;
6362
use tokio_stream::Stream;
@@ -109,7 +108,7 @@ impl MetaServiceImpl {
109108
#[fastrace::trace]
110109
async fn handle_kv_api(&self, request: Request<RaftRequest>) -> Result<RaftReply, Status> {
111110
let req: MetaGrpcReq = request.try_into()?;
112-
info!("{}: Received MetaGrpcReq: {:?}", func_name!(), req);
111+
debug!("{}: Received MetaGrpcReq: {:?}", func_name!(), req);
113112

114113
let m = &self.meta_node;
115114
let reply = match &req {
@@ -134,7 +133,7 @@ impl MetaServiceImpl {
134133
) -> Result<(Option<Endpoint>, BoxStream<StreamItem>), Status> {
135134
let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?;
136135

137-
info!("{}: Received ReadRequest: {:?}", func_name!(), req);
136+
debug!("{}: Received ReadRequest: {:?}", func_name!(), req);
138137

139138
let req = ForwardRequest::new(1, req);
140139

@@ -156,7 +155,7 @@ impl MetaServiceImpl {
156155
) -> Result<(Option<Endpoint>, TxnReply), Status> {
157156
let txn = request.into_inner();
158157

159-
info!("{}: Received TxnRequest: {}", func_name!(), txn);
158+
debug!("{}: Received TxnRequest: {}", func_name!(), txn);
160159

161160
let ent = LogEntry::new(Cmd::Transaction(txn.clone()));
162161

src/meta/service/src/configs/outer_v0.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,6 @@ pub struct ConfigViaEnv {
305305
pub kvsrv_single: bool,
306306
pub metasrv_join: Vec<String>,
307307
pub kvsrv_id: u64,
308-
pub sled_tree_prefix: String,
309-
pub sled_max_cache_size_mb: u64,
310308
pub cluster_name: String,
311309
}
312310

@@ -363,8 +361,6 @@ impl From<Config> for ConfigViaEnv {
363361
kvsrv_single: cfg.raft_config.single,
364362
metasrv_join: cfg.raft_config.join,
365363
kvsrv_id: cfg.raft_config.id,
366-
sled_tree_prefix: cfg.raft_config.sled_tree_prefix,
367-
sled_max_cache_size_mb: cfg.raft_config.sled_max_cache_size_mb,
368364
cluster_name: cfg.raft_config.cluster_name,
369365
}
370366
}
@@ -405,8 +401,6 @@ impl Into<Config> for ConfigViaEnv {
405401
// Do not allow to leave via environment variable
406402
leave_id: None,
407403
id: self.kvsrv_id,
408-
sled_tree_prefix: self.sled_tree_prefix,
409-
sled_max_cache_size_mb: self.sled_max_cache_size_mb,
410404
cluster_name: self.cluster_name,
411405
};
412406
let log_config = LogConfig {
@@ -539,7 +533,7 @@ pub struct RaftConfig {
539533

540534
/// The total cache size for snapshot blocks.
541535
///
542-
/// By default it is 1GB.
536+
/// By default, it is 1GB.
543537
#[clap(long, default_value = "1073741824")]
544538
pub snapshot_db_block_cache_size: u64,
545539

@@ -574,14 +568,6 @@ pub struct RaftConfig {
574568
#[clap(long, default_value = "0")]
575569
pub id: u64,
576570

577-
/// For test only: specifies the tree name prefix
578-
#[clap(long, default_value = "")]
579-
pub sled_tree_prefix: String,
580-
581-
/// The maximum memory in MB that sled can use for caching. Default is 10GB
582-
#[clap(long, default_value = "10240")]
583-
pub sled_max_cache_size_mb: u64,
584-
585571
/// The node name. If the user specifies a name, the user-supplied name is used,
586572
/// if not, the default name is used
587573
#[clap(long, default_value = "foo_cluster")]
@@ -630,8 +616,6 @@ impl From<RaftConfig> for InnerRaftConfig {
630616
leave_via: x.leave_via,
631617
leave_id: x.leave_id,
632618
id: x.id,
633-
sled_tree_prefix: x.sled_tree_prefix,
634-
sled_max_cache_size_mb: x.sled_max_cache_size_mb,
635619
cluster_name: x.cluster_name,
636620
wait_leader_timeout: x.wait_leader_timeout,
637621
}
@@ -668,8 +652,6 @@ impl From<InnerRaftConfig> for RaftConfig {
668652
leave_via: inner.leave_via,
669653
leave_id: inner.leave_id,
670654
id: inner.id,
671-
sled_tree_prefix: inner.sled_tree_prefix,
672-
sled_max_cache_size_mb: inner.sled_max_cache_size_mb,
673655
cluster_name: inner.cluster_name,
674656
wait_leader_timeout: inner.wait_leader_timeout,
675657
}

src/meta/service/tests/it/configs.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ cluster_name = "foo_cluster"
8989
assert!(!cfg.raft_config.single);
9090
assert_eq!(cfg.raft_config.join, vec!["j1", "j2"]);
9191
assert_eq!(cfg.raft_config.id, 20);
92-
assert_eq!(cfg.raft_config.sled_tree_prefix, "sled_foo");
9392
assert_eq!(cfg.raft_config.cluster_name, "foo_cluster");
9493
});
9594

src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -311,12 +311,14 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
311311
// - Before applying, 32 expired keys will be cleaned.
312312
// - When applying, touched expired keys will be cleaned.
313313

314+
fn sec(x: u64) -> Duration {
315+
Duration::from_secs(x)
316+
}
317+
314318
let (_tc, addr) = crate::tests::start_metasrv().await?;
315319

316320
let watch_prefix = "w_";
317321
let now_sec = now();
318-
let expire = now_sec + 11;
319-
// dbg!(now_sec, expire);
320322

321323
info!("--- prepare data that are gonna expire");
322324
{
@@ -332,31 +334,19 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
332334
for i in 0..(32 + 1) {
333335
let k = format!("w_auto_gc_{}", i);
334336
txn.if_then
335-
.push(TxnOp::put_with_ttl(&k, b(&k), Some(Duration::from_secs(1))));
337+
.push(TxnOp::put_with_ttl(&k, b(&k), Some(sec(1))));
336338
}
337339

338340
// Expired key won't be cleaned when they are read, although read returns None.
339341

340-
txn.if_then.push(TxnOp::put_with_ttl(
341-
"w_b1",
342-
b("w_b1"),
343-
Some(Duration::from_secs(6)),
344-
));
345-
txn.if_then.push(TxnOp::put_with_ttl(
346-
"w_b2",
347-
b("w_b2"),
348-
Some(Duration::from_secs(6)),
349-
));
350-
txn.if_then.push(TxnOp::put_with_ttl(
351-
"w_b3a",
352-
b("w_b3a"),
353-
Some(Duration::from_secs(6)),
354-
));
355-
txn.if_then.push(TxnOp::put_with_ttl(
356-
"w_b3b",
357-
b("w_b3b"),
358-
Some(Duration::from_secs(11)),
359-
));
342+
txn.if_then
343+
.push(TxnOp::put_with_ttl("w_b1", b("w_b1"), Some(sec(6))));
344+
txn.if_then
345+
.push(TxnOp::put_with_ttl("w_b2", b("w_b2"), Some(sec(6))));
346+
txn.if_then
347+
.push(TxnOp::put_with_ttl("w_b3a", b("w_b3a"), Some(sec(6))));
348+
txn.if_then
349+
.push(TxnOp::put_with_ttl("w_b3b", b("w_b3b"), Some(sec(15))));
360350

361351
client.transaction(txn).await?;
362352
}
@@ -373,8 +363,8 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
373363
watch_client.request(watch).await?
374364
};
375365

376-
info!("--- sleep {} for expiration", expire - now_sec);
377-
tokio::time::sleep(Duration::from_secs(10)).await;
366+
info!("--- sleep 10 for expiration");
367+
tokio::time::sleep(sec(10)).await;
378368

379369
info!("--- apply another txn in another thread to override keys");
380370
{
@@ -430,20 +420,20 @@ async fn test_watch_expired_events() -> anyhow::Result<()> {
430420
"w_b3b",
431421
seq + 3,
432422
"w_b3b",
433-
Some(KvMeta::new_expire(now_sec + 16)),
423+
Some(KvMeta::new_expire(now_sec + 15)),
434424
), // expired
435425
];
436426

437-
// remove the millisecond part of expire_at
427+
// The evaluated expire_at could not equal to the real expire_at, so we need to tidy the expire_at.
438428
fn tidy(mut ev: Event) -> Event {
439429
if let Some(ref mut prev) = ev.prev {
440430
if let Some(ref mut meta) = prev.meta {
441-
meta.expire_at = meta.expire_at.map(|x| x / 1000 * 1000);
431+
meta.expire_at = meta.expire_at.map(|x| x / 10 * 10);
442432
}
443433
}
444434
if let Some(ref mut current) = ev.current {
445435
if let Some(ref mut meta) = current.meta {
446-
meta.expire_at = meta.expire_at.map(|x| x / 1000 * 1000);
436+
meta.expire_at = meta.expire_at.map(|x| x / 10 * 10);
447437
}
448438
}
449439
ev

src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -319,20 +319,20 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
319319
tc2.config.raft_config.single = false;
320320
tc2.config.raft_config.join = vec![tc0.config.raft_config.raft_api_addr().await?.to_string()];
321321

322-
let meta_node = MetaNode::start(&tc0.config).await?;
323-
// Initial log, leader blank log, add node-0.
322+
let n1 = MetaNode::start(&tc0.config).await?;
323+
// Initial membership log, leader blank log, add node-0 log.
324324
let mut log_index = 3;
325325

326-
let res = meta_node
326+
let res = n1
327327
.join_cluster(
328328
&tc0.config.raft_config,
329329
tc0.config.grpc_api_advertise_address(),
330330
)
331331
.await?;
332332
assert_eq!(Err("Did not join: --join is empty".to_string()), res);
333333

334-
let meta_node1 = MetaNode::start(&tc1.config).await?;
335-
let res = meta_node1
334+
let n1 = MetaNode::start(&tc1.config).await?;
335+
let res = n1
336336
.join_cluster(
337337
&tc1.config.raft_config,
338338
tc1.config.grpc_api_advertise_address(),
@@ -342,8 +342,7 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
342342

343343
// Two membership logs, one add-node log;
344344
log_index += 3;
345-
meta_node1
346-
.raft
345+
n1.raft
347346
.wait(timeout())
348347
.applied_index(Some(log_index), "node-1 join cluster")
349348
.await?;
@@ -354,6 +353,9 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
354353
n2.stop().await?;
355354
}
356355

356+
// Wait a second to ensure server quits completely.
357+
sleep(Duration::from_secs(1)).await;
358+
357359
info!("--- Allow to join node-2 with initialized store");
358360
{
359361
let n2 = MetaNode::start(&tc2.config).await?;
@@ -368,8 +370,8 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
368370
// Two membership logs, one add-node log;
369371
log_index += 3;
370372

371-
// Add this barrier to ensure all of the logs are applied before quit.
372-
// Otherwise the next time node-2 starts it can not see the applied
373+
// Add this barrier to ensure all the logs are applied before quit.
374+
// Otherwise, the next time node-2 starts it can not see the applied
373375
// membership and believes it has not yet joined into a cluster.
374376
n2.raft
375377
.wait(timeout())
@@ -379,6 +381,9 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> {
379381
n2.stop().await?;
380382
}
381383

384+
// Wait a second to ensure server quits completely.
385+
sleep(Duration::from_secs(1)).await;
386+
382387
info!("--- Not allowed to join node-2 with store with membership");
383388
{
384389
let n2 = MetaNode::start(&tc2.config).await?;

0 commit comments

Comments
 (0)