Skip to content

Commit fea4409

Browse files
authored
fix(cluster): fix a possible connection leak when the cluster state is broken. (#16842)
* fix(cluster): fix connection leak if the server is in a broken state * fix(cluster): fix connection leak if the server is in a broken state
1 parent 7509439 commit fea4409

File tree

6 files changed

+37
-21
lines changed

6 files changed

+37
-21
lines changed

src/query/service/src/schedulers/scheduler.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,20 @@ pub async fn build_distributed_pipeline(
114114

115115
let exchange_manager = ctx.get_exchange_manager();
116116

117-
let mut build_res = exchange_manager
117+
match exchange_manager
118118
.commit_actions(ctx.clone(), fragments_actions)
119-
.await?;
120-
121-
let settings = ctx.get_settings();
122-
build_res.set_max_threads(settings.get_max_threads()? as usize);
123-
Ok(build_res)
119+
.await
120+
{
121+
Ok(mut build_res) => {
122+
let settings = ctx.get_settings();
123+
build_res.set_max_threads(settings.get_max_threads()? as usize);
124+
Ok(build_res)
125+
}
126+
Err(error) => {
127+
exchange_manager.on_finished_query(&ctx.get_id(), Some(error.clone()));
128+
Err(error)
129+
}
130+
}
124131
}
125132

126133
pub struct ServiceQueryExecutor {

src/query/service/src/servers/flight/v1/actions/init_query_env.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub async fn init_query_env(env: QueryEnv) -> Result<()> {
3838
.init_query_env(&env, ctx)
3939
.await
4040
{
41-
DataExchangeManager::instance().on_finished_query(&env.query_id);
41+
DataExchangeManager::instance().on_finished_query(&env.query_id, Some(e.clone()));
4242
return Err(e);
4343
}
4444

src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub async fn init_query_fragments(fragments: QueryFragments) -> Result<()> {
3737
}));
3838

3939
if let Err(cause) = join_handler.await.flatten() {
40-
DataExchangeManager::instance().on_finished_query(&query_id);
40+
DataExchangeManager::instance().on_finished_query(&query_id, Some(cause.clone()));
4141
return Err(cause);
4242
}
4343

src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub async fn start_prepared_query(id: String) -> Result<()> {
2727

2828
debug!("start prepared query {}", id);
2929
if let Err(cause) = DataExchangeManager::instance().execute_partial_query(&id) {
30-
DataExchangeManager::instance().on_finished_query(&id);
30+
DataExchangeManager::instance().on_finished_query(&id, Some(cause.clone()));
3131
return Err(cause);
3232
}
3333
Ok(())

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,13 @@ impl DataExchangeManager {
242242
"Query {} cannot start command while in 180 seconds",
243243
query_id
244244
);
245-
self.on_finished_query(&query_id);
245+
self.on_finished_query(
246+
&query_id,
247+
Some(ErrorCode::Internal(format!(
248+
"Query {} cannot start command while in 180 seconds",
249+
query_id
250+
))),
251+
);
246252
}
247253
}
248254

@@ -379,25 +385,25 @@ impl DataExchangeManager {
379385
}
380386
}
381387

382-
pub fn shutdown_query(&self, query_id: &str) {
388+
pub fn shutdown_query(&self, query_id: &str, cause: Option<ErrorCode>) {
383389
let queries_coordinator_guard = self.queries_coordinator.lock();
384390
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
385391

386392
if let Some(query_coordinator) = queries_coordinator.get_mut(query_id) {
387-
query_coordinator.shutdown_query();
393+
query_coordinator.shutdown_query(cause);
388394
}
389395
}
390396

391397
#[fastrace::trace]
392-
pub fn on_finished_query(&self, query_id: &str) {
398+
pub fn on_finished_query(&self, query_id: &str, cause: Option<ErrorCode>) {
393399
let queries_coordinator_guard = self.queries_coordinator.lock();
394400
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
395401

396402
if let Some(mut query_coordinator) = queries_coordinator.remove(query_id) {
397403
// Drop mutex guard to avoid deadlock during shutdown,
398404
drop(queries_coordinator_guard);
399405

400-
query_coordinator.shutdown_query();
406+
query_coordinator.shutdown_query(cause);
401407
query_coordinator.on_finished();
402408
}
403409
}
@@ -476,7 +482,8 @@ impl DataExchangeManager {
476482
let mut statistics_receiver = statistics_receiver.lock();
477483

478484
statistics_receiver.shutdown(info.res.is_err());
479-
ctx.get_exchange_manager().on_finished_query(&query_id);
485+
ctx.get_exchange_manager()
486+
.on_finished_query(&query_id, info.res.clone().err());
480487
statistics_receiver.wait_shutdown()
481488
},
482489
));
@@ -768,10 +775,10 @@ impl QueryCoordinator {
768775
Err(ErrorCode::Unimplemented("ExchangeSource is unimplemented"))
769776
}
770777

771-
pub fn shutdown_query(&mut self) {
778+
pub fn shutdown_query(&mut self, cause: Option<ErrorCode>) {
772779
if let Some(query_info) = &mut self.info {
773780
if let Some(query_executor) = &query_info.query_executor {
774-
query_executor.finish(None);
781+
query_executor.finish(cause);
775782
}
776783

777784
if let Some(worker) = query_info.remove_leak_query_worker.take() {
@@ -871,10 +878,11 @@ impl QueryCoordinator {
871878

872879
Thread::named_spawn(Some(String::from("Distributed-Executor")), move || {
873880
let _g = span.set_local_parent();
874-
statistics_sender.shutdown(executor.execute().err());
881+
let error = executor.execute().err();
882+
statistics_sender.shutdown(error.clone());
875883
query_ctx
876884
.get_exchange_manager()
877-
.on_finished_query(&query_id);
885+
.on_finished_query(&query_id, error);
878886
});
879887

880888
Ok(())

src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ impl StatisticsSender {
8181
notified = right;
8282
sleep_future = Box::pin(sleep(Duration::from_millis(100)));
8383

84-
if let Err(_cause) = Self::send_progress(&ctx, &tx).await {
85-
ctx.get_exchange_manager().shutdown_query(&query_id);
84+
if let Err(cause) = Self::send_progress(&ctx, &tx).await {
85+
ctx.get_exchange_manager()
86+
.shutdown_query(&query_id, Some(cause));
8687
return;
8788
}
8889

0 commit comments

Comments
 (0)