Skip to content

Commit 8f54e61

Browse files
author
Andrey Koshchiy
authored
Batch split for prewrite and commit requests (#390)
Signed-off-by: Andrey Koshchiy <roguepnz@gmail.com>
1 parent a675082 commit 8f54e61

File tree

6 files changed

+190
-6
lines changed

6 files changed

+190
-6
lines changed

config/tikv.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ region-split-check-diff = "1B"
88
pd-heartbeat-tick-interval = "2s"
99
pd-store-heartbeat-tick-interval = "5s"
1010
split-region-check-tick-interval = "1s"
11+
raft-entry-max-size = "1MB"
1112

1213
[rocksdb]
1314
max-open-files = 10000

src/request/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub use self::{
1515
ResponseWithShard, RetryableMultiRegion,
1616
},
1717
plan_builder::{PlanBuilder, SingleKey},
18-
shard::{HasNextBatch, NextBatch, Shardable},
18+
shard::{Batchable, HasNextBatch, NextBatch, Shardable},
1919
};
2020

2121
pub mod plan;

src/request/shard.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,33 @@ pub trait Shardable {
3838
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>;
3939
}
4040

41+
pub trait Batchable {
42+
type Item;
43+
44+
fn batches(items: Vec<Self::Item>, batch_size: u64) -> Vec<Vec<Self::Item>> {
45+
let mut batches: Vec<Vec<Self::Item>> = Vec::new();
46+
let mut batch: Vec<Self::Item> = Vec::new();
47+
let mut size = 0;
48+
49+
for item in items {
50+
let item_size = Self::item_size(&item);
51+
if size + item_size >= batch_size && !batch.is_empty() {
52+
batches.push(batch);
53+
batch = Vec::new();
54+
size = 0;
55+
}
56+
size += item_size;
57+
batch.push(item);
58+
}
59+
if !batch.is_empty() {
60+
batches.push(batch)
61+
}
62+
batches
63+
}
64+
65+
fn item_size(item: &Self::Item) -> u64;
66+
}
67+
4168
// Use to iterate in a region for scan requests that have batch size limit.
4269
// HasNextBatch use to get the next batch according to previous response.
4370
pub trait HasNextBatch {
@@ -200,3 +227,60 @@ macro_rules! shardable_range {
200227
}
201228
};
202229
}
230+
231+
#[cfg(test)]
232+
mod test {
233+
use rand::{thread_rng, Rng};
234+
235+
use super::Batchable;
236+
237+
#[test]
238+
fn test_batches() {
239+
let mut rng = thread_rng();
240+
241+
let items: Vec<_> = (0..3)
242+
.map(|_| (0..2).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())
243+
.collect();
244+
245+
let batch_size = 5;
246+
247+
let batches = BatchableTest::batches(items.clone(), batch_size);
248+
249+
assert_eq!(batches.len(), 2);
250+
assert_eq!(batches[0].len(), 2);
251+
assert_eq!(batches[1].len(), 1);
252+
assert_eq!(batches[0][0], items[0]);
253+
assert_eq!(batches[0][1], items[1]);
254+
assert_eq!(batches[1][0], items[2]);
255+
}
256+
257+
#[test]
258+
fn test_batches_big_item() {
259+
let mut rng = thread_rng();
260+
261+
let items: Vec<_> = (0..3)
262+
.map(|_| (0..3).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())
263+
.collect();
264+
265+
let batch_size = 2;
266+
267+
let batches = BatchableTest::batches(items.clone(), batch_size);
268+
269+
assert_eq!(batches.len(), 3);
270+
for i in 0..items.len() {
271+
let batch = &batches[i];
272+
assert_eq!(batch.len(), 1);
273+
assert_eq!(batch[0], items[i]);
274+
}
275+
}
276+
277+
struct BatchableTest;
278+
279+
impl Batchable for BatchableTest {
280+
type Item = Vec<u8>;
281+
282+
fn item_size(item: &Self::Item) -> u64 {
283+
item.len() as u64
284+
}
285+
}
286+
}

src/transaction/requests.rs

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use crate::{
44
collect_first,
55
pd::PdClient,
66
request::{
7-
Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch, KvRequest, Merge,
8-
NextBatch, Process, ResponseWithShard, Shardable, SingleKey,
7+
Batchable, Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch,
8+
KvRequest, Merge, NextBatch, Process, ResponseWithShard, Shardable, SingleKey,
99
},
1010
store::{store_stream_for_keys, store_stream_for_range, RegionStore},
1111
timestamp::TimestampExt,
@@ -14,14 +14,19 @@ use crate::{
1414
KvPair, Result, Value,
1515
};
1616
use either::Either;
17-
use futures::stream::BoxStream;
17+
use futures::{
18+
stream::{self, BoxStream},
19+
StreamExt,
20+
};
1821
use std::{cmp, iter, sync::Arc};
1922
use tikv_client_common::Error::PessimisticLockError;
2023
use tikv_client_proto::{
2124
kvrpcpb::{self, LockInfo, TxnHeartBeatResponse, TxnInfo},
2225
pdpb::Timestamp,
2326
};
2427

28+
use super::transaction::TXN_COMMIT_BATCH_SIZE;
29+
2530
// implement HasLocks for a response type that has a `pairs` field,
2631
// where locks can be extracted from both the `pairs` and `error` fields
2732
macro_rules! pair_locks {
@@ -256,7 +261,18 @@ impl Shardable for kvrpcpb::PrewriteRequest {
256261
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
257262
let mut mutations = self.mutations.clone();
258263
mutations.sort_by(|a, b| a.key.cmp(&b.key));
264+
259265
store_stream_for_keys(mutations.into_iter(), pd_client.clone())
266+
.flat_map(|result| match result {
267+
Ok((mutations, store)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
268+
mutations,
269+
TXN_COMMIT_BATCH_SIZE,
270+
))
271+
.map(move |batch| Ok((batch, store.clone())))
272+
.boxed(),
273+
Err(e) => stream::iter(Err(e)).boxed(),
274+
})
275+
.boxed()
260276
}
261277

262278
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
@@ -277,6 +293,16 @@ impl Shardable for kvrpcpb::PrewriteRequest {
277293
}
278294
}
279295

296+
impl Batchable for kvrpcpb::PrewriteRequest {
297+
type Item = kvrpcpb::Mutation;
298+
299+
fn item_size(item: &Self::Item) -> u64 {
300+
let mut size = item.get_key().len() as u64;
301+
size += item.get_value().len() as u64;
302+
size
303+
}
304+
}
305+
280306
pub fn new_commit_request(
281307
keys: Vec<Vec<u8>>,
282308
start_version: u64,
@@ -294,7 +320,42 @@ impl KvRequest for kvrpcpb::CommitRequest {
294320
type Response = kvrpcpb::CommitResponse;
295321
}
296322

297-
shardable_keys!(kvrpcpb::CommitRequest);
323+
impl Shardable for kvrpcpb::CommitRequest {
324+
type Shard = Vec<Vec<u8>>;
325+
326+
fn shards(
327+
&self,
328+
pd_client: &Arc<impl PdClient>,
329+
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
330+
let mut keys = self.keys.clone();
331+
keys.sort();
332+
333+
store_stream_for_keys(keys.into_iter(), pd_client.clone())
334+
.flat_map(|result| match result {
335+
Ok((keys, store)) => {
336+
stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE))
337+
.map(move |batch| Ok((batch, store.clone())))
338+
.boxed()
339+
}
340+
Err(e) => stream::iter(Err(e)).boxed(),
341+
})
342+
.boxed()
343+
}
344+
345+
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
346+
self.set_context(store.region_with_leader.context()?);
347+
self.set_keys(shard.into_iter().map(Into::into).collect());
348+
Ok(())
349+
}
350+
}
351+
352+
impl Batchable for kvrpcpb::CommitRequest {
353+
type Item = Vec<u8>;
354+
355+
fn item_size(item: &Self::Item) -> u64 {
356+
item.len() as u64
357+
}
358+
}
298359

299360
pub fn new_batch_rollback_request(
300361
keys: Vec<Vec<u8>>,

src/transaction/transaction.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,7 @@ const DEFAULT_LOCK_TTL: u64 = 3000;
945945
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(MAX_TTL / 2);
946946
/// TiKV recommends each RPC packet should be less than around 1MB. We keep KV size of
947947
/// each request below 16KB.
948-
const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
948+
pub const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
949949
const TTL_FACTOR: f64 = 6000.0;
950950

951951
/// Optimistic or pessimistic transaction.

tests/integration_tests.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,44 @@ async fn txn_pessimistic() -> Result<()> {
171171
Ok(())
172172
}
173173

174+
#[tokio::test]
175+
#[serial]
176+
async fn txn_split_batch() -> Result<()> {
177+
init().await?;
178+
179+
let client = TransactionClient::new(pd_addrs(), None).await?;
180+
let mut txn = client.begin_optimistic().await?;
181+
let mut rng = thread_rng();
182+
183+
// testing with raft-entry-max-size = "1MB"
184+
let keys_count: usize = 1000;
185+
let val_len = 15000;
186+
187+
let values: Vec<_> = (0..keys_count)
188+
.map(|_| (0..val_len).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())
189+
.collect();
190+
191+
for (i, value) in values.iter().enumerate() {
192+
let key = Key::from(i.to_be_bytes().to_vec());
193+
txn.put(key, value.clone()).await?;
194+
}
195+
196+
txn.commit().await?;
197+
198+
let mut snapshot = client.snapshot(
199+
client.current_timestamp().await?,
200+
TransactionOptions::new_optimistic(),
201+
);
202+
203+
for (i, value) in values.iter().enumerate() {
204+
let key = Key::from(i.to_be_bytes().to_vec());
205+
let from_snapshot = snapshot.get(key).await?.unwrap();
206+
assert_eq!(from_snapshot, value.clone());
207+
}
208+
209+
Ok(())
210+
}
211+
174212
/// bank transfer mainly tests raw put and get
175213
#[tokio::test]
176214
#[serial]

0 commit comments

Comments
 (0)