Skip to content

Introduce KeySpace interface for API v2. #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
1f314ee
fix type checking for new interface
iosmanthus Jun 21, 2022
aa6e4b5
cargo fmt
iosmanthus Jun 21, 2022
222908e
git fire: changing sig for PdRpcClient
iosmanthus Jun 22, 2022
605aed8
fix TxnApiV1 test
iosmanthus Jun 22, 2022
9164cf0
fix all tests for API v1
iosmanthus Jun 23, 2022
7668c37
fix integration tests for API v1
iosmanthus Jun 23, 2022
dd1c7b9
remove specification feature
iosmanthus Jun 23, 2022
60585f6
resolve conflict from master
iosmanthus Jun 24, 2022
3a0df00
reduce type annotation
iosmanthus Jun 24, 2022
7c3de1a
recover impl Trait for resolve_locks*
iosmanthus Jun 24, 2022
02377b7
git fire!: wip api v2
iosmanthus Jun 28, 2022
59002d1
impl KeyspaceCodec, along with RawKeyspaceCodec and TxnKeyspaceCodec
iosmanthus Jul 1, 2022
a440794
fix fmt check
iosmanthus Jul 1, 2022
0a72d31
rename request_codec.rs to codec.rs
iosmanthus Jul 1, 2022
19c3123
git fire!: wip impl RequestCodec for requests
iosmanthus Jul 1, 2022
624c68e
impl KvRequest for raw requests
iosmanthus Jul 4, 2022
71af50d
cargo fmt
iosmanthus Jul 5, 2022
a5f326f
impl KvRequest for txn requests
iosmanthus Jul 5, 2022
5efbc12
make check
iosmanthus Jul 5, 2022
b534cfd
impl Default for KeyspaceId
iosmanthus Jul 5, 2022
434e10d
cargo fmt
iosmanthus Jul 6, 2022
317116b
decode in place
iosmanthus Jul 6, 2022
5c5809a
introduce impl_kv_request to simplify code
iosmanthus Jul 6, 2022
701964d
refactor impl_kv_request macro
iosmanthus Jul 6, 2022
db38d95
fix tests
iosmanthus Jul 6, 2022
3660ca1
move default impl into RequestCodecExt
iosmanthus Jul 7, 2022
7d68247
fix api version context
iosmanthus Jul 7, 2022
81cbddd
move request codec into raw/txn owned module
iosmanthus Jul 8, 2022
5bd0f6e
fix doc test
iosmanthus Jul 8, 2022
1c1ce17
fix wrong TxnCodec mark trait for raw::Keyspace
iosmanthus Jul 9, 2022
051a157
refactor ApiVx with Mode generic para
iosmanthus Jul 11, 2022
159de5c
Merge branch 'master' of github.com:tikv/client-rust into api-v2
iosmanthus Jul 11, 2022
bfd1f28
remove dynamic dispatch while encoding request in Dispatch
iosmanthus Jul 11, 2022
7fc61ea
make IsDefault private in crate
iosmanthus Jul 11, 2022
6baaa94
remove impl Deref for Key
iosmanthus Jul 11, 2022
07c7af7
fix potential index out of range while meets corrupted keyspace prefix
iosmanthus Jul 12, 2022
0ed902d
introduce codec type parameter for Snapshot and Transcation
iosmanthus Jul 12, 2022
182a457
bound Client to right codec
iosmanthus Jul 12, 2022
0e76e10
fix encode range issue with unbound end
iosmanthus Jul 14, 2022
0c3c9e3
Merge branch 'master' of github.com:tikv/client-rust into api-v2
iosmanthus Jul 14, 2022
f46cea8
introduce newer version of proto
iosmanthus Jul 21, 2022
130e1b0
git fire: impl load keyspace
iosmanthus Jul 21, 2022
3819c8a
introduce keyspace mgr from keyspace service
iosmanthus Jul 21, 2022
a2047eb
fix clippy
iosmanthus Jul 22, 2022
45af3c5
refine trait bound of raw::Client
iosmanthus Jul 22, 2022
9de6f50
remove phatom of txn::Client
iosmanthus Jul 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
//! # })}
//! ```

#![feature(specialization)]
#![feature(explicit_generic_args_with_impl_trait)]
#[macro_use]
pub mod request;
#[macro_use]
Expand Down
65 changes: 41 additions & 24 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::{collections::HashMap, marker::PhantomData, sync::Arc, thread};

use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use grpcio::{EnvBuilder, Environment};
use slog::Logger;
use tokio::sync::RwLock;

use tikv_client_pd::Cluster;
use tikv_client_proto::{kvrpcpb, metapb};
use tikv_client_store::{KvClient, KvConnect, TikvConnect};

use crate::{
compat::stream_fn,
kv::codec,
pd::{retry::RetryClientTrait, RetryClient},
region::{RegionId, RegionVerId, RegionWithLeader},
region_cache::RegionCache,
request::request_codec::RequestCodec,
store::RegionStore,
BoundRange, Config, Key, Result, SecurityManager, Timestamp,
};
use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use grpcio::{EnvBuilder, Environment};
use slog::Logger;
use std::{collections::HashMap, sync::Arc, thread};
use tikv_client_pd::Cluster;
use tikv_client_proto::{kvrpcpb, metapb};
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
use tokio::sync::RwLock;

const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";
Expand All @@ -42,6 +46,7 @@ const CLIENT_PREFIX: &str = "tikv-client";
#[async_trait]
pub trait PdClient: Send + Sync + 'static {
type KvClient: KvClient + Send + Sync + 'static;
type RequestCodec: RequestCodec;

/// In transactional API, `region` is decoded (keys in raw format).
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore>;
Expand Down Expand Up @@ -204,22 +209,27 @@ pub trait PdClient: Send + Sync + 'static {
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);

fn get_request_codec(&self) -> Self::RequestCodec;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should return reference to Self::RequestCodec?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the only use of RequestCodec is in the construction of Dispatch, and it requires a move, I think Self::RequestCodec is more direct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Then we can probably rename it to create_request_codec or new_request_codec which means it's actually a factory method. It's not going to be called many times, and move is the right semantic.

}

/// This client converts requests for the logical TiKV cluster into requests
/// for a single TiKV store using PD and internal logic.
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
logger: Logger,
// TODO: change to a real codec.
_phantom: PhantomData<C>,
}

#[async_trait]
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<C, KvC> {
type KvClient = KvC::KvClient;
type RequestCodec = C;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
let store_id = region.get_store_id()?;
Expand Down Expand Up @@ -260,15 +270,19 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}

fn get_request_codec(&self) -> Self::RequestCodec {
todo!()
}
}

impl PdRpcClient<TikvConnect, Cluster> {
impl<C> PdRpcClient<C, TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: Config,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient> {
) -> Result<PdRpcClient<C, TikvConnect, Cluster>> {
PdRpcClient::new(
config.clone(),
|env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout),
Expand All @@ -291,14 +305,14 @@ fn thread_name(prefix: &str) -> String {
.unwrap_or_else(|| prefix.to_owned())
}

impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_codec: bool,
logger: Logger,
) -> Result<PdRpcClient<KvC, Cl>>
) -> Result<PdRpcClient<C, KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
Expand Down Expand Up @@ -329,6 +343,8 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
enable_codec,
region_cache: RegionCache::new(pd),
logger,
// TODO
_phantom: PhantomData,
})
}

Expand All @@ -352,10 +368,11 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {

#[cfg(test)]
pub mod test {
use super::*;
use futures::{executor, executor::block_on};

use crate::mock::*;

use futures::{executor, executor::block_on};
use super::*;

#[tokio::test]
async fn test_kv_client_caching() {
Expand Down Expand Up @@ -396,7 +413,7 @@ pub mod test {
vec![1].into(),
vec![2].into(),
vec![3].into(),
vec![5, 2].into()
vec![5, 2].into(),
]
);
assert_eq!(
Expand Down Expand Up @@ -458,36 +475,36 @@ pub mod test {
vec![
kvrpcpb::KeyRange {
start_key: k1.clone(),
end_key: k2.clone()
end_key: k2.clone(),
},
kvrpcpb::KeyRange {
start_key: k1,
end_key: k_split.clone()
}
end_key: k_split.clone(),
},
]
);
assert_eq!(ranges2.0, 2);
assert_eq!(
ranges2.1,
vec![kvrpcpb::KeyRange {
start_key: k_split.clone(),
end_key: k3
end_key: k3,
}]
);
assert_eq!(ranges3.0, 1);
assert_eq!(
ranges3.1,
vec![kvrpcpb::KeyRange {
start_key: k2,
end_key: k_split.clone()
end_key: k_split.clone(),
}]
);
assert_eq!(ranges4.0, 2);
assert_eq!(
ranges4.1,
vec![kvrpcpb::KeyRange {
start_key: k_split,
end_key: k4
end_key: k4,
}]
);
assert!(stream.next().is_none());
Expand Down
38 changes: 22 additions & 16 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use core::ops::Range;
use std::{str::FromStr, sync::Arc, u32};
use std::{marker::PhantomData, str::FromStr, sync::Arc, u32};

use slog::{Drain, Logger};
use tikv_client_common::Error;
Expand All @@ -12,7 +12,7 @@ use crate::{
config::Config,
pd::{PdClient, PdRpcClient},
raw::lowering::*,
request::{Collect, CollectSingle, Plan},
request::{request_codec::RequestCodec, Collect, CollectSingle, Plan},
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};

Expand All @@ -26,15 +26,16 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
/// awaited to execute.
#[derive(Clone)]
pub struct Client<PdC: PdClient = PdRpcClient> {
pub struct Client<C, PdC: PdClient = PdRpcClient<C>> {
rpc: Arc<PdC>,
cf: Option<ColumnFamily>,
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
atomic: bool,
logger: Logger,
_phantom: std::marker::PhantomData<C>,
}

impl Client<PdRpcClient> {
impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
/// Create a raw [`Client`] and connect to the TiKV cluster.
///
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
Expand Down Expand Up @@ -103,6 +104,7 @@ impl Client<PdRpcClient> {
cf: None,
atomic: false,
logger,
_phantom: PhantomData,
})
}

Expand Down Expand Up @@ -137,6 +139,7 @@ impl Client<PdRpcClient> {
cf: Some(cf),
atomic: self.atomic,
logger: self.logger.clone(),
_phantom: PhantomData,
}
}

Expand All @@ -154,11 +157,12 @@ impl Client<PdRpcClient> {
cf: self.cf.clone(),
atomic: true,
logger: self.logger.clone(),
_phantom: PhantomData,
}
}
}

impl<PdC: PdClient> Client<PdC> {
impl<C: RequestCodec, PdC: PdClient> Client<C, PdC> {
/// Create a new 'get' request.
///
/// Once resolved this request will result in the fetching of the value associated with the
Expand All @@ -179,7 +183,7 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!(self.logger, "invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let request = new_raw_get_request::<C>(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
Expand Down Expand Up @@ -211,7 +215,8 @@ impl<PdC: PdClient> Client<PdC> {
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let request =
new_raw_batch_get_request::<C>(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
Expand Down Expand Up @@ -239,7 +244,8 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let request =
new_raw_put_request::<C>(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
Expand Down Expand Up @@ -271,7 +277,7 @@ impl<PdC: PdClient> Client<PdC> {
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
let request = new_raw_batch_put_request(
let request = new_raw_batch_put_request::<C>(
pairs.into_iter().map(Into::into),
self.cf.clone(),
self.atomic,
Expand Down Expand Up @@ -303,7 +309,7 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
debug!(self.logger, "invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let request = new_raw_delete_request::<C>(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
Expand Down Expand Up @@ -334,7 +340,7 @@ impl<PdC: PdClient> Client<PdC> {
debug!(self.logger, "invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
new_raw_batch_delete_request::<C>(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
Expand Down Expand Up @@ -369,7 +375,7 @@ impl<PdC: PdClient> Client<PdC> {
) -> Result<()> {
debug!(self.logger, "invoking raw delete_range request");
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let request = new_raw_delete_range_request::<C>(range.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(backoff)
.extract_error()
Expand Down Expand Up @@ -520,7 +526,7 @@ impl<PdC: PdClient> Client<PdC> {
) -> Result<(Option<Value>, bool)> {
debug!(self.logger, "invoking raw compare_and_swap request");
self.assert_atomic()?;
let req = new_cas_request(
let req = new_cas_request::<C>(
key.into(),
new_value.into(),
previous_value.into(),
Expand All @@ -543,7 +549,7 @@ impl<PdC: PdClient> Client<PdC> {
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
let copr_version_req = copr_version_req.into();
semver::VersionReq::from_str(&copr_version_req)?;
let req = new_raw_coprocessor_request(
let req = new_raw_coprocessor_request::<C>(
copr_name.into(),
copr_version_req,
ranges.into_iter().map(Into::into),
Expand All @@ -570,7 +576,7 @@ impl<PdC: PdClient> Client<PdC> {
});
}

let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
let request = new_raw_scan_request::<C>(range.into(), limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
Expand All @@ -595,7 +601,7 @@ impl<PdC: PdClient> Client<PdC> {
});
}

let request = new_raw_batch_scan_request(
let request = new_raw_batch_scan_request::<C>(
ranges.into_iter().map(Into::into),
each_limit,
key_only,
Expand Down
Loading