Skip to content

Commit 477ac94

Browse files
committed
Improve readability of utils
Also removes two tests which afaict are testing the std lib, not this crate Signed-off-by: Nick Cameron <nrc@ncameron.org>
1 parent 89aee91 commit 477ac94

File tree

4 files changed

+19
-81
lines changed

4 files changed

+19
-81
lines changed

src/rpc/client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use crate::{
2424
pd::{PdClient, PdTimestamp, Region, RegionId, RegionVerId, Store, StoreId},
2525
security::SecurityManager,
2626
tikv::KvClient,
27-
util::HandyRwLock,
2827
},
2928
Config, Error, Key, KvPair, Result, Value,
3029
};
@@ -45,7 +44,7 @@ impl RpcClientInner {
4544
let env = Arc::new(
4645
EnvBuilder::new()
4746
.cq_count(CQ_COUNT)
48-
.name_prefix(thd_name!(CLIENT_PREFIX))
47+
.name_prefix(thread_name!(CLIENT_PREFIX))
4948
.build(),
5049
);
5150
let security_mgr = Arc::new(
@@ -112,7 +111,7 @@ impl RpcClientInner {
112111
}
113112

114113
fn kv_client(&self, context: RegionContext) -> Result<(RegionContext, Arc<KvClient>)> {
115-
if let Some(conn) = self.tikv.rl().get(context.address()) {
114+
if let Some(conn) = self.tikv.read().unwrap().get(context.address()) {
116115
return Ok((context, Arc::clone(conn)));
117116
};
118117
info!("connect to tikv endpoint: {:?}", context.address());
@@ -125,7 +124,8 @@ impl RpcClientInner {
125124
)
126125
.map(Arc::new)
127126
.map(|c| {
128-
tikv.wl()
127+
tikv.write()
128+
.unwrap()
129129
.insert(context.address().to_owned(), Arc::clone(&c));
130130
(context, c)
131131
})

src/rpc/pd/client.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use crate::{
2121
PdTimestamp, Region, RegionId, Store, StoreId,
2222
},
2323
security::SecurityManager,
24-
util::HandyRwLock,
2524
},
2625
Error, ErrorKind, Result,
2726
};
@@ -64,7 +63,7 @@ impl PdClient {
6463
timeout: Duration,
6564
) -> Result<PdClient> {
6665
let leader = LeaderClient::connect(env, endpoints, security_mgr, timeout)?;
67-
let cluster_id = leader.rl().cluster_id();
66+
let cluster_id = leader.read().unwrap().cluster_id();
6867

6968
Ok(PdClient {
7069
cluster_id,
@@ -74,7 +73,7 @@ impl PdClient {
7473
}
7574

7675
fn get_leader(&self) -> pdpb::Member {
77-
self.leader.rl().members.get_leader().clone()
76+
self.leader.read().unwrap().members.get_leader().clone()
7877
}
7978

8079
fn get_region_and_leader(
@@ -148,7 +147,7 @@ impl PdClient {
148147
let mut executor = context.executor();
149148
let wrapper = move |cli: &RwLock<LeaderClient>| {
150149
let option = CallOption::default().timeout(timeout);
151-
let cli = &cli.rl().client;
150+
let cli = &cli.read().unwrap().client;
152151
executor(cli, option).unwrap().map(|r| match r {
153152
Err(e) => Err(ErrorKind::Grpc(e))?,
154153
Ok(r) => {
@@ -209,7 +208,7 @@ impl PdClient {
209208
}
210209

211210
pub fn get_ts(&self) -> impl Future<Output = Result<PdTimestamp>> {
212-
self.leader.wl().get_ts()
211+
self.leader.write().unwrap().get_ts()
213212
}
214213
}
215214

src/rpc/pd/leader.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use crate::{
3131
PdTimestamp,
3232
},
3333
security::SecurityManager,
34-
util::HandyRwLock,
3534
},
3635
Error, Result,
3736
};
@@ -131,10 +130,10 @@ impl PdReactor {
131130

132131
fn init(client: &Arc<RwLock<LeaderClient>>, handle: &OtherHandle) {
133132
let client = Arc::clone(client);
134-
let (tx, rx) = client.wl().client.tso().unwrap();
133+
let (tx, rx) = client.write().unwrap().client.tso().unwrap();
135134
let tx = Compat01As03Sink::new(tx);
136135
let rx = Compat01As03::new(rx);
137-
let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream
136+
let tso_rx = client.write().unwrap().reactor.tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream
138137

139138
handle.spawn(
140139
tx.sink_map_err(Into::into)
@@ -158,7 +157,7 @@ impl PdReactor {
158157

159158
handle.spawn(
160159
rx.try_for_each(move |resp| {
161-
let mut client = client.wl();
160+
let mut client = client.write().unwrap();
162161
let reactor = &mut client.reactor;
163162
let tso_pending = reactor.tso_pending.take().unwrap();
164163
reactor.schedule(PdTask::Response(tso_pending, resp));
@@ -174,7 +173,7 @@ impl PdReactor {
174173
}
175174

176175
fn tso_request(client: &Arc<RwLock<LeaderClient>>) {
177-
let mut client = client.wl();
176+
let mut client = client.write().unwrap();
178177
let cluster_id = client.cluster_id;
179178
let reactor = &mut client.reactor;
180179
let mut tso_batch = reactor.tso_buffer.take().unwrap();
@@ -203,7 +202,7 @@ impl PdReactor {
203202
})
204203
.unwrap();
205204
}
206-
client.wl().reactor.tso_buffer = Some(requests);
205+
client.write().unwrap().reactor.tso_buffer = Some(requests);
207206
}
208207

209208
fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &OtherHandle) {
@@ -264,7 +263,7 @@ impl LeaderClient {
264263
timeout,
265264
}));
266265

267-
client.wl().reactor.start(Arc::clone(&client));
266+
client.write().unwrap().reactor.start(Arc::clone(&client));
268267
Ok(client)
269268
}
270269

@@ -276,7 +275,7 @@ impl LeaderClient {
276275
pub fn reconnect(leader: &Arc<RwLock<LeaderClient>>, interval: u64) -> Result<()> {
277276
warn!("updating pd client, blocking the tokio core");
278277
let ((client, members), start) = {
279-
let leader = leader.rl();
278+
let leader = leader.read().unwrap();
280279
if leader.last_update.elapsed() < Duration::from_secs(interval) {
281280
// Avoid unnecessary updating.
282281
return Ok(());
@@ -292,7 +291,7 @@ impl LeaderClient {
292291

293292
{
294293
let leader_clone = Arc::clone(leader);
295-
let mut leader = leader.wl();
294+
let mut leader = leader.write().unwrap();
296295
leader.client = client;
297296
leader.members = members;
298297
leader.last_update = Instant::now();

src/rpc/util.rs

Lines changed: 3 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{
4-
sync::{mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard},
5-
thread,
6-
time::Duration,
7-
};
3+
use std::{sync::mpsc, thread, time::Duration};
84

95
use lazy_static::*;
106
use tokio_timer::{self, timer::Handle};
@@ -22,31 +18,14 @@ macro_rules! internal_err {
2218
}
2319

2420
/// make a thread name with additional tag inheriting from current thread.
25-
macro_rules! thd_name {
21+
macro_rules! thread_name {
2622
($name:expr) => {{
2723
$crate::rpc::util::get_tag_from_thread_name()
2824
.map(|tag| format!("{}::{}", $name, tag))
2925
.unwrap_or_else(|| $name.to_owned())
3026
}};
3127
}
3228

33-
/// A handy shortcut to replace `RwLock` write/read().unwrap() pattern to
34-
/// shortcut wl and rl.
35-
pub trait HandyRwLock<T> {
36-
fn wl(&self) -> RwLockWriteGuard<T>;
37-
fn rl(&self) -> RwLockReadGuard<T>;
38-
}
39-
40-
impl<T> HandyRwLock<T> for RwLock<T> {
41-
fn wl(&self) -> RwLockWriteGuard<T> {
42-
self.write().unwrap()
43-
}
44-
45-
fn rl(&self) -> RwLockReadGuard<T> {
46-
self.read().unwrap()
47-
}
48-
}
49-
5029
pub fn get_tag_from_thread_name() -> Option<String> {
5130
thread::current()
5231
.name()
@@ -69,7 +48,7 @@ lazy_static! {
6948
fn start_global_timer() -> Handle {
7049
let (tx, rx) = mpsc::channel();
7150
thread::Builder::new()
72-
.name(thd_name!("timer"))
51+
.name(thread_name!("timer"))
7352
.spawn(move || {
7453
let mut timer = tokio_timer::Timer::default();
7554
tx.send(timer.handle()).unwrap();
@@ -83,46 +62,7 @@ fn start_global_timer() -> Handle {
8362

8463
#[cfg(test)]
8564
mod tests {
86-
use super::*;
8765
use futures::compat::Compat01As03;
88-
use std::*;
89-
90-
#[test]
91-
fn test_rwlock_deadlock() {
92-
// If the test runs over 60s, then there is a deadlock.
93-
let mu = RwLock::new(Some(1));
94-
{
95-
let _clone = foo(&mu.rl());
96-
let mut data = mu.wl();
97-
assert!(data.is_some());
98-
*data = None;
99-
}
100-
101-
{
102-
match foo(&mu.rl()) {
103-
Some(_) | None => {
104-
let res = mu.try_write();
105-
assert!(res.is_err());
106-
}
107-
}
108-
}
109-
110-
#[cfg_attr(feature = "cargo-clippy", allow(clippy::clone_on_copy))]
111-
fn foo(a: &Option<usize>) -> Option<usize> {
112-
a.clone()
113-
}
114-
}
115-
116-
#[test]
117-
fn test_internal_error() {
118-
let file_name = file!();
119-
let line_number = line!();
120-
let e = internal_err!("{}", "hi");
121-
assert_eq!(
122-
format!("{}", e),
123-
format!("[{}:{}]: hi", file_name, line_number + 1)
124-
);
125-
}
12666

12767
#[test]
12868
fn test_global_timer() {

0 commit comments

Comments
 (0)