Skip to content

Commit d5e217e

Browse files
authored
chore: new setting create_rpc_client_with_current_rt (#13898)
chore: new setting "create_rpc_client_with_current_rt"
1 parent 8dde650 commit d5e217e

File tree

6 files changed

+49
-16
lines changed

6 files changed

+49
-16
lines changed

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ use async_channel::Receiver;
2222
use common_arrow::arrow_format::flight::data::FlightData;
2323
use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient;
2424
use common_base::base::GlobalInstance;
25+
use common_base::runtime::GlobalIORuntime;
2526
use common_base::runtime::Thread;
27+
use common_base::runtime::TrySpawn;
28+
use common_base::GLOBAL_TASK;
2629
use common_config::GlobalConfig;
2730
use common_exception::ErrorCode;
2831
use common_exception::Result;
@@ -122,10 +125,13 @@ impl DataExchangeManager {
122125

123126
let target = &packet.executor.id;
124127

128+
let create_rpc_client_with_current_rt = packet.create_rpc_clint_with_current_rt;
129+
125130
for connection_info in &packet.fragment_connections_info {
126131
for fragment in &connection_info.fragments {
127132
let address = &connection_info.source.flight_address;
128-
let mut flight_client = Self::create_client(address).await?;
133+
let mut flight_client =
134+
Self::create_client(address, create_rpc_client_with_current_rt).await?;
129135

130136
targets_exchanges.insert(
131137
(connection_info.source.id.clone(), *fragment),
@@ -138,7 +144,8 @@ impl DataExchangeManager {
138144

139145
for connection_info in &packet.statistics_connections_info {
140146
let address = &connection_info.source.flight_address;
141-
let mut flight_client = Self::create_client(address).await?;
147+
let mut flight_client =
148+
Self::create_client(address, create_rpc_client_with_current_rt).await?;
142149
request_exchanges.insert(
143150
connection_info.source.id.clone(),
144151
flight_client
@@ -165,22 +172,31 @@ impl DataExchangeManager {
165172
}
166173

167174
#[async_backtrace::framed]
168-
pub async fn create_client(address: &str) -> Result<FlightClient> {
175+
pub async fn create_client(address: &str, use_current_rt: bool) -> Result<FlightClient> {
169176
let config = GlobalConfig::instance();
170177
let address = address.to_string();
171-
172-
match config.tls_query_cli_enabled() {
173-
true => Ok(FlightClient::new(FlightServiceClient::new(
174-
ConnectionFactory::create_rpc_channel(
175-
address.to_owned(),
176-
None,
177-
Some(config.query.to_rpc_client_tls_config()),
178-
)
179-
.await?,
180-
))),
181-
false => Ok(FlightClient::new(FlightServiceClient::new(
182-
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
183-
))),
178+
let task = async move {
179+
match config.tls_query_cli_enabled() {
180+
true => Ok(FlightClient::new(FlightServiceClient::new(
181+
ConnectionFactory::create_rpc_channel(
182+
address.to_owned(),
183+
None,
184+
Some(config.query.to_rpc_client_tls_config()),
185+
)
186+
.await?,
187+
))),
188+
false => Ok(FlightClient::new(FlightServiceClient::new(
189+
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
190+
))),
191+
}
192+
};
193+
if use_current_rt {
194+
task.await
195+
} else {
196+
GlobalIORuntime::instance()
197+
.spawn(GLOBAL_TASK, task)
198+
.await
199+
.expect("create client future must be joined successfully")
184200
}
185201
}
186202

src/query/service/src/api/rpc/packets/packet_publisher.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct InitNodesChannelPacket {
4141
pub executor: Arc<NodeInfo>,
4242
pub fragment_connections_info: Vec<ConnectionInfo>,
4343
pub statistics_connections_info: Vec<ConnectionInfo>,
44+
pub create_rpc_clint_with_current_rt: bool,
4445
}
4546

4647
impl InitNodesChannelPacket {
@@ -49,12 +50,14 @@ impl InitNodesChannelPacket {
4950
executor: Arc<NodeInfo>,
5051
fragment_connections_info: Vec<ConnectionInfo>,
5152
statistics_connections_info: Vec<ConnectionInfo>,
53+
create_rpc_clint_with_current_rt: bool,
5254
) -> InitNodesChannelPacket {
5355
InitNodesChannelPacket {
5456
query_id,
5557
executor,
5658
fragment_connections_info,
5759
statistics_connections_info,
60+
create_rpc_clint_with_current_rt,
5861
}
5962
}
6063
}

src/query/service/src/schedulers/fragments/query_fragment_actions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,9 @@ impl QueryFragmentsActions {
250250
true => statistics_connections.clone(),
251251
false => vec![],
252252
},
253+
self.ctx
254+
.get_settings()
255+
.get_create_query_flight_client_with_current_rt()?,
253256
));
254257
}
255258

src/query/service/tests/it/storages/testdata/settings_table.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
66
+------------------------------------------------+----------------+----------------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
77
| 'acquire_lock_timeout' | '15' | '15' | 'SESSION' | 'Sets the maximum timeout in seconds for acquire a lock.' | 'UInt64' |
88
| 'collation' | 'binary' | 'binary' | 'SESSION' | 'Sets the character collation. Available values include "binary" and "utf8".' | 'String' |
9+
| 'create_query_flight_client_with_current_rt' | '1' | '1' | 'SESSION' | 'create query flight client with current runtime' | 'UInt64' |
910
| 'ddl_column_type_nullable' | '1' | '1' | 'SESSION' | 'If columns are default nullable when create or alter table' | 'UInt64' |
1011
| 'disable_join_reorder' | '0' | '0' | 'SESSION' | 'Disable join reorder optimization.' | 'UInt64' |
1112
| 'efficiently_memory_group_by' | '0' | '0' | 'SESSION' | 'Memory is used efficiently, but this may cause performance degradation.' | 'UInt64' |

src/query/settings/src/settings_default.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,12 @@ impl DefaultSettings {
525525
possible_values: None,
526526
mode: SettingMode::Both,
527527
}),
528+
("create_query_flight_client_with_current_rt", DefaultSettingValue {
529+
value: UserSettingValue::UInt64(1),
530+
desc: "create query flight client with current runtime",
531+
possible_values: None,
532+
mode: SettingMode::Both,
533+
}),
528534
]);
529535

530536
Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,4 +491,8 @@ impl Settings {
491491
pub fn get_external_server_request_timeout_secs(&self) -> Result<u64> {
492492
self.try_get_u64("external_server_request_timeout_secs")
493493
}
494+
495+
pub fn get_create_query_flight_client_with_current_rt(&self) -> Result<bool> {
496+
Ok(self.try_get_u64("create_query_flight_client_with_current_rt")? != 0)
497+
}
494498
}

0 commit comments

Comments
 (0)