Skip to content

Commit b12f95e

Browse files
authored
pd: fix get_timestamp potentially hang (#432)
* fix get tso hang Signed-off-by: Ping Yu <yuping@pingcap.com> * wake Signed-off-by: Ping Yu <yuping@pingcap.com> * polish Signed-off-by: Ping Yu <yuping@pingcap.com> --------- Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent 802b361 commit b12f95e

File tree

2 files changed

+40
-25
lines changed

2 files changed

+40
-25
lines changed

src/pd/retry.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,12 @@ impl<Cl> RetryClient<Cl> {
7070
}
7171
}
7272

73-
macro_rules! retry {
74-
($self: ident, $tag: literal, |$cluster: ident| $call: expr) => {{
73+
macro_rules! retry_core {
74+
($self: ident, $tag: literal, $call: expr) => {{
7575
let stats = pd_stats($tag);
7676
let mut last_err = Ok(());
7777
for _ in 0..LEADER_CHANGE_RETRY {
78-
// use the block here to drop the guard of the read lock,
79-
// otherwise `reconnect` will try to acquire the write lock and results in a deadlock
80-
let res = {
81-
let $cluster = &mut $self.cluster.write().await.0;
82-
let res = $call.await;
83-
res
84-
};
78+
let res = $call;
8579

8680
match stats.done(res) {
8781
Ok(r) => return Ok(r),
@@ -103,6 +97,28 @@ macro_rules! retry {
10397
}};
10498
}
10599

100+
macro_rules! retry_mut {
101+
($self: ident, $tag: literal, |$cluster: ident| $call: expr) => {{
102+
retry_core!($self, $tag, {
103+
// use the block here to drop the guard of the lock,
104+
// otherwise `reconnect` will try to acquire the write lock and results in a deadlock
105+
let $cluster = &mut $self.cluster.write().await.0;
106+
$call.await
107+
})
108+
}};
109+
}
110+
111+
macro_rules! retry {
112+
($self: ident, $tag: literal, |$cluster: ident| $call: expr) => {{
113+
retry_core!($self, $tag, {
114+
// use the block here to drop the guard of the lock,
115+
// otherwise `reconnect` will try to acquire the write lock and results in a deadlock
116+
let $cluster = &$self.cluster.read().await.0;
117+
$call.await
118+
})
119+
}};
120+
}
121+
106122
impl RetryClient<Cluster> {
107123
pub async fn connect(
108124
endpoints: &[String],
@@ -127,7 +143,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
127143
// These get_* functions will try multiple times to make a request, reconnecting as necessary.
128144
// It does not know about encoding. Caller should take care of it.
129145
async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
130-
retry!(self, "get_region", |cluster| {
146+
retry_mut!(self, "get_region", |cluster| {
131147
let key = key.clone();
132148
async {
133149
cluster
@@ -141,7 +157,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
141157
}
142158

143159
async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
144-
retry!(self, "get_region_by_id", |cluster| async {
160+
retry_mut!(self, "get_region_by_id", |cluster| async {
145161
cluster
146162
.get_region_by_id(region_id, self.timeout)
147163
.await
@@ -152,7 +168,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
152168
}
153169

154170
async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
155-
retry!(self, "get_store", |cluster| async {
171+
retry_mut!(self, "get_store", |cluster| async {
156172
cluster
157173
.get_store(id, self.timeout)
158174
.await
@@ -161,7 +177,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
161177
}
162178

163179
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
164-
retry!(self, "get_all_stores", |cluster| async {
180+
retry_mut!(self, "get_all_stores", |cluster| async {
165181
cluster
166182
.get_all_stores(self.timeout)
167183
.await
@@ -174,7 +190,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
174190
}
175191

176192
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
177-
retry!(self, "update_gc_safepoint", |cluster| async {
193+
retry_mut!(self, "update_gc_safepoint", |cluster| async {
178194
cluster
179195
.update_safepoint(safepoint, self.timeout)
180196
.await
@@ -257,7 +273,7 @@ mod test {
257273
}
258274

259275
async fn retry_err(client: Arc<MockClient>) -> Result<()> {
260-
retry!(client, "test", |_c| ready(Err(internal_err!("whoops"))))
276+
retry_mut!(client, "test", |_c| ready(Err(internal_err!("whoops"))))
261277
}
262278

263279
async fn retry_ok(client: Arc<MockClient>) -> Result<()> {
@@ -310,7 +326,7 @@ mod test {
310326
client: Arc<MockClient>,
311327
max_retries: Arc<AtomicUsize>,
312328
) -> Result<()> {
313-
retry!(client, "test", |c| {
329+
retry_mut!(client, "test", |c| {
314330
c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
315331

316332
let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1;

src/pd/timestamp.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,13 @@ async fn run_tso(
9898
let mut responses = pd_client.tso(request_stream).await?.into_inner();
9999

100100
while let Some(Ok(resp)) = responses.next().await {
101-
let mut pending_requests = pending_requests.lock().await;
102-
103-
// Wake up the sending future blocked by too many pending requests as we are consuming
104-
// some of them here.
105-
if pending_requests.len() == MAX_PENDING_COUNT {
106-
sending_future_waker.wake();
101+
{
102+
let mut pending_requests = pending_requests.lock().await;
103+
allocate_timestamps(&resp, &mut pending_requests)?;
107104
}
108105

109-
allocate_timestamps(&resp, &mut pending_requests)?;
106+
// Wake up the sending future blocked by too many pending requests or locked.
107+
sending_future_waker.wake();
110108
}
111109
// TODO: distinguish between unexpected stream termination and expected end of test
112110
info!("TSO stream terminated");
@@ -139,6 +137,7 @@ impl Stream for TsoRequestStream {
139137
{
140138
pending_requests
141139
} else {
140+
this.self_waker.register(cx.waker());
142141
return Poll::Pending;
143142
};
144143
let mut requests = Vec::new();
@@ -148,8 +147,8 @@ impl Stream for TsoRequestStream {
148147
Poll::Ready(Some(sender)) => {
149148
requests.push(sender);
150149
}
151-
Poll::Ready(None) => return Poll::Ready(None),
152-
Poll::Pending => break,
150+
Poll::Ready(None) if requests.is_empty() => return Poll::Ready(None),
151+
_ => break,
153152
}
154153
}
155154

0 commit comments

Comments
 (0)