Skip to content

Commit e6f2251

Browse files
authored
Merge pull request #57 from nrc/minor
Remove some code
2 parents 80dbb52 + 8fbbfbf commit e6f2251

File tree

5 files changed

+32
-117
lines changed

5 files changed

+32
-117
lines changed

src/rpc/client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::{
2323
pd::{PdClient, PdTimestamp, Region, RegionId, RegionVerId, Store, StoreId},
2424
security::SecurityManager,
2525
tikv::KvClient,
26-
util::HandyRwLock,
2726
},
2827
Config, Error, Key, KvPair, Result, Value,
2928
};
@@ -44,7 +43,7 @@ impl RpcClientInner {
4443
let env = Arc::new(
4544
EnvBuilder::new()
4645
.cq_count(CQ_COUNT)
47-
.name_prefix(thd_name!(CLIENT_PREFIX))
46+
.name_prefix(thread_name!(CLIENT_PREFIX))
4847
.build(),
4948
);
5049
let security_mgr = Arc::new(
@@ -111,7 +110,7 @@ impl RpcClientInner {
111110
}
112111

113112
fn kv_client(&self, context: RegionContext) -> Result<(RegionContext, Arc<KvClient>)> {
114-
if let Some(conn) = self.tikv.rl().get(context.address()) {
113+
if let Some(conn) = self.tikv.read().unwrap().get(context.address()) {
115114
return Ok((context, Arc::clone(conn)));
116115
};
117116
info!("connect to tikv endpoint: {:?}", context.address());
@@ -124,7 +123,8 @@ impl RpcClientInner {
124123
)
125124
.map(Arc::new)
126125
.map(|c| {
127-
tikv.wl()
126+
tikv.write()
127+
.unwrap()
128128
.insert(context.address().to_owned(), Arc::clone(&c));
129129
(context, c)
130130
})

src/rpc/pd/client.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@ use kvproto::{metapb, pdpb, pdpb::PdClient as RpcClient};
1414

1515
use crate::{
1616
rpc::{
17+
context::RequestContext,
1718
pd::{
18-
context::{request_context, PdRequestContext},
19-
leader::LeaderClient,
20-
request::Request,
21-
PdTimestamp, Region, RegionId, Store, StoreId,
19+
context::request_context, leader::LeaderClient, request::Request, PdTimestamp, Region,
20+
RegionId, Store, StoreId,
2221
},
2322
security::SecurityManager,
24-
util::HandyRwLock,
2523
},
2624
Error, ErrorKind, Result,
2725
};
@@ -64,7 +62,7 @@ impl PdClient {
6462
timeout: Duration,
6563
) -> Result<PdClient> {
6664
let leader = LeaderClient::connect(env, endpoints, security_mgr, timeout)?;
67-
let cluster_id = leader.rl().cluster_id();
65+
let cluster_id = leader.read().unwrap().cluster_id();
6866

6967
Ok(PdClient {
7068
cluster_id,
@@ -74,7 +72,7 @@ impl PdClient {
7472
}
7573

7674
fn get_leader(&self) -> pdpb::Member {
77-
self.leader.rl().members.get_leader().clone()
75+
self.leader.read().unwrap().members.get_leader().clone()
7876
}
7977

8078
fn get_region_and_leader(
@@ -137,7 +135,7 @@ impl PdClient {
137135

138136
fn execute<Executor, Resp, RpcFuture>(
139137
&self,
140-
mut context: PdRequestContext<Executor>,
138+
mut context: RequestContext<Executor>,
141139
) -> impl Future<Output = Result<Resp>>
142140
where
143141
Executor: FnMut(&RpcClient, CallOption) -> ::grpcio::Result<RpcFuture> + Send + 'static,
@@ -148,7 +146,7 @@ impl PdClient {
148146
let mut executor = context.executor();
149147
let wrapper = move |cli: &RwLock<LeaderClient>| {
150148
let option = CallOption::default().timeout(timeout);
151-
let cli = &cli.rl().client;
149+
let cli = &cli.read().unwrap().client;
152150
executor(cli, option).unwrap().map(|r| match r {
153151
Err(e) => Err(ErrorKind::Grpc(e))?,
154152
Ok(r) => {
@@ -209,7 +207,7 @@ impl PdClient {
209207
}
210208

211209
pub fn get_ts(&self) -> impl Future<Output = Result<PdTimestamp>> {
212-
self.leader.wl().get_ts()
210+
self.leader.write().unwrap().get_ts()
213211
}
214212
}
215213

src/rpc/pd/context.rs

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,22 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::ops::{Deref, DerefMut};
4-
53
use lazy_static::*;
64
use prometheus::*;
75

86
use crate::rpc::context::RequestContext;
97

10-
pub struct PdRequestContext<Executor> {
11-
target: RequestContext<Executor>,
12-
}
13-
14-
impl<Executor> Deref for PdRequestContext<Executor> {
15-
type Target = RequestContext<Executor>;
16-
17-
fn deref(&self) -> &Self::Target {
18-
&self.target
19-
}
20-
}
21-
22-
impl<Executor> DerefMut for PdRequestContext<Executor> {
23-
fn deref_mut(&mut self) -> &mut Self::Target {
24-
&mut self.target
25-
}
26-
}
27-
288
pub fn request_context<Executor>(
299
cmd: &'static str,
3010
executor: Executor,
31-
) -> PdRequestContext<Executor> {
32-
PdRequestContext {
33-
target: RequestContext::new(
34-
cmd,
35-
&PD_REQUEST_DURATION_HISTOGRAM_VEC,
36-
&PD_REQUEST_COUNTER_VEC,
37-
&PD_FAILED_REQUEST_DURATION_HISTOGRAM_VEC,
38-
&PD_FAILED_REQUEST_COUNTER_VEC,
39-
executor,
40-
),
41-
}
11+
) -> RequestContext<Executor> {
12+
RequestContext::new(
13+
cmd,
14+
&PD_REQUEST_DURATION_HISTOGRAM_VEC,
15+
&PD_REQUEST_COUNTER_VEC,
16+
&PD_FAILED_REQUEST_DURATION_HISTOGRAM_VEC,
17+
&PD_FAILED_REQUEST_COUNTER_VEC,
18+
executor,
19+
)
4220
}
4321

4422
pub fn observe_tso_batch(batch_size: usize) -> u32 {

src/rpc/pd/leader.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use crate::{
3131
PdTimestamp,
3232
},
3333
security::SecurityManager,
34-
util::HandyRwLock,
3534
},
3635
Error, Result,
3736
};
@@ -130,10 +129,10 @@ impl PdReactor {
130129

131130
fn init(client: &Arc<RwLock<LeaderClient>>, handle: &OtherHandle) {
132131
let client = Arc::clone(client);
133-
let (tx, rx) = client.wl().client.tso().unwrap();
132+
let (tx, rx) = client.write().unwrap().client.tso().unwrap();
134133
let tx = Compat01As03Sink::new(tx);
135134
let rx = Compat01As03::new(rx);
136-
let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream
135+
let tso_rx = client.write().unwrap().reactor.tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream
137136

138137
handle.spawn(
139138
tx.sink_map_err(Into::into)
@@ -157,7 +156,7 @@ impl PdReactor {
157156

158157
handle.spawn(
159158
rx.try_for_each(move |resp| {
160-
let mut client = client.wl();
159+
let mut client = client.write().unwrap();
161160
let reactor = &mut client.reactor;
162161
let tso_pending = reactor.tso_pending.take().unwrap();
163162
reactor.schedule(PdTask::Response(tso_pending, resp));
@@ -173,7 +172,7 @@ impl PdReactor {
173172
}
174173

175174
fn tso_request(client: &Arc<RwLock<LeaderClient>>) {
176-
let mut client = client.wl();
175+
let mut client = client.write().unwrap();
177176
let cluster_id = client.cluster_id;
178177
let reactor = &mut client.reactor;
179178
let mut tso_batch = reactor.tso_buffer.take().unwrap();
@@ -202,7 +201,7 @@ impl PdReactor {
202201
})
203202
.unwrap();
204203
}
205-
client.wl().reactor.tso_buffer = Some(requests);
204+
client.write().unwrap().reactor.tso_buffer = Some(requests);
206205
}
207206

208207
fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &OtherHandle) {
@@ -263,7 +262,7 @@ impl LeaderClient {
263262
timeout,
264263
}));
265264

266-
client.wl().reactor.start(Arc::clone(&client));
265+
client.write().unwrap().reactor.start(Arc::clone(&client));
267266
Ok(client)
268267
}
269268

@@ -275,7 +274,7 @@ impl LeaderClient {
275274
pub fn reconnect(leader: &Arc<RwLock<LeaderClient>>, interval: u64) -> Result<()> {
276275
warn!("updating pd client, blocking the tokio core");
277276
let ((client, members), start) = {
278-
let leader = leader.rl();
277+
let leader = leader.read().unwrap();
279278
if leader.last_update.elapsed() < Duration::from_secs(interval) {
280279
// Avoid unnecessary updating.
281280
return Ok(());
@@ -291,7 +290,7 @@ impl LeaderClient {
291290

292291
{
293292
let leader_clone = Arc::clone(leader);
294-
let mut leader = leader.wl();
293+
let mut leader = leader.write().unwrap();
295294
leader.client = client;
296295
leader.members = members;
297296
leader.last_update = Instant::now();

src/rpc/util.rs

Lines changed: 3 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{
4-
sync::{mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard},
5-
thread,
6-
time::Duration,
7-
};
3+
use std::{sync::mpsc, thread, time::Duration};
84

95
use lazy_static::*;
106
use tokio_timer::{self, timer::Handle};
@@ -22,31 +18,14 @@ macro_rules! internal_err {
2218
}
2319

2420
/// make a thread name with additional tag inheriting from current thread.
25-
macro_rules! thd_name {
21+
macro_rules! thread_name {
2622
($name:expr) => {{
2723
$crate::rpc::util::get_tag_from_thread_name()
2824
.map(|tag| format!("{}::{}", $name, tag))
2925
.unwrap_or_else(|| $name.to_owned())
3026
}};
3127
}
3228

33-
/// A handy shortcut to replace `RwLock` write/read().unwrap() pattern to
34-
/// shortcut wl and rl.
35-
pub trait HandyRwLock<T> {
36-
fn wl(&self) -> RwLockWriteGuard<T>;
37-
fn rl(&self) -> RwLockReadGuard<T>;
38-
}
39-
40-
impl<T> HandyRwLock<T> for RwLock<T> {
41-
fn wl(&self) -> RwLockWriteGuard<T> {
42-
self.write().unwrap()
43-
}
44-
45-
fn rl(&self) -> RwLockReadGuard<T> {
46-
self.read().unwrap()
47-
}
48-
}
49-
5029
pub fn get_tag_from_thread_name() -> Option<String> {
5130
thread::current()
5231
.name()
@@ -69,7 +48,7 @@ lazy_static! {
6948
fn start_global_timer() -> Handle {
7049
let (tx, rx) = mpsc::channel();
7150
thread::Builder::new()
72-
.name(thd_name!("timer"))
51+
.name(thread_name!("timer"))
7352
.spawn(move || {
7453
let mut timer = tokio_timer::Timer::default();
7554
tx.send(timer.handle()).unwrap();
@@ -83,46 +62,7 @@ fn start_global_timer() -> Handle {
8362

8463
#[cfg(test)]
8564
mod tests {
86-
use super::*;
8765
use futures::compat::Compat01As03;
88-
use std::*;
89-
90-
#[test]
91-
fn test_rwlock_deadlock() {
92-
// If the test runs over 60s, then there is a deadlock.
93-
let mu = RwLock::new(Some(1));
94-
{
95-
let _clone = foo(&mu.rl());
96-
let mut data = mu.wl();
97-
assert!(data.is_some());
98-
*data = None;
99-
}
100-
101-
{
102-
match foo(&mu.rl()) {
103-
Some(_) | None => {
104-
let res = mu.try_write();
105-
assert!(res.is_err());
106-
}
107-
}
108-
}
109-
110-
#[cfg_attr(feature = "cargo-clippy", allow(clippy::clone_on_copy))]
111-
fn foo(a: &Option<usize>) -> Option<usize> {
112-
a.clone()
113-
}
114-
}
115-
116-
#[test]
117-
fn test_internal_error() {
118-
let file_name = file!();
119-
let line_number = line!();
120-
let e = internal_err!("{}", "hi");
121-
assert_eq!(
122-
format!("{}", e),
123-
format!("[{}:{}]: hi", file_name, line_number + 1)
124-
);
125-
}
12666

12767
#[test]
12868
fn test_global_timer() {

0 commit comments

Comments
 (0)