Skip to content

Commit 8a75f9d

Browse files
authored
feat(query): add external_server_request_batch_rows setting (#15440)
1 parent 3b38f5d commit 8a75f9d

File tree

7 files changed

+37
-5
lines changed

7 files changed

+37
-5
lines changed

src/query/expression/src/function.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ pub struct FunctionContext {
107107

108108
pub external_server_connect_timeout_secs: u64,
109109
pub external_server_request_timeout_secs: u64,
110+
pub external_server_request_batch_rows: u64,
110111

111112
pub geometry_output_format: GeometryDataType,
112113
pub parse_datetime_ignore_remainder: bool,

src/query/expression/src/utils/udf_client.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const MAX_DECODING_MESSAGE_SIZE: usize = 16 * 1024 * 1024 * 1024;
4040
#[derive(Debug, Clone)]
4141
pub struct UDFFlightClient {
4242
inner: FlightServiceClient<Channel>,
43+
batch_rows: u64,
4344
}
4445

4546
impl UDFFlightClient {
@@ -48,6 +49,7 @@ impl UDFFlightClient {
4849
addr: &str,
4950
conn_timeout: u64,
5051
request_timeout: u64,
52+
batch_rows: u64,
5153
) -> Result<UDFFlightClient> {
5254
let endpoint = Endpoint::from_shared(addr.to_string())
5355
.map_err(|err| {
@@ -69,7 +71,7 @@ impl UDFFlightClient {
6971
})?
7072
.max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
7173

72-
Ok(UDFFlightClient { inner })
74+
Ok(UDFFlightClient { inner, batch_rows })
7375
}
7476

7577
fn make_request<T>(&self, t: T) -> Request<T> {
@@ -135,9 +137,16 @@ impl UDFFlightClient {
135137
input_batch: RecordBatch,
136138
) -> Result<RecordBatch> {
137139
let descriptor = FlightDescriptor::new_path(vec![func_name.to_string()]);
140+
let batch_rows = self.batch_rows as usize;
141+
let batches = (0..input_batch.num_rows())
142+
.step_by(batch_rows)
143+
.map(move |start| {
144+
Ok(input_batch.slice(start, batch_rows.min(input_batch.num_rows() - start)))
145+
});
146+
138147
let flight_data_stream = FlightDataEncoderBuilder::new()
139148
.with_flight_descriptor(Some(descriptor))
140-
.build(stream::iter(vec![Ok(input_batch)]))
149+
.build(stream::iter(batches))
141150
.map(|data| data.unwrap());
142151
let request = self.make_request(flight_data_stream);
143152
let flight_data_stream = self.inner.do_exchange(request).await?.into_inner();

src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ impl AsyncTransform for TransformUdfServer {
5959
async fn transform(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
6060
let connect_timeout = self.func_ctx.external_server_connect_timeout_secs;
6161
let request_timeout = self.func_ctx.external_server_request_timeout_secs;
62+
let request_bacth_rows = self.func_ctx.external_server_request_batch_rows;
6263
for func in &self.funcs {
6364
let server_addr = func.udf_type.as_server().unwrap();
6465
// construct input record_batch
@@ -91,10 +92,15 @@ impl AsyncTransform for TransformUdfServer {
9192
.to_record_batch_with_dataschema(&data_schema)
9293
.map_err(|err| ErrorCode::from_string(format!("{err}")))?;
9394

94-
let mut client =
95-
UDFFlightClient::connect(server_addr, connect_timeout, request_timeout).await?;
96-
let result_batch = client.do_exchange(&func.func_name, input_batch).await?;
95+
let mut client = UDFFlightClient::connect(
96+
server_addr,
97+
connect_timeout,
98+
request_timeout,
99+
request_bacth_rows,
100+
)
101+
.await?;
97102

103+
let result_batch = client.do_exchange(&func.func_name, input_batch).await?;
98104
let schema = DataSchema::try_from(&(*result_batch.schema()))?;
99105
let (result_block, result_schema) =
100106
DataBlock::from_record_batch(&schema, &result_batch).map_err(|err| {

src/query/service/src/sessions/query_ctx.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,8 @@ impl TableContext for QueryContext {
618618
settings.get_external_server_connect_timeout_secs()?;
619619
let external_server_request_timeout_secs =
620620
settings.get_external_server_request_timeout_secs()?;
621+
let external_server_request_batch_rows =
622+
settings.get_external_server_request_batch_rows()?;
621623

622624
let tz = settings.get_timezone()?;
623625
let tz = TzFactory::instance().get_by_name(&tz)?;
@@ -642,6 +644,7 @@ impl TableContext for QueryContext {
642644

643645
external_server_connect_timeout_secs,
644646
external_server_request_timeout_secs,
647+
external_server_request_batch_rows,
645648
geometry_output_format,
646649
parse_datetime_ignore_remainder,
647650
})

src/query/settings/src/settings_default.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,12 @@ impl DefaultSettings {
630630
mode: SettingMode::Both,
631631
range: Some(SettingRange::Numeric(0..=u64::MAX)),
632632
}),
633+
("external_server_request_batch_rows", DefaultSettingValue {
634+
value: UserSettingValue::UInt64(65536),
635+
desc: "Request batch rows to external server",
636+
mode: SettingMode::Both,
637+
range: Some(SettingRange::Numeric(1..=u64::MAX)),
638+
}),
633639
("enable_parquet_prewhere", DefaultSettingValue {
634640
value: UserSettingValue::UInt64(0),
635641
desc: "Enables parquet prewhere",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,10 @@ impl Settings {
557557
self.try_get_u64("external_server_request_timeout_secs")
558558
}
559559

560+
pub fn get_external_server_request_batch_rows(&self) -> Result<u64> {
561+
self.try_get_u64("external_server_request_batch_rows")
562+
}
563+
560564
pub fn get_create_query_flight_client_with_current_rt(&self) -> Result<bool> {
561565
Ok(self.try_get_u64("create_query_flight_client_with_current_rt")? != 0)
562566
}

src/query/sql/src/planner/binder/udf.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ impl Binder {
110110
self.ctx
111111
.get_settings()
112112
.get_external_server_request_timeout_secs()?,
113+
self.ctx
114+
.get_settings()
115+
.get_external_server_request_batch_rows()?,
113116
)
114117
.await?;
115118
client

0 commit comments

Comments
 (0)