Skip to content

Commit 78754dd

Browse files
authored
feat(meta-service): add FetchAddU64.match_seq (#18258)
If `match_seq: Option<64>` is specified, the record seq number must match the provided value to proceed `FetchAddU64` operation.
1 parent 633351d commit 78754dd

File tree

6 files changed

+185
-11
lines changed

6 files changed

+185
-11
lines changed

src/meta/client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
153153
/// - 2025-06-24: since TODO: add when merge
154154
/// 🖥 server: add `FetchAddU64` operation to the `TxnOp`
155155
///
156+
/// - 2025-06-26: since TODO: add when merge
157+
/// 🖥 server: add `FetchAddU64.match_seq`
156158
///
157159
/// Server feature set:
158160
/// ```yaml

src/meta/kvapi/src/kvapi/test_suite.rs

Lines changed: 125 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ impl kvapi::TestSuite {
9393
self.kv_mget(&builder.build().await).await?;
9494
self.kv_txn_absent_seq_0(&builder.build().await).await?;
9595
self.kv_transaction(&builder.build().await).await?;
96+
self.kv_transaction_fetch_add_u64(&builder.build().await)
97+
.await?;
98+
self.kv_transaction_fetch_add_u64_match_seq(&builder.build().await)
99+
.await?;
96100
self.kv_transaction_with_ttl(&builder.build().await).await?;
97101
self.kv_transaction_delete_match_seq_none(&builder.build().await)
98102
.await?;
@@ -1074,7 +1078,7 @@ impl kvapi::TestSuite {
10741078
}
10751079
);
10761080
assert_eq!(
1077-
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1081+
resp.responses[1].try_as_fetch_add_u64().unwrap(),
10781082
&FetchAddU64Response {
10791083
key: "k2".to_string(),
10801084
before_seq: 0,
@@ -1105,7 +1109,7 @@ impl kvapi::TestSuite {
11051109
}
11061110
);
11071111
assert_eq!(
1108-
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1112+
resp.responses[1].try_as_fetch_add_u64().unwrap(),
11091113
&FetchAddU64Response {
11101114
key: "k2".to_string(),
11111115
before_seq: 2,
@@ -1137,11 +1141,21 @@ impl kvapi::TestSuite {
11371141
}
11381142
);
11391143
assert_eq!(
1140-
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1144+
resp.responses[1].try_as_fetch_add_u64().unwrap(),
11411145
&FetchAddU64Response {
11421146
key: "k2".to_string(),
11431147
before_seq: 4,
11441148
before: 6,
1149+
after_seq: 6,
1150+
after: u64::MAX / 2 + 6,
1151+
}
1152+
);
1153+
assert_eq!(
1154+
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1155+
&FetchAddU64Response {
1156+
key: "k2".to_string(),
1157+
before_seq: 6,
1158+
before: u64::MAX / 2 + 6,
11451159
after_seq: 7,
11461160
after: u64::MAX,
11471161
}
@@ -1151,6 +1165,114 @@ impl kvapi::TestSuite {
11511165
Ok(())
11521166
}
11531167

1168+
/// Tests match_seq must match the record seq to take place the operation.
1169+
#[fastrace::trace]
1170+
pub async fn kv_transaction_fetch_add_u64_match_seq<KV: kvapi::KVApi>(
1171+
&self,
1172+
kv: &KV,
1173+
) -> anyhow::Result<()> {
1174+
// - Add a record via transaction with ttl
1175+
1176+
info!("--- {}", func_path!());
1177+
1178+
info!("--- match_seq zero");
1179+
{
1180+
let txn = TxnRequest::new(vec![], vec![
1181+
TxnOp::fetch_add_u64("k1", 2).match_seq(Some(0)),
1182+
TxnOp::fetch_add_u64("k1", 3).match_seq(Some(0)),
1183+
TxnOp::fetch_add_u64("k1", 4),
1184+
TxnOp::fetch_add_u64("k2", 5),
1185+
]);
1186+
1187+
let resp = kv.transaction(txn).await?;
1188+
1189+
assert_eq!(
1190+
resp.responses[0].try_as_fetch_add_u64().unwrap(),
1191+
&FetchAddU64Response {
1192+
key: "k1".to_string(),
1193+
before_seq: 0,
1194+
before: 0,
1195+
after_seq: 1,
1196+
after: 2,
1197+
}
1198+
);
1199+
assert_eq!(
1200+
resp.responses[1].try_as_fetch_add_u64().unwrap(),
1201+
&FetchAddU64Response {
1202+
key: "k1".to_string(),
1203+
before_seq: 1,
1204+
before: 2,
1205+
after_seq: 1,
1206+
after: 2,
1207+
}
1208+
);
1209+
assert_eq!(
1210+
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1211+
&FetchAddU64Response {
1212+
key: "k1".to_string(),
1213+
before_seq: 1,
1214+
before: 2,
1215+
after_seq: 2,
1216+
after: 6,
1217+
}
1218+
);
1219+
assert_eq!(
1220+
resp.responses[3].try_as_fetch_add_u64().unwrap(),
1221+
&FetchAddU64Response {
1222+
key: "k2".to_string(),
1223+
before_seq: 0,
1224+
before: 0,
1225+
after_seq: 3,
1226+
after: 5,
1227+
}
1228+
);
1229+
}
1230+
1231+
info!("--- match_seq non zero");
1232+
{
1233+
let txn = TxnRequest::new(vec![], vec![
1234+
TxnOp::fetch_add_u64("k1", 2).match_seq(Some(2)),
1235+
TxnOp::fetch_add_u64("k1", 3).match_seq(Some(2)),
1236+
TxnOp::fetch_add_u64("k1", 4),
1237+
]);
1238+
1239+
let resp = kv.transaction(txn).await?;
1240+
1241+
assert_eq!(
1242+
resp.responses[0].try_as_fetch_add_u64().unwrap(),
1243+
&FetchAddU64Response {
1244+
key: "k1".to_string(),
1245+
before_seq: 2,
1246+
before: 6,
1247+
after_seq: 4,
1248+
after: 8,
1249+
}
1250+
);
1251+
assert_eq!(
1252+
resp.responses[1].try_as_fetch_add_u64().unwrap(),
1253+
&FetchAddU64Response {
1254+
key: "k1".to_string(),
1255+
before_seq: 4,
1256+
before: 8,
1257+
after_seq: 4,
1258+
after: 8,
1259+
}
1260+
);
1261+
assert_eq!(
1262+
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1263+
&FetchAddU64Response {
1264+
key: "k1".to_string(),
1265+
before_seq: 4,
1266+
before: 8,
1267+
after_seq: 5,
1268+
after: 12,
1269+
}
1270+
);
1271+
}
1272+
1273+
Ok(())
1274+
}
1275+
11541276
#[fastrace::trace]
11551277
pub async fn kv_transaction_with_ttl<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
11561278
// - Add a record via transaction with ttl

src/meta/raft-store/src/applier.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,8 @@ where SM: StateMachineApi + 'static
577577
) -> Result<(), io::Error> {
578578
let before_seqv = self.sm.get_maybe_expired_kv(&req.key).await?;
579579

580+
let before_seq = before_seqv.seq();
581+
580582
let before = if let Some(seqv) = &before_seqv {
581583
let bytes = &seqv.data;
582584
let u64_res: Result<u64, _> = serde_json::from_slice(bytes.as_slice());
@@ -594,6 +596,14 @@ where SM: StateMachineApi + 'static
594596
0
595597
};
596598

599+
if let Some(match_seq) = req.match_seq {
600+
if match_seq != before_seq {
601+
let r = TxnOpResponse::unchanged_fetch_add_u64(&req.key, before_seq, before);
602+
resp.responses.push(r);
603+
return Ok(());
604+
}
605+
}
606+
597607
let after = before.saturating_add_signed(req.delta);
598608

599609
let (_prev, result) = {
@@ -602,13 +612,8 @@ where SM: StateMachineApi + 'static
602612
self.upsert_kv(&upsert).await?
603613
};
604614

605-
let fetch_add_resp = TxnOpResponse::fetch_add_u64(
606-
req.key.clone(),
607-
before_seqv.seq(),
608-
before,
609-
result.seq(),
610-
after,
611-
);
615+
let fetch_add_resp =
616+
TxnOpResponse::fetch_add_u64(req.key.clone(), before_seq, before, result.seq(), after);
612617

613618
resp.responses.push(fetch_add_resp);
614619

src/meta/types/proto/request.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ message FetchAddU64 {
4747
// The key to fetch and add the delta.
4848
string key = 1;
4949

50+
// Assert the seq number of the record before update.
51+
// - If it does not match, no update will be made and the record seq number won't change.
52+
// - If it is None, the update will always be made.
53+
optional uint64 match_seq = 3;
54+
5055
// The delta to add to the value.
5156
int64 delta = 2;
5257
}

src/meta/types/src/proto_display/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,11 @@ impl Display for ConditionalOperation {
349349

350350
impl Display for FetchAddU64 {
351351
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
352-
write!(f, "FetchAddU64 key={} delta={}", self.key, self.delta)
352+
write!(f, "FetchAddU64 key={} delta={}", self.key, self.delta)?;
353+
if let Some(match_seq) = self.match_seq {
354+
write!(f, " match_seq: {}", match_seq)?;
355+
}
356+
Ok(())
353357
}
354358
}
355359

@@ -466,9 +470,20 @@ mod tests {
466470
fn test_display_fetch_add_u64() {
467471
let req = FetchAddU64 {
468472
key: "k1".to_string(),
473+
match_seq: None,
469474
delta: 1,
470475
};
471476
assert_eq!(req.to_string(), "FetchAddU64 key=k1 delta=1");
477+
478+
let req_with_seq = FetchAddU64 {
479+
key: "k1".to_string(),
480+
match_seq: Some(10),
481+
delta: 1,
482+
};
483+
assert_eq!(
484+
req_with_seq.to_string(),
485+
"FetchAddU64 key=k1 delta=1 match_seq: 10"
486+
);
472487
}
473488

474489
#[test]

src/meta/types/src/proto_ext/txn_ext.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,31 @@ impl pb::TxnOp {
317317
pb::TxnOp {
318318
request: Some(pb::txn_op::Request::FetchAddU64(pb::FetchAddU64 {
319319
key: key.to_string(),
320+
match_seq: None,
320321
delta,
321322
})),
322323
}
323324
}
325+
326+
/// Add a match-sequence-number condition to the operation.
327+
///
328+
/// If the sequence number does not match, the operation won't be take place.
329+
pub fn match_seq(mut self, seq: Option<u64>) -> Self {
330+
let req = self
331+
.request
332+
.as_mut()
333+
.expect("TxnOp must have a non-None request field");
334+
335+
match req {
336+
pb::txn_op::Request::Delete(p) => p.match_seq = seq,
337+
pb::txn_op::Request::FetchAddU64(d) => d.match_seq = seq,
338+
_ => {
339+
unreachable!("Not support match_seq for: {}", req)
340+
}
341+
}
342+
343+
self
344+
}
324345
}
325346

326347
impl pb::TxnOpResponse {
@@ -352,6 +373,10 @@ impl pb::TxnOpResponse {
352373
}
353374
}
354375

376+
pub fn unchanged_fetch_add_u64(key: impl ToString, seq: u64, value: u64) -> Self {
377+
Self::fetch_add_u64(key, seq, value, seq, value)
378+
}
379+
355380
pub fn fetch_add_u64(
356381
key: impl ToString,
357382
before_seq: u64,

0 commit comments

Comments
 (0)