Skip to content

Commit d376d8c

Browse files
authored
fix: MetaGrpcClient deadlock when drop (#16727)
* fix: MetaGrpcClient deadlock when drop Move the reference to the dedicated runtime `rt` from `MetaGrpcClient` to `ClientHandle`. `rt` is a reference to the dedicated runtime for running `MetaGrpcClient`. If all ClientHandle are dropped, the runtime will be destroyed. This `rt` previously is stored in `MetaGrpcClient`, which leads to a deadlock: - When all `ClientHandle` are dropped, the two workers `worker_loop()` and `auto_sync_interval()` will quit. - These two futures both held a reference to `MetaGrpcClient`. - The last of these(say, `F`) two will drop `MetaGrpcClient.rt` and `Runtime::_dropper` will block waiting for the runtime to shut down. - But `F` is still held, deadlock occurs. Other changes: - `Runtime::try_spawn` and several other spawn methods now accept a name argument for display in async backtrace. - Add async-backtrace to `MetaGrpcClient` methods * revert to use the original runtime.block_on() to keep the runtime blocking on a task
1 parent 3036fa8 commit d376d8c

File tree

12 files changed

+328
-198
lines changed

12 files changed

+328
-198
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/base/src/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub use runtime::execute_futures_in_parallel;
4242
pub use runtime::spawn;
4343
pub use runtime::spawn_blocking;
4444
pub use runtime::spawn_local;
45+
pub use runtime::spawn_named;
4546
pub use runtime::try_block_on;
4647
pub use runtime::try_spawn_blocking;
4748
pub use runtime::Dropper;

src/common/base/src/runtime/runtime.rs

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ use databend_common_exception::Result;
2727
use databend_common_exception::ResultExt;
2828
use futures::future;
2929
use futures::FutureExt;
30+
use log::info;
3031
use log::warn;
3132
use tokio::runtime::Builder;
3233
use tokio::runtime::Handle;
3334
use tokio::sync::oneshot;
3435
use tokio::sync::OwnedSemaphorePermit;
3536
use tokio::sync::Semaphore;
3637

37-
// use tokio::task::JoinHandle;
3838
use crate::runtime::catch_unwind::CatchUnwindFuture;
3939
use crate::runtime::drop_guard;
4040
use crate::runtime::memory::MemStat;
@@ -88,7 +88,7 @@ pub trait TrySpawn {
8888
///
8989
/// It allows to return an error before spawning the task.
9090
#[track_caller]
91-
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
91+
fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
9292
where
9393
T: Future + Send + 'static,
9494
T::Output: Send + 'static;
@@ -102,18 +102,18 @@ pub trait TrySpawn {
102102
T: Future + Send + 'static,
103103
T::Output: Send + 'static,
104104
{
105-
self.try_spawn(task).unwrap()
105+
self.try_spawn(task, None).unwrap()
106106
}
107107
}
108108

109109
impl<S: TrySpawn> TrySpawn for Arc<S> {
110110
#[track_caller]
111-
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
111+
fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
112112
where
113113
T: Future + Send + 'static,
114114
T::Output: Send + 'static,
115115
{
116-
self.as_ref().try_spawn(task)
116+
self.as_ref().try_spawn(task, name)
117117
}
118118

119119
#[track_caller]
@@ -149,10 +149,14 @@ impl Runtime {
149149

150150
let handle = runtime.handle().clone();
151151

152+
let n = name.clone();
152153
// Block the runtime to shutdown.
153154
let join_handler = Thread::spawn(move || {
154-
// We ignore channel is closed.
155155
let _ = runtime.block_on(recv_stop);
156+
info!(
157+
"Runtime({:?}) received shutdown signal, start to shut down",
158+
n
159+
);
156160

157161
match !cfg!(debug_assertions) {
158162
true => false,
@@ -257,7 +261,11 @@ impl Runtime {
257261
#[allow(clippy::disallowed_methods)]
258262
tokio::task::block_in_place(|| {
259263
self.handle
260-
.block_on(location_future(future, std::panic::Location::caller()))
264+
.block_on(location_future(
265+
future,
266+
std::panic::Location::caller(),
267+
None,
268+
))
261269
.with_context(|| "failed to block on future".to_string())
262270
.flatten()
263271
})
@@ -348,20 +356,28 @@ impl Runtime {
348356

349357
impl TrySpawn for Runtime {
350358
#[track_caller]
351-
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
359+
fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
352360
where
353361
T: Future + Send + 'static,
354362
T::Output: Send + 'static,
355363
{
356364
let task = ThreadTracker::tracking_future(task);
357-
let task = match ThreadTracker::query_id() {
358-
None => async_backtrace::location!(String::from(GLOBAL_TASK_DESC)).frame(task),
359-
Some(query_id) => {
360-
async_backtrace::location!(format!("Running query {} spawn task", query_id))
361-
.frame(task)
365+
366+
let location_name = {
367+
if let Some(name) = name {
368+
name
369+
} else {
370+
match ThreadTracker::query_id() {
371+
None => String::from(GLOBAL_TASK_DESC),
372+
Some(query_id) => {
373+
format!("Running query {} spawn task", query_id)
374+
}
375+
}
362376
}
363377
};
364378

379+
let task = async_backtrace::location!(location_name).frame(task);
380+
365381
#[expect(clippy::disallowed_methods)]
366382
Ok(JoinHandle::create(self.handle.spawn(task)))
367383
}
@@ -380,6 +396,7 @@ impl Drop for Dropper {
380396
// Send a signal to say i am dropping.
381397
if let Some(close_sender) = self.close.take() {
382398
if close_sender.send(()).is_ok() {
399+
info!("close_sender to shutdown Runtime is sent");
383400
match self.join_handler.take().unwrap().join() {
384401
Err(e) => warn!("Runtime dropper panic, {:?}", e),
385402
Ok(true) => {
@@ -436,7 +453,25 @@ where
436453
F::Output: Send + 'static,
437454
{
438455
#[expect(clippy::disallowed_methods)]
439-
tokio::spawn(location_future(future, std::panic::Location::caller()))
456+
tokio::spawn(location_future(
457+
future,
458+
std::panic::Location::caller(),
459+
None,
460+
))
461+
}
462+
463+
#[track_caller]
464+
pub fn spawn_named<F>(future: F, name: String) -> tokio::task::JoinHandle<F::Output>
465+
where
466+
F: Future + Send + 'static,
467+
F::Output: Send + 'static,
468+
{
469+
#[expect(clippy::disallowed_methods)]
470+
tokio::spawn(location_future(
471+
future,
472+
std::panic::Location::caller(),
473+
Some(name),
474+
))
440475
}
441476

442477
#[track_caller]
@@ -446,7 +481,11 @@ where
446481
F::Output: Send + 'static,
447482
{
448483
#[expect(clippy::disallowed_methods)]
449-
tokio::task::spawn_local(location_future(future, std::panic::Location::caller()))
484+
tokio::task::spawn_local(location_future(
485+
future,
486+
std::panic::Location::caller(),
487+
None,
488+
))
450489
}
451490

452491
#[track_caller]
@@ -476,8 +515,11 @@ where
476515
pub fn block_on<F: Future>(future: F) -> F::Output {
477516
#[expect(clippy::disallowed_methods)]
478517
tokio::task::block_in_place(|| {
479-
tokio::runtime::Handle::current()
480-
.block_on(location_future(future, std::panic::Location::caller()))
518+
tokio::runtime::Handle::current().block_on(location_future(
519+
future,
520+
std::panic::Location::caller(),
521+
None,
522+
))
481523
})
482524
}
483525

@@ -487,14 +529,19 @@ pub fn try_block_on<F: Future>(future: F) -> std::result::Result<F::Output, F> {
487529
Err(_) => Err(future),
488530
#[expect(clippy::disallowed_methods)]
489531
Ok(handler) => Ok(tokio::task::block_in_place(|| {
490-
handler.block_on(location_future(future, std::panic::Location::caller()))
532+
handler.block_on(location_future(
533+
future,
534+
std::panic::Location::caller(),
535+
None,
536+
))
491537
})),
492538
}
493539
}
494540

495541
fn location_future<F>(
496542
future: F,
497543
frame_location: &'static Location,
544+
frame_name: Option<String>,
498545
) -> impl Future<Output = F::Output>
499546
where
500547
F: Future,
@@ -506,9 +553,13 @@ where
506553
// TODO: tracking payload
507554
let future = ThreadTracker::tracking_future(future);
508555

509-
let frame_name = std::any::type_name::<F>()
510-
.trim_end_matches("::{{closure}}")
511-
.to_string();
556+
let frame_name = if let Some(n) = frame_name {
557+
n
558+
} else {
559+
std::any::type_name::<F>()
560+
.trim_end_matches("::{{closure}}")
561+
.to_string()
562+
};
512563

513564
async_backtrace::location!(
514565
frame_name,

src/meta/client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ test = true
1515

1616
[dependencies]
1717
anyerror = { workspace = true }
18+
async-backtrace = { workspace = true }
1819
databend-common-arrow = { workspace = true }
1920
databend-common-base = { workspace = true }
2021
databend-common-grpc = { workspace = true }

src/meta/client/src/established_client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,15 @@ impl EstablishedClient {
147147
self.error.lock().take()
148148
}
149149

150+
#[async_backtrace::framed]
150151
pub async fn kv_api(
151152
&mut self,
152153
request: impl tonic::IntoRequest<RaftRequest>,
153154
) -> Result<Response<RaftReply>, Status> {
154155
self.client.kv_api(request).await.update_client(self)
155156
}
156157

158+
#[async_backtrace::framed]
157159
pub async fn kv_read_v1(
158160
&mut self,
159161
request: impl tonic::IntoRequest<RaftRequest>,
@@ -162,41 +164,47 @@ impl EstablishedClient {
162164
resp.update_client(self)
163165
}
164166

167+
#[async_backtrace::framed]
165168
pub async fn export(
166169
&mut self,
167170
request: impl tonic::IntoRequest<Empty>,
168171
) -> Result<Response<Streaming<ExportedChunk>>, Status> {
169172
self.client.export(request).await.update_client(self)
170173
}
171174

175+
#[async_backtrace::framed]
172176
pub async fn export_v1(
173177
&mut self,
174178
request: impl tonic::IntoRequest<pb::ExportRequest>,
175179
) -> Result<Response<Streaming<ExportedChunk>>, Status> {
176180
self.client.export_v1(request).await.update_client(self)
177181
}
178182

183+
#[async_backtrace::framed]
179184
pub async fn watch(
180185
&mut self,
181186
request: impl tonic::IntoRequest<WatchRequest>,
182187
) -> Result<Response<Streaming<WatchResponse>>, Status> {
183188
self.client.watch(request).await.update_client(self)
184189
}
185190

191+
#[async_backtrace::framed]
186192
pub async fn transaction(
187193
&mut self,
188194
request: impl tonic::IntoRequest<TxnRequest>,
189195
) -> Result<Response<TxnReply>, Status> {
190196
self.client.transaction(request).await.update_client(self)
191197
}
192198

199+
#[async_backtrace::framed]
193200
pub async fn member_list(
194201
&mut self,
195202
request: impl tonic::IntoRequest<MemberListRequest>,
196203
) -> Result<Response<MemberListReply>, Status> {
197204
self.client.member_list(request).await.update_client(self)
198205
}
199206

207+
#[async_backtrace::framed]
200208
pub async fn get_cluster_status(
201209
&mut self,
202210
request: impl tonic::IntoRequest<Empty>,
@@ -207,6 +215,7 @@ impl EstablishedClient {
207215
.update_client(self)
208216
}
209217

218+
#[async_backtrace::framed]
210219
pub async fn get_client_info(
211220
&mut self,
212221
request: impl tonic::IntoRequest<Empty>,

0 commit comments

Comments
 (0)