Skip to content

Commit 32faa14

Browse files
committed
fix(cluster): add statistics receiver runtime
1 parent 6b81a95 commit 32faa14

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ impl DataExchangeManager {
284284
let mut build_res = query_coordinator.subscribe_fragment(&ctx, fragment_id)?;
285285

286286
let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges);
287-
let mut statistics_receiver = StatisticsReceiver::create(ctx.clone(), exchanges);
287+
let mut statistics_receiver = StatisticsReceiver::create(ctx.clone(), exchanges)?;
288288
statistics_receiver.start();
289289

290290
let statistics_receiver: Mutex<StatisticsReceiver> =

src/query/service/src/api/rpc/exchange/statistics_receiver.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use common_base::base::select3;
2222
use common_base::base::tokio;
2323
use common_base::base::tokio::sync::Notify;
2424
use common_base::base::tokio::task::JoinHandle;
25+
use common_base::base::Runtime;
2526
use common_base::base::Select3Output;
27+
use common_base::base::TrySpawn;
2628
use common_exception::ErrorCode;
2729
use common_exception::Result;
2830

@@ -36,17 +38,25 @@ pub struct StatisticsReceiver {
3638
shutdown_flag: Arc<AtomicBool>,
3739
shutdown_notify: Arc<Notify>,
3840
exchange_handler: Vec<JoinHandle<Result<()>>>,
41+
runtime: Arc<Runtime>,
3942
}
4043

4144
impl StatisticsReceiver {
42-
pub fn create(ctx: Arc<QueryContext>, exchanges: Vec<FlightExchange>) -> StatisticsReceiver {
43-
StatisticsReceiver {
45+
pub fn create(
46+
ctx: Arc<QueryContext>,
47+
exchanges: Vec<FlightExchange>,
48+
) -> Result<StatisticsReceiver> {
49+
Ok(StatisticsReceiver {
4450
ctx,
4551
exchanges,
4652
shutdown_flag: Arc::new(AtomicBool::new(false)),
4753
shutdown_notify: Arc::new(Notify::new()),
4854
exchange_handler: vec![],
49-
}
55+
runtime: Arc::new(Runtime::with_worker_threads(
56+
2,
57+
Some(String::from("StatisticsReceiver")),
58+
)?),
59+
})
5060
}
5161

5262
pub fn start(&mut self) {
@@ -55,7 +65,7 @@ impl StatisticsReceiver {
5565
let shutdown_flag = self.shutdown_flag.clone();
5666
let shutdown_notify = self.shutdown_notify.clone();
5767

58-
self.exchange_handler.push(tokio::spawn(async move {
68+
self.exchange_handler.push(self.runtime.spawn(async move {
5969
let mut recv = Box::pin(flight_exchange.recv());
6070
let mut notified = Box::pin(shutdown_notify.notified());
6171

0 commit comments

Comments
 (0)