Skip to content

Commit 1b0d982

Browse files
authored
Merge pull request #7679 from zhang2014/fix/runtime_no_schedule
fix(cluster): add statistics receiver runtime
2 parents b3c2cb5 + 310a879 commit 1b0d982

File tree

3 files changed

+53
-5
lines changed

3 files changed

+53
-5
lines changed

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ impl DataExchangeManager {
284284
let mut build_res = query_coordinator.subscribe_fragment(&ctx, fragment_id)?;
285285

286286
let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges);
287-
let mut statistics_receiver = StatisticsReceiver::create(ctx.clone(), exchanges);
287+
let mut statistics_receiver = StatisticsReceiver::create(ctx.clone(), exchanges)?;
288288
statistics_receiver.start();
289289

290290
let statistics_receiver: Mutex<StatisticsReceiver> =

src/query/service/src/api/rpc/exchange/statistics_receiver.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use common_base::base::select3;
2222
use common_base::base::tokio;
2323
use common_base::base::tokio::sync::Notify;
2424
use common_base::base::tokio::task::JoinHandle;
25+
use common_base::base::Runtime;
2526
use common_base::base::Select3Output;
27+
use common_base::base::TrySpawn;
2628
use common_exception::ErrorCode;
2729
use common_exception::Result;
2830

@@ -36,17 +38,25 @@ pub struct StatisticsReceiver {
3638
shutdown_flag: Arc<AtomicBool>,
3739
shutdown_notify: Arc<Notify>,
3840
exchange_handler: Vec<JoinHandle<Result<()>>>,
41+
runtime: Arc<Runtime>,
3942
}
4043

4144
impl StatisticsReceiver {
42-
pub fn create(ctx: Arc<QueryContext>, exchanges: Vec<FlightExchange>) -> StatisticsReceiver {
43-
StatisticsReceiver {
45+
pub fn create(
46+
ctx: Arc<QueryContext>,
47+
exchanges: Vec<FlightExchange>,
48+
) -> Result<StatisticsReceiver> {
49+
Ok(StatisticsReceiver {
4450
ctx,
4551
exchanges,
4652
shutdown_flag: Arc::new(AtomicBool::new(false)),
4753
shutdown_notify: Arc::new(Notify::new()),
4854
exchange_handler: vec![],
49-
}
55+
runtime: Arc::new(Runtime::with_worker_threads(
56+
2,
57+
Some(String::from("StatisticsReceiver")),
58+
)?),
59+
})
5060
}
5161

5262
pub fn start(&mut self) {
@@ -55,7 +65,7 @@ impl StatisticsReceiver {
5565
let shutdown_flag = self.shutdown_flag.clone();
5666
let shutdown_notify = self.shutdown_notify.clone();
5767

58-
self.exchange_handler.push(tokio::spawn(async move {
68+
self.exchange_handler.push(self.runtime.spawn(async move {
5969
let mut recv = Box::pin(flight_exchange.recv());
6070
let mut notified = Box::pin(shutdown_notify.notified());
6171

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
statement ok
2+
DROP DATABASE IF EXISTS db1;
3+
4+
statement ok
5+
CREATE DATABASE db1;
6+
7+
statement ok
8+
USE db1;
9+
10+
statement ok
11+
CREATE TABLE test_table(id INTEGER, name VARCHAR, age INT);
12+
13+
statement ok
14+
insert into test_table (id,name,age) values(1,'2',3), (4, '5', 6);
15+
16+
statement ok
17+
CREATE STAGE IF NOT EXISTS test;
18+
19+
statement ok
20+
copy into @test from test_table FILE_FORMAT = (type = 'CSV');
21+
22+
statement ok
23+
copy into test_table from @test;
24+
25+
statement query A
26+
SELECT COUNT() FROM test_table;
27+
28+
----
29+
4
30+
31+
statement ok
32+
drop table test_table all;
33+
34+
statement ok
35+
drop stage test;
36+
37+
statement ok
38+
DROP DATABASE db1;

0 commit comments

Comments
 (0)