Skip to content

Commit 1f546e1

Browse files
committed
support file based transaction
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent bbaf317 commit 1f546e1

File tree

11 files changed

+294
-70
lines changed

11 files changed

+294
-70
lines changed

proto/kvrpcpb.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ message PrewriteRequest {
108108
uint64 max_commit_ts = 14;
109109
// The level of assertion to use on this prewrte request.
110110
AssertionLevel assertion_level = 15;
111+
112+
// Reserved for file based transaction.
113+
repeated uint64 txn_file_chunks = 100;
111114
}
112115

113116
message PrewriteResponse {
@@ -196,6 +199,9 @@ message TxnHeartBeatRequest {
196199
uint64 start_version = 3;
197200
// The new TTL the sender would like.
198201
uint64 advise_lock_ttl = 4;
202+
203+
// Reserved for file based transaction.
204+
bool is_txn_file = 100;
199205
}
200206

201207
message TxnHeartBeatResponse {
@@ -232,6 +238,9 @@ message CheckTxnStatusRequest {
232238
// lock, the transaction status could not be decided if the primary lock is pessimistic too and
233239
// it's still uncertain.
234240
bool resolving_pessimistic_lock = 8;
241+
242+
// Reserved for file based transaction.
243+
bool is_txn_file = 100;
235244
}
236245

237246
message CheckTxnStatusResponse {
@@ -282,6 +291,9 @@ message CommitRequest {
282291
repeated bytes keys = 3;
283292
// Timestamp for the end of the transaction. Must be greater than `start_version`.
284293
uint64 commit_version = 4;
294+
295+
// Reserved for file based transaction.
296+
bool is_txn_file = 100;
285297
}
286298

287299
message CommitResponse {
@@ -348,6 +360,9 @@ message BatchRollbackRequest {
348360
uint64 start_version = 2;
349361
// The keys to rollback.
350362
repeated bytes keys = 3;
363+
364+
// Reserved for file based transaction.
365+
bool is_txn_file = 100;
351366
}
352367

353368
message BatchRollbackResponse {
@@ -387,6 +402,9 @@ message ResolveLockRequest {
387402
repeated TxnInfo txn_infos = 4;
388403
// Only resolve specified keys.
389404
repeated bytes keys = 5;
405+
406+
// Reserved for file based transaction.
407+
bool is_txn_file = 100;
390408
}
391409

392410
message ResolveLockResponse {
@@ -774,6 +792,9 @@ message LockInfo {
774792
bool use_async_commit = 8;
775793
uint64 min_commit_ts = 9;
776794
repeated bytes secondaries = 10;
795+
796+
// Reserved for file based transaction.
797+
bool is_txn_file = 100;
777798
}
778799

779800
message KeyError {
@@ -1009,6 +1030,9 @@ message MvccInfo {
10091030
message TxnInfo {
10101031
uint64 txn = 1;
10111032
uint64 status = 2;
1033+
1034+
// Reserved for file based transaction.
1035+
bool is_txn_file = 100;
10121036
}
10131037

10141038
enum Action {

src/common/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ pub enum Error {
108108
inner: Box<Error>,
109109
success_keys: Vec<Vec<u8>>,
110110
},
111+
#[error("Transaction not found error: {:?}", _0)]
112+
TxnNotFound(kvrpcpb::TxnNotFound),
111113
}
112114

113115
impl From<crate::proto::errorpb::Error> for Error {

src/generated/kvrpcpb.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ pub struct PrewriteRequest {
126126
/// The level of assertion to use on this prewrte request.
127127
#[prost(enumeration = "AssertionLevel", tag = "15")]
128128
pub assertion_level: i32,
129+
/// Reserved for file based transaction.
130+
#[prost(uint64, repeated, tag = "100")]
131+
pub txn_file_chunks: ::prost::alloc::vec::Vec<u64>,
129132
}
130133
#[allow(clippy::derive_partial_eq_without_eq)]
131134
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -255,6 +258,9 @@ pub struct TxnHeartBeatRequest {
255258
/// The new TTL the sender would like.
256259
#[prost(uint64, tag = "4")]
257260
pub advise_lock_ttl: u64,
261+
/// Reserved for file based transaction.
262+
#[prost(bool, tag = "100")]
263+
pub is_txn_file: bool,
258264
}
259265
#[allow(clippy::derive_partial_eq_without_eq)]
260266
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -304,6 +310,9 @@ pub struct CheckTxnStatusRequest {
304310
/// it's still uncertain.
305311
#[prost(bool, tag = "8")]
306312
pub resolving_pessimistic_lock: bool,
313+
/// Reserved for file based transaction.
314+
#[prost(bool, tag = "100")]
315+
pub is_txn_file: bool,
307316
}
308317
#[allow(clippy::derive_partial_eq_without_eq)]
309318
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -373,6 +382,9 @@ pub struct CommitRequest {
373382
/// Timestamp for the end of the transaction. Must be greater than `start_version`.
374383
#[prost(uint64, tag = "4")]
375384
pub commit_version: u64,
385+
/// Reserved for file based transaction.
386+
#[prost(bool, tag = "100")]
387+
pub is_txn_file: bool,
376388
}
377389
#[allow(clippy::derive_partial_eq_without_eq)]
378390
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -470,6 +482,9 @@ pub struct BatchRollbackRequest {
470482
/// The keys to rollback.
471483
#[prost(bytes = "vec", repeated, tag = "3")]
472484
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
485+
/// Reserved for file based transaction.
486+
#[prost(bool, tag = "100")]
487+
pub is_txn_file: bool,
473488
}
474489
#[allow(clippy::derive_partial_eq_without_eq)]
475490
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -528,6 +543,9 @@ pub struct ResolveLockRequest {
528543
/// Only resolve specified keys.
529544
#[prost(bytes = "vec", repeated, tag = "5")]
530545
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
546+
/// Reserved for file based transaction.
547+
#[prost(bool, tag = "100")]
548+
pub is_txn_file: bool,
531549
}
532550
#[allow(clippy::derive_partial_eq_without_eq)]
533551
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1050,6 +1068,9 @@ pub struct LockInfo {
10501068
pub min_commit_ts: u64,
10511069
#[prost(bytes = "vec", repeated, tag = "10")]
10521070
pub secondaries: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
1071+
/// Reserved for file based transaction.
1072+
#[prost(bool, tag = "100")]
1073+
pub is_txn_file: bool,
10531074
}
10541075
#[allow(clippy::derive_partial_eq_without_eq)]
10551076
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1344,6 +1365,9 @@ pub struct TxnInfo {
13441365
pub txn: u64,
13451366
#[prost(uint64, tag = "2")]
13461367
pub status: u64,
1368+
/// Reserved for file based transaction.
1369+
#[prost(bool, tag = "100")]
1370+
pub is_txn_file: bool,
13471371
}
13481372
#[allow(clippy::derive_partial_eq_without_eq)]
13491373
#[derive(Clone, PartialEq, ::prost::Message)]

src/raw/requests.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,6 @@ mod test {
501501

502502
use super::*;
503503
use crate::backoff::DEFAULT_REGION_BACKOFF;
504-
use crate::backoff::OPTIMISTIC_BACKOFF;
505504
use crate::mock::MockKvClient;
506505
use crate::mock::MockPdClient;
507506
use crate::proto::kvrpcpb;
@@ -542,7 +541,6 @@ mod test {
542541
};
543542
let encoded_scan = EncodedRequest::new(scan, client.get_codec());
544543
let plan = crate::request::PlanBuilder::new(client, encoded_scan)
545-
.resolve_lock(OPTIMISTIC_BACKOFF)
546544
.retry_multi_region(DEFAULT_REGION_BACKOFF)
547545
.merge(Collect)
548546
.plan();

src/request/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ mod test {
110110
use crate::Error;
111111
use crate::Key;
112112
use crate::Result;
113+
use crate::TimestampExt as _;
113114

114115
#[tokio::test]
115116
async fn test_region_retry() {
@@ -201,7 +202,7 @@ mod test {
201202

202203
let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
203204
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
204-
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
205+
.resolve_lock(Timestamp::max(), Backoff::no_jitter_backoff(1, 1, 3))
205206
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
206207
.extract_error()
207208
.plan();
@@ -228,14 +229,14 @@ mod test {
228229

229230
// does not extract error
230231
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone())
231-
.resolve_lock(OPTIMISTIC_BACKOFF)
232+
.resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF)
232233
.retry_multi_region(OPTIMISTIC_BACKOFF)
233234
.plan();
234235
assert!(plan.execute().await.is_ok());
235236

236237
// extract error
237238
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
238-
.resolve_lock(OPTIMISTIC_BACKOFF)
239+
.resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF)
239240
.retry_multi_region(OPTIMISTIC_BACKOFF)
240241
.extract_error()
241242
.plan();

src/request/plan.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::transaction::ResolveLocksOptions;
3434
use crate::util::iter::FlatMapOkIterExt;
3535
use crate::Error;
3636
use crate::Result;
37+
use crate::Timestamp;
3738

3839
/// A plan for how to execute a request. A user builds up a plan with various
3940
/// options, then exectutes it.
@@ -544,6 +545,7 @@ pub struct DefaultProcessor;
544545

545546
pub struct ResolveLock<P: Plan, PdC: PdClient> {
546547
pub inner: P,
548+
pub timestamp: Timestamp,
547549
pub pd_client: Arc<PdC>,
548550
pub backoff: Backoff,
549551
}
@@ -552,6 +554,7 @@ impl<P: Plan, PdC: PdClient> Clone for ResolveLock<P, PdC> {
552554
fn clone(&self) -> Self {
553555
ResolveLock {
554556
inner: self.inner.clone(),
557+
timestamp: self.timestamp.clone(),
555558
pd_client: self.pd_client.clone(),
556559
backoff: self.backoff.clone(),
557560
}
@@ -579,7 +582,8 @@ where
579582
}
580583

581584
let pd_client = self.pd_client.clone();
582-
let live_locks = resolve_locks(locks, pd_client.clone()).await?;
585+
let live_locks =
586+
resolve_locks(locks, self.timestamp.clone(), pd_client.clone()).await?;
583587
if live_locks.is_empty() {
584588
result = self.inner.execute().await?;
585589
} else {
@@ -856,6 +860,7 @@ mod test {
856860
use super::*;
857861
use crate::mock::MockPdClient;
858862
use crate::proto::kvrpcpb::BatchGetResponse;
863+
use crate::TimestampExt as _;
859864

860865
#[derive(Clone)]
861866
struct ErrPlan;
@@ -889,6 +894,7 @@ mod test {
889894
let plan = RetryableMultiRegion {
890895
inner: ResolveLock {
891896
inner: ErrPlan,
897+
timestamp: Timestamp::max(),
892898
backoff: Backoff::no_backoff(),
893899
pd_client: Arc::new(MockPdClient::default()),
894900
},

src/request/plan_builder.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::transaction::HasLocks;
3030
use crate::transaction::ResolveLocksContext;
3131
use crate::transaction::ResolveLocksOptions;
3232
use crate::Result;
33+
use crate::Timestamp;
3334

3435
/// Builder type for plans (see that module for more).
3536
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
@@ -69,14 +70,19 @@ impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {
6970

7071
impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
7172
/// If there is a lock error, then resolve the lock and retry the request.
72-
pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
73+
pub fn resolve_lock(
74+
self,
75+
timestamp: Timestamp,
76+
backoff: Backoff,
77+
) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
7378
where
7479
P::Result: HasLocks,
7580
{
7681
PlanBuilder {
7782
pd_client: self.pd_client.clone(),
7883
plan: ResolveLock {
7984
inner: self.plan,
85+
timestamp,
8086
backoff,
8187
pd_client: self.pd_client,
8288
},

src/timestamp.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ pub trait TimestampExt: Sized {
2222
fn from_version(version: u64) -> Self;
2323
/// Convert u64 to an optional timestamp, where `0` represents no timestamp.
2424
fn try_from_version(version: u64) -> Option<Self>;
25+
/// Return the maximum timestamp.
26+
fn max() -> Self {
27+
Self::from_version(u64::MAX)
28+
}
2529
}
2630

2731
impl TimestampExt for Timestamp {

0 commit comments

Comments
 (0)