Skip to content

Commit 0a6e529

Browse files
authored
feat(meta-service): Add PutSequential operation for ordered key generation (#18312)
Introduce `PutSequential` operation that creates sequential keys using a specified `prefix` and sequence generator. This enables applications to build ordered data structures like queues, which are essential for implementing distributed locks and semaphores. Operation details: - User provides a key `prefix` and sequence generator key - Sequence generator stores JSON-encoded `u64` values for `FetchAddU64` compatibility - Target key format: `<prefix>{next_val(<sequence_key>)}` - Key generation happens atomically during state machine application Use cases: - Distributed queue implementations - Ordered task scheduling - Distributed lock/semaphore mechanisms - Any scenario requiring guaranteed key ordering The operation leverages the existing conflict-free sequence generation infrastructure to ensure high concurrency and consistent ordering across distributed environments.
1 parent 3434c5e commit 0a6e529

File tree

16 files changed

+535
-96
lines changed

16 files changed

+535
-96
lines changed

src/meta/binaries/metabench/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ async fn benchmark_table_copy_file(
333333
&copied_file_ident,
334334
serialize_struct(&copied_file_value).unwrap(),
335335
)
336-
.with_ttl(param.ttl_ms);
336+
.with_ttl(param.ttl_ms.map(Duration::from_millis));
337337

338338
txn.if_then.push(put_op);
339339
}

src/meta/client/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
162162
/// - 2025-07-03: since TODO: add when merged
163163
/// 🖥 server: adaptive `expire_at` support both seconds and milliseconds.
164164
///
165+
/// - 2025-07-04: since TODO: add when merged
166+
/// 🖥 server: add `PutSequential`.
167+
///
165168
/// Server feature set:
166169
/// ```yaml
167170
/// server_features:

src/meta/kvapi-test-suite/src/test_suite.rs renamed to src/meta/kvapi-test-suite/src/kvapi_test_suite.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use databend_common_meta_kvapi::kvapi;
1818
use databend_common_meta_types::protobuf as pb;
1919
use databend_common_meta_types::protobuf::BooleanExpression;
2020
use databend_common_meta_types::protobuf::FetchAddU64Response;
21+
use databend_common_meta_types::protobuf::KvMeta;
2122
use databend_common_meta_types::seq_value::KVMeta;
2223
use databend_common_meta_types::seq_value::SeqV;
2324
use databend_common_meta_types::txn_condition;
@@ -98,6 +99,9 @@ impl TestSuite {
9899
.await?;
99100
self.kv_transaction_fetch_add_u64_match_seq(&builder.build().await)
100101
.await?;
102+
self.kv_txn_put_sequential(&builder.build().await).await?;
103+
self.kv_txn_put_sequential_expire_and_ttl(&builder.build().await)
104+
.await?;
101105
self.kv_transaction_with_ttl(&builder.build().await).await?;
102106
self.kv_transaction_delete_match_seq_none(&builder.build().await)
103107
.await?;
@@ -1348,6 +1352,125 @@ impl TestSuite {
13481352
Ok(())
13491353
}
13501354

1355+
/// Tests match_seq must match the record seq to take place the operation.
1356+
#[fastrace::trace]
1357+
pub async fn kv_txn_put_sequential<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
1358+
info!("--- {}", func_path!());
1359+
1360+
let txn = TxnRequest::new(vec![], vec![
1361+
TxnOp::put_sequential("k1/", "seq1", b("v1")),
1362+
TxnOp::put_sequential("k2/", "seq1", b("v2")),
1363+
]);
1364+
1365+
let resp = kv.transaction(txn).await?;
1366+
1367+
assert_eq!(resp.responses.len(), 2);
1368+
1369+
assert_eq!(resp.responses[0].try_as_put().unwrap(), &TxnPutResponse {
1370+
key: "k1/000_000_000_000_000_000_000".to_string(),
1371+
prev_value: None,
1372+
current: Some(pb::SeqV::with_meta(
1373+
2,
1374+
Some(KvMeta::default()),
1375+
b("v1").to_vec()
1376+
)),
1377+
});
1378+
assert_eq!(resp.responses[1].try_as_put().unwrap(), &TxnPutResponse {
1379+
key: "k2/000_000_000_000_000_000_001".to_string(),
1380+
prev_value: None,
1381+
current: Some(pb::SeqV::with_meta(
1382+
4,
1383+
Some(KvMeta::default()),
1384+
b("v2").to_vec()
1385+
)),
1386+
});
1387+
1388+
// insert again
1389+
1390+
let txn = TxnRequest::new(vec![], vec![
1391+
TxnOp::put_sequential("k1/", "seq1", b("v1")),
1392+
TxnOp::put_sequential("k2/", "seq1", b("v2")),
1393+
]);
1394+
1395+
let resp = kv.transaction(txn).await?;
1396+
1397+
assert_eq!(resp.responses.len(), 2);
1398+
1399+
assert_eq!(resp.responses[0].try_as_put().unwrap(), &TxnPutResponse {
1400+
key: "k1/000_000_000_000_000_000_002".to_string(),
1401+
prev_value: None,
1402+
current: Some(pb::SeqV::with_meta(
1403+
6,
1404+
Some(KvMeta::default()),
1405+
b("v1").to_vec()
1406+
)),
1407+
});
1408+
assert_eq!(resp.responses[1].try_as_put().unwrap(), &TxnPutResponse {
1409+
key: "k2/000_000_000_000_000_000_003".to_string(),
1410+
prev_value: None,
1411+
current: Some(pb::SeqV::with_meta(
1412+
8,
1413+
Some(KvMeta::default()),
1414+
b("v2").to_vec()
1415+
)),
1416+
});
1417+
1418+
Ok(())
1419+
}
1420+
1421+
#[fastrace::trace]
1422+
pub async fn kv_txn_put_sequential_expire_and_ttl<KV: kvapi::KVApi>(
1423+
&self,
1424+
kv: &KV,
1425+
) -> anyhow::Result<()> {
1426+
// - Add a record via transaction with ttl
1427+
1428+
info!("--- {}", func_path!());
1429+
1430+
let now_ms = SeqV::<()>::now_ms();
1431+
1432+
let txn = TxnRequest::new(vec![], vec![
1433+
TxnOp::put_sequential("k1/", "seq1", b("v1")).with_expires_at_ms(Some(now_ms + 1000)),
1434+
TxnOp::put_sequential("k2/", "seq1", b("v2"))
1435+
.with_ttl(Some(Duration::from_millis(1000))),
1436+
]);
1437+
1438+
let resp = kv.transaction(txn).await?;
1439+
1440+
assert_eq!(resp.responses.len(), 2);
1441+
1442+
assert_eq!(resp.responses[0].try_as_put().unwrap(), &TxnPutResponse {
1443+
key: "k1/000_000_000_000_000_000_000".to_string(),
1444+
prev_value: None,
1445+
current: Some(pb::SeqV::with_meta(
1446+
2,
1447+
Some(KvMeta::new_expire(now_ms + 1000)),
1448+
b("v1").to_vec()
1449+
)),
1450+
});
1451+
1452+
let got = resp.responses[1].try_as_put().unwrap();
1453+
assert_eq!(got.key, "k2/000_000_000_000_000_000_001".to_string());
1454+
assert_eq!(got.prev_value, None);
1455+
let current = got.current.clone().unwrap();
1456+
assert_eq!(current.seq, 4);
1457+
assert_eq!(current.data, b("v2"));
1458+
assert!(current.meta.unwrap().close_to(
1459+
&KvMeta::new_expire(now_ms + 1000),
1460+
Duration::from_millis(1000)
1461+
));
1462+
1463+
sleep(Duration::from_millis(2000)).await;
1464+
1465+
let got = kv.get_kv("k1/000_000_000_000_000_000_000").await?;
1466+
assert!(got.is_none(), "k1 should be expired");
1467+
1468+
let got = kv.get_kv("k2/000_000_000_000_000_000_001").await?;
1469+
assert!(got.is_none(), "k2 should be expired");
1470+
1471+
Ok(())
1472+
}
1473+
13511474
#[fastrace::trace]
13521475
pub async fn kv_transaction_with_ttl<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
13531476
// - Add a record via transaction with ttl

src/meta/kvapi-test-suite/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414

1515
#![allow(clippy::uninlined_format_args)]
1616

17-
mod test_suite;
18-
pub use test_suite::TestSuite;
17+
mod kvapi_test_suite;
18+
pub use kvapi_test_suite::TestSuite;

src/meta/process/src/kv_processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
214214
Request::Delete(_) => {}
215215
Request::DeleteByPrefix(_) => {}
216216
Request::FetchAddU64(_) => {}
217+
Request::PutSequential(_) => {}
217218
}
218219

219220
Ok(TxnOp { request: Some(req) })

0 commit comments

Comments
 (0)