Skip to content

Commit 0aed25a

Browse files
authored
fix(query): fix unexpected panic message (#16221)
1 parent 09530b8 commit 0aed25a

File tree

11 files changed

+93
-91
lines changed

11 files changed

+93
-91
lines changed

src/common/base/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub mod version;
3737

3838
pub use runtime::dump_backtrace;
3939
pub use runtime::get_all_tasks;
40-
pub use runtime::match_join_handle;
4140
pub use runtime::set_alloc_error_hook;
4241
pub use runtime::AsyncTaskItem;
42+
pub use runtime::JoinHandle;
4343
pub use runtime::GLOBAL_TASK;

src/common/base/src/runtime/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ pub use memory::MemStat;
3939
pub use memory::GLOBAL_MEM_STAT;
4040
pub use runtime::block_on;
4141
pub use runtime::execute_futures_in_parallel;
42-
pub use runtime::match_join_handle;
4342
pub use runtime::spawn;
4443
pub use runtime::spawn_blocking;
4544
pub use runtime::spawn_local;
4645
pub use runtime::try_block_on;
4746
pub use runtime::try_spawn_blocking;
4847
pub use runtime::Dropper;
48+
pub use runtime::JoinHandle;
4949
pub use runtime::Runtime;
5050
pub use runtime::TrySpawn;
5151
pub use runtime::GLOBAL_TASK;

src/common/base/src/runtime/runtime.rs

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,72 @@
1515
use std::backtrace::Backtrace;
1616
use std::future::Future;
1717
use std::panic::Location;
18+
use std::pin::Pin;
1819
use std::sync::Arc;
20+
use std::task::Context;
21+
use std::task::Poll;
1922
use std::time::Duration;
2023
use std::time::Instant;
2124

2225
use databend_common_exception::ErrorCode;
2326
use databend_common_exception::Result;
2427
use futures::future;
28+
use futures::FutureExt;
2529
use log::warn;
2630
use tokio::runtime::Builder;
2731
use tokio::runtime::Handle;
2832
use tokio::sync::oneshot;
2933
use tokio::sync::OwnedSemaphorePermit;
3034
use tokio::sync::Semaphore;
31-
use tokio::task::JoinHandle;
3235

36+
// use tokio::task::JoinHandle;
3337
use crate::runtime::catch_unwind::CatchUnwindFuture;
3438
use crate::runtime::drop_guard;
3539
use crate::runtime::memory::MemStat;
3640
use crate::runtime::Thread;
3741
use crate::runtime::ThreadJoinHandle;
3842
use crate::runtime::ThreadTracker;
3943

44+
pub struct JoinHandle<Output> {
45+
inner: tokio::task::JoinHandle<Output>,
46+
}
47+
48+
impl<Output> JoinHandle<Output> {
49+
pub fn create(inner: tokio::task::JoinHandle<Output>) -> Self {
50+
Self { inner }
51+
}
52+
53+
pub fn abort(&self) {
54+
self.inner.abort();
55+
}
56+
}
57+
58+
impl<Output> Future for JoinHandle<Output> {
59+
type Output = Result<Output>;
60+
61+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62+
match self.inner.poll_unpin(cx) {
63+
Poll::Pending => Poll::Pending,
64+
Poll::Ready(res) => match res {
65+
Ok(res) => Poll::Ready(Ok(res)),
66+
Err(error) => match error.is_panic() {
67+
true => {
68+
let cause = error.into_panic();
69+
Poll::Ready(Err(match cause.downcast_ref::<&'static str>() {
70+
None => match cause.downcast_ref::<String>() {
71+
None => ErrorCode::PanicError("Sorry, unknown panic message"),
72+
Some(message) => ErrorCode::PanicError(message.to_string()),
73+
},
74+
Some(message) => ErrorCode::PanicError(message.to_string()),
75+
}))
76+
}
77+
false => Poll::Ready(Err(ErrorCode::TokioError("Tokio task is cancelled"))),
78+
},
79+
},
80+
}
81+
}
82+
}
83+
4084
/// Methods to spawn tasks.
4185
pub trait TrySpawn {
4286
/// Tries to spawn a new asynchronous task, returning a tokio::JoinHandle for it.
@@ -277,7 +321,7 @@ impl Runtime {
277321
fut(permit).await
278322
}),
279323
));
280-
handlers.push(handler)
324+
handlers.push(JoinHandle::create(handler))
281325
}
282326

283327
Ok(handlers)
@@ -292,11 +336,11 @@ impl Runtime {
292336
R: Send + 'static,
293337
{
294338
#[allow(clippy::disallowed_methods)]
295-
match_join_handle(
339+
let handle = JoinHandle::create(
296340
self.handle
297341
.spawn_blocking(ThreadTracker::tracking_function(f)),
298-
)
299-
.await
342+
);
343+
handle.await.flatten()
300344
}
301345
}
302346

@@ -317,7 +361,7 @@ impl TrySpawn for Runtime {
317361
};
318362

319363
#[expect(clippy::disallowed_methods)]
320-
Ok(self.handle.spawn(task))
364+
Ok(JoinHandle::create(self.handle.spawn(task)))
321365
}
322366
}
323367

@@ -353,26 +397,6 @@ impl Drop for Dropper {
353397
}
354398
}
355399

356-
pub async fn match_join_handle<T>(handle: JoinHandle<Result<T>>) -> Result<T> {
357-
match handle.await {
358-
Ok(Ok(res)) => Ok(res),
359-
Ok(Err(cause)) => Err(cause),
360-
Err(join_error) => match join_error.is_cancelled() {
361-
true => Err(ErrorCode::TokioError("Tokio error is cancelled.")),
362-
false => {
363-
let panic_error = join_error.into_panic();
364-
match panic_error.downcast_ref::<&'static str>() {
365-
None => match panic_error.downcast_ref::<String>() {
366-
None => Err(ErrorCode::PanicError("Sorry, unknown panic message")),
367-
Some(message) => Err(ErrorCode::PanicError(message.to_string())),
368-
},
369-
Some(message) => Err(ErrorCode::PanicError(message.to_string())),
370-
}
371-
}
372-
},
373-
}
374-
}
375-
376400
/// Run multiple futures parallel
377401
/// using a semaphore to limit the parallelism number, and a specified thread pool to run the futures.
378402
/// It waits for all futures to complete and returns their results.
@@ -397,9 +421,7 @@ where
397421
let join_handlers = runtime.try_spawn_batch(semaphore, futures).await?;
398422

399423
// 3. get all the result.
400-
future::try_join_all(join_handlers)
401-
.await
402-
.map_err(|e| ErrorCode::Internal(format!("try join all futures failure, {}", e)))
424+
future::try_join_all(join_handlers).await
403425
}
404426

405427
pub const GLOBAL_TASK: &str = "Zxv39PlwG1ahbF0APRUf03";

src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use databend_common_base::match_join_handle;
1615
use databend_common_base::runtime::ThreadTracker;
1716
use databend_common_base::runtime::TrySpawn;
1817
use databend_common_exception::Result;
@@ -37,7 +36,7 @@ pub async fn init_query_fragments(fragments: QueryFragments) -> Result<()> {
3736
DataExchangeManager::instance().init_query_fragments_plan(&fragments)
3837
}));
3938

40-
if let Err(cause) = match_join_handle(join_handler).await {
39+
if let Err(cause) = join_handler.await.flatten() {
4140
DataExchangeManager::instance().on_finished_query(&query_id);
4241
return Err(cause);
4342
}

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_base::base::GlobalInstance;
2828
use databend_common_base::runtime::GlobalIORuntime;
2929
use databend_common_base::runtime::Thread;
3030
use databend_common_base::runtime::TrySpawn;
31+
use databend_common_base::JoinHandle;
3132
use databend_common_config::GlobalConfig;
3233
use databend_common_exception::ErrorCode;
3334
use databend_common_exception::Result;
@@ -40,7 +41,6 @@ use parking_lot::Mutex;
4041
use parking_lot::ReentrantMutex;
4142
use petgraph::prelude::EdgeRef;
4243
use petgraph::Direction;
43-
use tokio::task::JoinHandle;
4444
use tonic::Status;
4545

4646
use super::exchange_params::ExchangeParams;

src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ use std::sync::Arc;
1717

1818
use databend_common_base::base::tokio::sync::broadcast::channel;
1919
use databend_common_base::base::tokio::sync::broadcast::Sender;
20-
use databend_common_base::base::tokio::task::JoinHandle;
21-
use databend_common_base::match_join_handle;
2220
use databend_common_base::runtime::Runtime;
2321
use databend_common_base::runtime::TrySpawn;
22+
use databend_common_base::JoinHandle;
2423
use databend_common_catalog::table_context::TableContext;
2524
use databend_common_exception::Result;
2625
use futures_util::future::select;
@@ -161,7 +160,7 @@ impl StatisticsReceiver {
161160
let mut exchanges_handler = std::mem::take(&mut self.exchange_handler);
162161
futures::executor::block_on(async move {
163162
while let Some(exchange_handler) = exchanges_handler.pop() {
164-
match_join_handle(exchange_handler).await?;
163+
exchange_handler.await.flatten()?;
165164
}
166165

167166
Ok(())

src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ use std::sync::Arc;
1616
use std::time::Duration;
1717

1818
use async_channel::Sender;
19-
use databend_common_base::base::tokio::task::JoinHandle;
2019
use databend_common_base::base::tokio::time::sleep;
2120
use databend_common_base::runtime::TrySpawn;
21+
use databend_common_base::JoinHandle;
2222
use databend_common_catalog::table_context::TableContext;
2323
use databend_common_exception::ErrorCode;
2424
use databend_common_exception::Result;

src/query/service/src/servers/http/v1/query/http_query_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ use databend_common_base::base::tokio::time::sleep;
2626
use databend_common_base::base::GlobalInstance;
2727
use databend_common_base::runtime::GlobalIORuntime;
2828
use databend_common_base::runtime::TrySpawn;
29+
use databend_common_base::JoinHandle;
2930
use databend_common_config::InnerConfig;
3031
use databend_common_exception::ErrorCode;
3132
use databend_common_exception::Result;
3233
use databend_storages_common_txn::TxnManagerRef;
3334
use parking_lot::Mutex;
34-
use tokio::task;
3535

3636
use super::expiring_map::ExpiringMap;
3737
use super::HttpQueryContext;
@@ -84,7 +84,7 @@ pub struct HttpQueryManager {
8484
pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
8585
pub(crate) removed_queries: Arc<parking_lot::Mutex<LimitedQueue<String>>>,
8686
#[allow(clippy::type_complexity)]
87-
pub(crate) txn_managers: Arc<Mutex<HashMap<String, (TxnManagerRef, task::JoinHandle<()>)>>>,
87+
pub(crate) txn_managers: Arc<Mutex<HashMap<String, (TxnManagerRef, JoinHandle<()>)>>>,
8888
pub(crate) sessions: Mutex<ExpiringMap<String, Arc<Session>>>,
8989
}
9090

src/query/service/src/sessions/query_ctx.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ use chrono::Utc;
3232
use chrono_tz::Tz;
3333
use dashmap::mapref::multiple::RefMulti;
3434
use dashmap::DashMap;
35-
use databend_common_base::base::tokio::task::JoinHandle;
3635
use databend_common_base::base::Progress;
3736
use databend_common_base::base::ProgressValues;
3837
use databend_common_base::runtime::profile::Profile;
3938
use databend_common_base::runtime::profile::ProfileStatisticsName;
4039
use databend_common_base::runtime::TrySpawn;
40+
use databend_common_base::JoinHandle;
4141
use databend_common_catalog::lock::LockTableOption;
4242
use databend_common_catalog::merge_into_join::MergeIntoJoin;
4343
use databend_common_catalog::plan::DataSourceInfo;

src/query/storages/fuse/src/operations/recluster.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -275,19 +275,13 @@ impl FuseTable {
275275
}));
276276
}
277277

278-
match futures::future::try_join_all(works).await {
279-
Err(e) => Err(ErrorCode::StorageOther(format!(
280-
"segment pruning failure, {}",
281-
e
282-
))),
283-
Ok(workers) => {
284-
let mut metas = vec![];
285-
for worker in workers {
286-
let res = worker?;
287-
metas.extend(res);
288-
}
289-
Ok(metas)
290-
}
278+
let mut metas = vec![];
279+
let workers = futures::future::try_join_all(works).await?;
280+
for worker in workers {
281+
let res = worker?;
282+
metas.extend(res);
291283
}
284+
285+
Ok(metas)
292286
}
293287
}

0 commit comments

Comments
 (0)