Skip to content

Commit 4a6ac70

Browse files
authored
refactor(query): add setting external_server_request_retry_times (#16307)
* refactor(query): add setting external_server_request_retry_times and profile the retry counts * safety index_unchecked for nullable * safety index_unchecked for nullable * update * update * update
1 parent d6d2159 commit 4a6ac70

File tree

12 files changed

+76
-57
lines changed

12 files changed

+76
-57
lines changed

โ€Žsrc/common/base/src/runtime/profile/profiles.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub enum ProfileStatisticsName {
4545
SpillReadTime,
4646
RuntimeFilterPruneParts,
4747
MemoryUsage,
48+
ExternalServerRetryCount,
4849
}
4950

5051
#[derive(Clone, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Debug)]
@@ -242,7 +243,14 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
242243
index: ProfileStatisticsName::MemoryUsage as usize,
243244
unit: StatisticsUnit::Bytes,
244245
plain_statistics: false,
245-
})
246+
}),
247+
(ProfileStatisticsName::ExternalServerRetryCount, ProfileDesc {
248+
display_name: "external server retry count",
249+
desc: "The count of external server retry times",
250+
index: ProfileStatisticsName::ExternalServerRetryCount as usize,
251+
unit: StatisticsUnit::Count,
252+
plain_statistics: true,
253+
}),
246254
]))
247255
}).clone()
248256
}

โ€Žsrc/query/expression/src/function.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,6 @@ pub struct FunctionContext {
109109
pub openai_api_embedding_model: String,
110110
pub openai_api_completion_model: String,
111111

112-
pub external_server_connect_timeout_secs: u64,
113-
pub external_server_request_timeout_secs: u64,
114-
pub external_server_request_batch_rows: u64,
115-
116112
pub geometry_output_format: GeometryDataType,
117113
pub parse_datetime_ignore_remainder: bool,
118114
pub enable_dst_hour_fix: bool,
@@ -133,9 +129,7 @@ impl Default for FunctionContext {
133129
openai_api_version: "".to_string(),
134130
openai_api_embedding_model: "".to_string(),
135131
openai_api_completion_model: "".to_string(),
136-
external_server_connect_timeout_secs: 0,
137-
external_server_request_timeout_secs: 0,
138-
external_server_request_batch_rows: 0,
132+
139133
geometry_output_format: Default::default(),
140134
parse_datetime_ignore_remainder: false,
141135
enable_dst_hour_fix: false,

โ€Žsrc/query/expression/src/types/nullable.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ impl<T: ValueType> NullableColumn<T> {
258258
// we should better use new to create a new instance to ensure the validity and column are consistent
259259
// todo: make column and validity private
260260
pub fn new(column: T::Column, validity: Bitmap) -> Self {
261-
assert_eq!(T::column_len(&column), validity.len());
262-
assert!(!matches!(
261+
debug_assert_eq!(T::column_len(&column), validity.len());
262+
debug_assert!(!matches!(
263263
T::upcast_column(column.clone()),
264264
Column::Nullable(_)
265265
));
@@ -290,11 +290,15 @@ impl<T: ValueType> NullableColumn<T> {
290290
///
291291
/// Calling this method with an out-of-bounds index is *[undefined behavior]*
292292
pub unsafe fn index_unchecked(&self, index: usize) -> Option<T::ScalarRef<'_>> {
293-
debug_assert!(index < self.validity.len());
294-
295-
match self.validity.get_bit_unchecked(index) {
296-
true => Some(T::index_column(&self.column, index).unwrap()),
297-
false => None,
293+
// we need to check the validity firstly
294+
// cause `self.validity.get_bit_unchecked` may check the index from buffer address with `true` result
295+
if index < self.validity.len() {
296+
match self.validity.get_bit_unchecked(index) {
297+
true => Some(T::index_column_unchecked(&self.column, index)),
298+
false => None,
299+
}
300+
} else {
301+
None
298302
}
299303
}
300304

โ€Žsrc/query/functions/src/scalars/variant.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -904,10 +904,10 @@ pub fn register(registry: &mut FunctionRegistry) {
904904
};
905905
let new_col = cast_scalars_to_variants(col.iter(), ctx.func_ctx.tz);
906906
if let Some(validity) = validity {
907-
Value::Column(Column::Nullable(Box::new(NullableColumn {
907+
Value::Column(NullableColumn::new_column(
908+
Column::Variant(new_col),
908909
validity,
909-
column: Column::Variant(new_col),
910-
})))
910+
))
911911
} else {
912912
Value::Column(Column::Variant(new_col))
913913
}
@@ -945,10 +945,7 @@ pub fn register(registry: &mut FunctionRegistry) {
945945
_ => Bitmap::new_constant(true, col.len()),
946946
};
947947
let new_col = cast_scalars_to_variants(col.iter(), ctx.func_ctx.tz);
948-
Value::Column(NullableColumn {
949-
validity,
950-
column: new_col,
951-
})
948+
Value::Column(NullableColumn::new(new_col, validity))
952949
}
953950
},
954951
);

โ€Žsrc/query/functions/tests/it/scalars/tuple.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ fn test_get(file: &mut impl Write) {
6161
)]);
6262
run_ast(file, "col.1", &[(
6363
"col",
64-
Column::Nullable(Box::new(NullableColumn {
65-
column: Column::Tuple(vec![StringType::from_data_with_validity(
64+
NullableColumn::new_column(
65+
Column::Tuple(vec![StringType::from_data_with_validity(
6666
vec!["a", "b", "c", "d"],
6767
vec![true, true, false, false],
6868
)]),
69-
validity: vec![true, false, true, false].into(),
70-
})),
69+
vec![true, false, true, false].into(),
70+
),
7171
)]);
7272
}

โ€Žsrc/query/pipeline/transforms/src/processors/transforms/transform_retry_async.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use super::AsyncTransform;
2020

2121
pub trait AsyncRetry: AsyncTransform {
2222
fn retry_on(&self, err: &databend_common_exception::ErrorCode) -> bool;
23+
// record some log when retrying
24+
fn retry_hook(&self);
2325
fn retry_strategy(&self) -> RetryStrategy;
2426
}
2527

@@ -67,6 +69,7 @@ impl<T: AsyncRetry + 'static> AsyncTransform for AsyncRetryWrapper<T> {
6769
tokio::time::sleep(duration).await;
6870
}
6971
self.retries += 1;
72+
self.t.retry_hook();
7073
}
7174
}
7275
}

โ€Žsrc/query/service/src/pipelines/builders/builder_udf.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,7 @@ impl PipelineBuilder {
4242
})
4343
} else {
4444
self.main_pipeline.try_add_async_transformer(|| {
45-
Ok(TransformUdfServer::new_retry_wrapper(
46-
self.ctx.clone(),
47-
self.func_ctx.clone(),
48-
udf.udf_funcs.clone(),
49-
))
45+
TransformUdfServer::new_retry_wrapper(self.ctx.clone(), udf.udf_funcs.clone())
5046
})
5147
}
5248
}

โ€Žsrc/query/service/src/pipelines/processors/transforms/transform_udf_server.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_base::runtime::profile::Profile;
18+
use databend_common_base::runtime::profile::ProfileStatisticsName;
1719
use databend_common_catalog::table_context::TableContext;
1820
use databend_common_exception::ErrorCode;
1921
use databend_common_exception::Result;
@@ -24,7 +26,6 @@ use databend_common_expression::BlockEntry;
2426
use databend_common_expression::DataBlock;
2527
use databend_common_expression::DataField;
2628
use databend_common_expression::DataSchema;
27-
use databend_common_expression::FunctionContext;
2829
use databend_common_pipeline_transforms::processors::AsyncRetry;
2930
use databend_common_pipeline_transforms::processors::AsyncRetryWrapper;
3031
use databend_common_pipeline_transforms::processors::AsyncTransform;
@@ -35,22 +36,34 @@ use crate::sessions::QueryContext;
3536

3637
pub struct TransformUdfServer {
3738
ctx: Arc<QueryContext>,
38-
func_ctx: FunctionContext,
3939
funcs: Vec<UdfFunctionDesc>,
40+
connect_timeout: u64,
41+
request_timeout: u64,
42+
request_bacth_rows: u64,
43+
retry_times: u64,
4044
}
4145

4246
impl TransformUdfServer {
4347
pub fn new_retry_wrapper(
4448
ctx: Arc<QueryContext>,
45-
func_ctx: FunctionContext,
4649
funcs: Vec<UdfFunctionDesc>,
47-
) -> AsyncRetryWrapper<Self> {
50+
) -> Result<AsyncRetryWrapper<Self>> {
51+
let settings = ctx.get_settings();
52+
let connect_timeout = settings.get_external_server_connect_timeout_secs()?;
53+
let request_timeout = settings.get_external_server_request_timeout_secs()?;
54+
let request_bacth_rows = settings.get_external_server_request_batch_rows()?;
55+
let retry_times = settings.get_external_server_request_retry_times()?;
56+
4857
let s = Self {
4958
ctx,
50-
func_ctx,
5159
funcs,
60+
61+
connect_timeout,
62+
request_timeout,
63+
request_bacth_rows,
64+
retry_times,
5265
};
53-
AsyncRetryWrapper::create(s)
66+
Ok(AsyncRetryWrapper::create(s))
5467
}
5568
}
5669

@@ -61,10 +74,14 @@ impl AsyncRetry for TransformUdfServer {
6174

6275
fn retry_strategy(&self) -> RetryStrategy {
6376
RetryStrategy {
64-
retry_times: 64,
77+
retry_times: self.retry_times as usize,
6578
retry_sleep_duration: Some(tokio::time::Duration::from_millis(500)),
6679
}
6780
}
81+
82+
fn retry_hook(&self) {
83+
Profile::record_usize_profile(ProfileStatisticsName::ExternalServerRetryCount, 1);
84+
}
6885
}
6986

7087
#[async_trait::async_trait]
@@ -73,9 +90,6 @@ impl AsyncTransform for TransformUdfServer {
7390

7491
#[async_backtrace::framed]
7592
async fn transform(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
76-
let connect_timeout = self.func_ctx.external_server_connect_timeout_secs;
77-
let request_timeout = self.func_ctx.external_server_request_timeout_secs;
78-
let request_bacth_rows = self.func_ctx.external_server_request_batch_rows;
7993
for func in &self.funcs {
8094
let server_addr = func.udf_type.as_server().unwrap();
8195
// construct input record_batch
@@ -110,9 +124,9 @@ impl AsyncTransform for TransformUdfServer {
110124

111125
let mut client = UDFFlightClient::connect(
112126
server_addr,
113-
connect_timeout,
114-
request_timeout,
115-
request_bacth_rows,
127+
self.connect_timeout,
128+
self.request_timeout,
129+
self.request_bacth_rows,
116130
)
117131
.await?
118132
.with_tenant(self.ctx.get_tenant().tenant_name())?

โ€Žsrc/query/service/src/pipelines/processors/transforms/window/transform_window.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,10 +1003,12 @@ where T: Number + ResultTypeOfUnary
10031003
fn add_block(&mut self, data: Option<DataBlock>) -> Result<()> {
10041004
if let Some(data) = data {
10051005
let num_rows = data.num_rows();
1006-
self.blocks.push_back(WindowBlock {
1007-
block: data.convert_to_full(),
1008-
builder: ColumnBuilder::with_capacity(&self.func.return_type()?, num_rows),
1009-
});
1006+
if num_rows != 0 {
1007+
self.blocks.push_back(WindowBlock {
1008+
block: data.convert_to_full(),
1009+
builder: ColumnBuilder::with_capacity(&self.func.return_type()?, num_rows),
1010+
});
1011+
}
10101012
}
10111013

10121014
// Each loop will do:

โ€Žsrc/query/service/src/sessions/query_ctx.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -695,12 +695,6 @@ impl TableContext for QueryContext {
695695

696696
fn get_function_context(&self) -> Result<FunctionContext> {
697697
let settings = self.get_settings();
698-
let external_server_connect_timeout_secs =
699-
settings.get_external_server_connect_timeout_secs()?;
700-
let external_server_request_timeout_secs =
701-
settings.get_external_server_request_timeout_secs()?;
702-
let external_server_request_batch_rows =
703-
settings.get_external_server_request_batch_rows()?;
704698

705699
let tz = settings.get_timezone()?;
706700
let tz = TzFactory::instance().get_by_name(&tz)?;
@@ -728,9 +722,6 @@ impl TableContext for QueryContext {
728722
openai_api_embedding_model: query_config.openai_api_embedding_model.clone(),
729723
openai_api_completion_model: query_config.openai_api_completion_model.clone(),
730724

731-
external_server_connect_timeout_secs,
732-
external_server_request_timeout_secs,
733-
external_server_request_batch_rows,
734725
geometry_output_format,
735726
parse_datetime_ignore_remainder,
736727
enable_dst_hour_fix,

0 commit comments

Comments
ย (0)