diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 030dfde08f687..57f055cc615dd 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -342,8 +342,11 @@ impl PipelineBuilder { } = merge_into_source; self.build_pipeline(input)?; - self.main_pipeline - .try_resize(self.ctx.get_settings().get_max_threads()? as usize)?; + self.main_pipeline.try_resize( + self.ctx + .get_settings() + .get_merge_into_resize_parallel_threads()? as usize, + )?; // 1. if matchedOnly, we will use inner join // 2. if insert Only, we will use right anti join // 3. other cases, we use right outer join diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index e2166e19584e1..e7a913ee3e066 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -498,6 +498,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("merge_into_resize_parallel_threads", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "tune the source parallel when too many small blocks", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("enable_distributed_merge_into", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Enables distributed execution for 'MERGE INTO'.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 9a1937342c5f8..6dba871767aa8 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -443,6 +443,13 @@ impl Settings { Ok(self.try_get_u64("enable_distributed_copy_into")? != 0) } + pub fn get_merge_into_resize_parallel_threads(&self) -> Result { + match self.try_get_u64("merge_into_resize_parallel_threads")? { + 0 => Ok(self.get_max_threads()?), + value => Ok(std::cmp::min(self.get_max_threads()?, value)), + } + } + pub fn get_enable_experimental_merge_into(&self) -> Result { Ok(self.try_get_u64("enable_experimental_merge_into")? != 0) }