
Commit 5761706 ("fix")

1 parent: 661e996

8 files changed (+83, -180 lines)

src/query/service/src/pipelines/builders/builder_recluster.rs
14 additions, 37 deletions
@@ -39,22 +39,20 @@ use databend_common_sql::executor::physical_plans::Recluster;
 use databend_common_sql::StreamContext;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::io::StreamBlockProperties;
-use databend_common_storages_fuse::operations::TransformBlockBuilder;
 use databend_common_storages_fuse::operations::TransformBlockWriter;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;

 use crate::pipelines::builders::SortPipelineBuilder;
 use crate::pipelines::processors::transforms::ReclusterPartitionExchange;
-use crate::pipelines::processors::transforms::ReclusterPartitionStrategys;
+use crate::pipelines::processors::transforms::ReclusterPartitionStrategy;
 use crate::pipelines::processors::transforms::SampleState;
 use crate::pipelines::processors::transforms::TransformAddOrderColumn;
 use crate::pipelines::processors::transforms::TransformAddStreamColumns;
 use crate::pipelines::processors::transforms::TransformPartitionCollect;
 use crate::pipelines::processors::transforms::TransformRangePartitionIndexer;
 use crate::pipelines::processors::transforms::TransformReclusterCollect;
-use crate::pipelines::processors::transforms::TransformReclusterPartition;
 use crate::pipelines::PipelineBuilder;

 impl PipelineBuilder {

@@ -173,7 +171,6 @@ impl PipelineBuilder {
         let fields_with_cluster_key = properties.fields_with_cluster_key();
         let schema = DataSchemaRefExt::create(fields_with_cluster_key);
         let schema = add_order_field(schema, &sort_desc);
-        let order_offset = schema.fields.len() - 1;

         let num_processors = self.main_pipeline.output_len();
         let sample_size = self

@@ -203,44 +200,24 @@
         let processor_id = AtomicUsize::new(0);

         let settings = self.ctx.get_settings();
-        let enable_writings = settings.get_enable_block_stream_writes()?;
-        if enable_writings {
-            let memory_settings = MemorySettings::disable_spill();
-            self.main_pipeline.add_transform(|input, output| {
-                let strategy =
-                    ReclusterPartitionStrategys::new(properties.clone(), order_offset);
-
-                Ok(ProcessorPtr::create(Box::new(
-                    TransformPartitionCollect::new(
-                        self.ctx.clone(),
-                        input,
-                        output,
-                        &settings,
-                        processor_id.fetch_add(1, atomic::Ordering::AcqRel),
-                        num_processors,
-                        partitions,
-                        memory_settings.clone(),
-                        None,
-                        strategy,
-                    )?,
-                )))
-            })?;
-
-            self.main_pipeline.add_transform(|input, output| {
-                TransformBlockBuilder::try_create(input, output, properties.clone())
-            })?;
-        } else {
-            self.main_pipeline.add_transform(|input, output| {
-                TransformReclusterPartition::try_create(
+        let memory_settings = MemorySettings::disable_spill();
+        self.main_pipeline.add_transform(|input, output| {
+            let strategy = ReclusterPartitionStrategy::new(properties.clone());
+            Ok(ProcessorPtr::create(Box::new(
+                TransformPartitionCollect::new(
+                    self.ctx.clone(),
                     input,
                     output,
-                    properties.clone(),
+                    &settings,
                     processor_id.fetch_add(1, atomic::Ordering::AcqRel),
                     num_processors,
                     partitions,
-                )
-            })?;
-        }
+                    memory_settings.clone(),
+                    None,
+                    strategy,
+                )?,
+            )))
+        })?;

         self.main_pipeline.add_async_accumulating_transformer(|| {
             TransformBlockWriter::create(
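A side note on the wiring that survives this change: `add_transform` runs its closure once per parallel pipe, and each instance claims a distinct processor id from the shared `AtomicUsize` counter via `fetch_add`. A minimal standalone sketch of that pattern (plain std; the surrounding pipeline machinery is Databend's and is not reproduced here):

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        // Shared counter, as in `let processor_id = AtomicUsize::new(0);` above.
        let processor_id = AtomicUsize::new(0);
        let num_processors = 4;

        // Stand-in for `main_pipeline.add_transform`: the builder invokes the
        // closure once per output port, so ids come out as 0..num_processors.
        let ids: Vec<usize> = (0..num_processors)
            .map(|_| processor_id.fetch_add(1, Ordering::AcqRel))
            .collect();

        assert_eq!(ids, vec![0, 1, 2, 3]);
    }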

src/query/service/src/pipelines/processors/transforms/recluster/mod.rs
0 additions, 1 deletion
@@ -25,7 +25,6 @@ pub use range_bound_sampler::RangeBoundSampler;
 pub use recluster_partition_exchange::ReclusterPartitionExchange;
 pub use recluster_partition_strategy::CompactPartitionStrategy;
 pub use recluster_partition_strategy::ReclusterPartitionStrategy;
-pub use recluster_partition_strategy::ReclusterPartitionStrategys;
 pub use recluster_sample_state::SampleState;
 pub use transform_add_order_column::TransformAddOrderColumn;
 pub use transform_range_partition_indexer::TransformRangePartitionIndexer;

src/query/service/src/pipelines/processors/transforms/recluster/recluster_partition_strategy.rs
6 additions, 96 deletions
@@ -16,8 +16,6 @@ use std::sync::Arc;

 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_expression::LimitType;
-use databend_common_expression::SortColumnDescription;
 use databend_common_storages_fuse::io::StreamBlockBuilder;
 use databend_common_storages_fuse::io::StreamBlockProperties;

@@ -34,10 +32,6 @@ impl ReclusterPartitionStrategy {
     pub fn new(properties: Arc<StreamBlockProperties>) -> Self {
         Self { properties }
     }
-
-    fn concat_blocks(blocks: Vec<DataBlock>) -> Result<DataBlock> {
-        DataBlock::concat(&blocks)
-    }
 }

 impl PartitionProcessStrategy for ReclusterPartitionStrategy {

@@ -74,21 +68,23 @@ impl PartitionProcessStrategy for ReclusterPartitionStrategy {
                 continue;
             }
             if !staged_blocks.is_empty() {
-                compacted.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
+                compacted.push(std::mem::take(&mut staged_blocks));
             }
             std::mem::swap(&mut staged_blocks, &mut pending_blocks);
             accumulated_rows = 0;
             accumulated_bytes = 0;
         }
         staged_blocks.append(&mut pending_blocks);
         if !staged_blocks.is_empty() {
-            compacted.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
+            compacted.push(std::mem::take(&mut staged_blocks));
         }

         let mut result = Vec::new();
         let mut builder = StreamBlockBuilder::try_new_with_config(self.properties.clone())?;
-        for block in compacted {
-            builder.write(block)?;
+        for blocks in compacted {
+            for block in blocks {
+                builder.write(block)?;
+            }
             if builder.need_flush() {
                 let serialized = builder.finish()?;
                 result.push(DataBlock::empty_with_meta(Box::new(serialized)));
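The hunk above changes `compacted` from holding concatenated `DataBlock`s to holding groups (`Vec<DataBlock>`), so blocks are streamed into the `StreamBlockBuilder` one at a time instead of first being copied into a single block by `DataBlock::concat`. A standalone model of the staging loop, with `Vec<u64>` standing in for `DataBlock` and a plain row threshold standing in for `check_large_enough` (both stand-ins are illustrative, not Databend's types):

    // Blocks accumulate in `pending` until a group is "large enough"; groups
    // are kept as Vec<Block> and later streamed to the writer block by block,
    // avoiding the extra full copy that concatenation would cost.
    type Block = Vec<u64>;

    fn group_blocks(blocks: Vec<Block>, min_rows: usize) -> Vec<Vec<Block>> {
        let mut pending: Vec<Block> = Vec::new();
        let mut staged: Vec<Block> = Vec::new();
        let mut grouped: Vec<Vec<Block>> = Vec::new();
        let mut rows = 0;
        for block in blocks {
            rows += block.len();
            pending.push(block);
            if rows < min_rows {
                continue; // not yet large enough, keep accumulating
            }
            if !staged.is_empty() {
                grouped.push(std::mem::take(&mut staged));
            }
            // The large-enough batch becomes the new staged group.
            std::mem::swap(&mut staged, &mut pending);
            rows = 0;
        }
        // Any tail rides along with the last staged group.
        staged.append(&mut pending);
        if !staged.is_empty() {
            grouped.push(staged);
        }
        grouped
    }

    fn main() {
        let groups = group_blocks(vec![vec![1; 3], vec![2; 3], vec![3; 1]], 4);
        // The first two blocks cross the threshold; the 1-row tail joins them.
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].len(), 3);
    }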
@@ -177,89 +173,3 @@ impl PartitionProcessStrategy for CompactPartitionStrategy {
         Ok(result)
     }
 }
-
-pub struct ReclusterPartitionStrategys {
-    properties: Arc<StreamBlockProperties>,
-    sort_desc: Vec<SortColumnDescription>,
-}
-
-impl ReclusterPartitionStrategys {
-    pub fn new(properties: Arc<StreamBlockProperties>, offset: usize) -> Self {
-        Self {
-            properties,
-            sort_desc: vec![SortColumnDescription {
-                offset,
-                asc: true,
-                nulls_first: false,
-            }],
-        }
-    }
-
-    fn concat_blocks(blocks: Vec<DataBlock>) -> Result<DataBlock> {
-        DataBlock::concat(&blocks)
-    }
-}
-
-impl PartitionProcessStrategy for ReclusterPartitionStrategys {
-    const NAME: &'static str = "Recluster";
-
-    fn calc_partitions(
-        &self,
-        processor_id: usize,
-        num_processors: usize,
-        num_partitions: usize,
-    ) -> Vec<usize> {
-        (0..num_partitions)
-            .filter(|&partition| (partition * num_processors) / num_partitions == processor_id)
-            .collect()
-    }
-
-    /// Stream write each block, and flush it conditionally based on builder status
-    /// and input size estimation.
-    fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
-        let blocks_num = data_blocks.len();
-        let mut accumulated_rows = 0;
-        let mut accumulated_bytes = 0;
-        let mut pending_blocks = Vec::with_capacity(blocks_num);
-        let mut staged_blocks = Vec::with_capacity(blocks_num);
-        let mut compacted = Vec::with_capacity(blocks_num);
-        for block in data_blocks {
-            accumulated_rows += block.num_rows();
-            accumulated_bytes += block.estimate_block_size();
-            pending_blocks.push(block);
-            if !self
-                .properties
-                .check_large_enough(accumulated_rows, accumulated_bytes)
-            {
-                continue;
-            }
-            if !staged_blocks.is_empty() {
-                compacted.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
-            }
-            std::mem::swap(&mut staged_blocks, &mut pending_blocks);
-            accumulated_rows = 0;
-            accumulated_bytes = 0;
-        }
-        staged_blocks.append(&mut pending_blocks);
-        if !staged_blocks.is_empty() {
-            compacted.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
-        }
-
-        let mut result = Vec::new();
-        let mut builder = StreamBlockBuilder::try_new_with_config(self.properties.clone())?;
-        for block in compacted {
-            let block = DataBlock::sort_with_type(&block, &self.sort_desc, LimitType::None)?;
-            builder.write(block)?;
-            if builder.need_flush() {
-                let serialized = builder.finish()?;
-                result.push(DataBlock::empty_with_meta(Box::new(serialized)));
-                builder = StreamBlockBuilder::try_new_with_config(self.properties.clone())?;
-            }
-        }
-        if !builder.is_empty() {
-            let serialized = builder.finish()?;
-            result.push(DataBlock::empty_with_meta(Box::new(serialized)));
-        }
-        Ok(result)
-    }
-}
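The deleted `ReclusterPartitionStrategys` was a near-duplicate of the kept `ReclusterPartitionStrategy`; the visible difference was the extra per-group `sort_with_type` over the appended order column. Its `calc_partitions` formula is worth a standalone check: it assigns each processor a contiguous, near-equal chunk of the partition range.

    // The same filter expression as in the removed calc_partitions above:
    // partition p belongs to processor (p * num_processors) / num_partitions.
    fn calc_partitions(processor_id: usize, num_processors: usize, num_partitions: usize) -> Vec<usize> {
        (0..num_partitions)
            .filter(|&partition| (partition * num_processors) / num_partitions == processor_id)
            .collect()
    }

    fn main() {
        // 8 partitions over 3 processors: contiguous chunks of size 3, 3, 2.
        assert_eq!(calc_partitions(0, 3, 8), vec![0, 1, 2]);
        assert_eq!(calc_partitions(1, 3, 8), vec![3, 4, 5]);
        assert_eq!(calc_partitions(2, 3, 8), vec![6, 7]);
    }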

src/query/service/src/pipelines/processors/transforms/recluster/recluster_sample_state.rs
1 addition, 1 deletion
@@ -114,7 +114,7 @@ impl SampleStateInner {
             let weight = weights[idx];
             cum_weight += weight;

-            if cum_weight >= target && previous_bound.map_or(true, |prev| value > prev) {
+            if cum_weight >= target && previous_bound.is_none_or(|prev| value > prev) {
                 if unlikely(value == max_val) {
                     self.max_value = Some(max_val.clone());
                     break;
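`Option::is_none_or`, stable since Rust 1.82, is the idiomatic replacement for `map_or(true, ...)`: both return `true` for `None` and otherwise apply the predicate to the contained value. A quick equivalence check:

    fn main() {
        let value = 5;
        for previous_bound in [None, Some(3), Some(10)] {
            let old = previous_bound.map_or(true, |prev| value > prev);
            let new = previous_bound.is_none_or(|prev| value > prev);
            // None => true; Some(3) => true (5 > 3); Some(10) => false.
            assert_eq!(old, new);
        }
    }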

src/query/service/src/pipelines/processors/transforms/recluster/transform_range_partition_indexer.rs
26 additions, 14 deletions
@@ -118,28 +118,40 @@ impl Processor for TransformRangePartitionIndexer {
     fn process(&mut self) -> Result<()> {
         let start = Instant::now();
         if let Some(mut block) = self.input_data.pop() {
-            let bound_len = self.bounds.len();
             let num_rows = block.num_rows();
             let mut builder = Vec::with_capacity(num_rows);
             let last_col = block.get_last_column().as_binary().unwrap();
-            for index in 0..num_rows {
-                let val = unsafe { last_col.index_unchecked(index) };
-                if unlikely(self.max_value.as_ref().is_some_and(|v| val >= v.as_slice())) {
-                    let range_id = bound_len + 1;
-                    builder.push(range_id as u64);
-                    continue;
+            if let Some(max_value) = self.max_value.as_ref() {
+                let bound_len = self.bounds.len();
+                for index in 0..num_rows {
+                    let val = unsafe { last_col.index_unchecked(index) };
+                    if unlikely(val >= max_value.as_slice()) {
+                        let range_id = bound_len + 1;
+                        builder.push(range_id as u64);
+                        continue;
+                    }
+
+                    let idx = self
+                        .bounds
+                        .binary_search_by(|b| b.as_slice().cmp(val))
+                        .unwrap_or_else(|i| i);
+                    builder.push(idx as u64);
+                }
+            } else {
+                for index in 0..num_rows {
+                    let val = unsafe { last_col.index_unchecked(index) };
+                    let idx = self
+                        .bounds
+                        .binary_search_by(|b| b.as_slice().cmp(val))
+                        .unwrap_or_else(|i| i);
+                    builder.push(idx as u64);
                 }
-
-                let idx = self
-                    .bounds
-                    .binary_search_by(|b| b.as_slice().cmp(val))
-                    .unwrap_or_else(|i| i);
-                builder.push(idx as u64);
             }
-
+            block.pop_columns(1);
             block.add_column(UInt64Type::from_data(builder));
             self.output_data.push_back(block);
         }
+
         log::info!("Recluster range output: {:?}", start.elapsed());
         Ok(())
     }
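Two things change above: the `max_value` test is hoisted out of the per-row loop (one `Option` check per block instead of per row), and `block.pop_columns(1)` now drops the consumed range-boundary column before the computed partition ids are appended. In both loops, a row's range id is the insertion point of a binary search over the sorted bounds. A standalone model of the lookup (simplified types, not the actual processor):

    // `bounds` are sorted boundary keys; binary_search_by returns Ok(i) on an
    // exact hit or Err(i) with the insertion point, and either way `i` is the
    // range the value falls into. Values at or above `max_value` (when one is
    // known) go to the overflow range bounds.len() + 1, as in the diff above.
    fn range_id(bounds: &[Vec<u8>], max_value: Option<&[u8]>, val: &[u8]) -> u64 {
        if let Some(max) = max_value {
            if val >= max {
                return (bounds.len() + 1) as u64;
            }
        }
        bounds
            .binary_search_by(|b| b.as_slice().cmp(val))
            .unwrap_or_else(|i| i) as u64
    }

    fn main() {
        let bounds = vec![b"f".to_vec(), b"m".to_vec()];
        assert_eq!(range_id(&bounds, None, b"a"), 0); // before "f"
        assert_eq!(range_id(&bounds, None, b"g"), 1); // between "f" and "m"
        assert_eq!(range_id(&bounds, None, b"z"), 2); // after "m"
        assert_eq!(range_id(&bounds, Some(b"x".as_slice()), b"z"), 3); // >= max
    }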

src/query/settings/src/settings_default.rs
0 additions, 7 deletions
@@ -1312,13 +1312,6 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
-            ("enable_block_stream_writes", DefaultSettingValue {
-                value: UserSettingValue::UInt64(0),
-                desc: "Enables block stream write",
-                mode: SettingMode::Both,
-                scope: SettingScope::Both,
-                range: Some(SettingRange::Numeric(0..=1)),
-            }),
             ("trace_sample_rate", DefaultSettingValue {
                 value: UserSettingValue::UInt64(1),
                 desc: "Setting the trace sample rate. The value should be between '0' and '100'",

src/query/settings/src/settings_getter_setter.rs
0 additions, 4 deletions
@@ -964,10 +964,6 @@ impl Settings {
         Ok(self.try_get_u64("enable_block_stream_write")? == 1)
     }

-    pub fn get_enable_block_stream_writes(&self) -> Result<bool> {
-        Ok(self.try_get_u64("enable_block_stream_writes")? == 1)
-    }
-
     pub fn get_statement_queue_ttl_in_seconds(&self) -> Result<u64> {
         self.try_get_u64("statement_queue_ttl_in_seconds")
     }
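Together with the `settings_default.rs` hunk above, this retires the plural `enable_block_stream_writes` flag entirely, while the singular `enable_block_stream_write` getter next to it stays. Boolean settings in this codebase are `u64` values constrained to `0..=1`, read back through a typed getter; a minimal sketch of that pattern (hypothetical stand-in `Settings`, not Databend's):

    use std::collections::HashMap;

    struct Settings {
        values: HashMap<&'static str, u64>,
    }

    impl Settings {
        fn try_get_u64(&self, key: &str) -> Result<u64, String> {
            self.values
                .get(key)
                .copied()
                .ok_or_else(|| format!("unknown setting: {key}"))
        }

        // Same shape as the surviving getter: a 0/1 u64 surfaced as bool.
        fn get_enable_block_stream_write(&self) -> Result<bool, String> {
            Ok(self.try_get_u64("enable_block_stream_write")? == 1)
        }
    }

    fn main() {
        let settings = Settings {
            values: HashMap::from([("enable_block_stream_write", 1u64)]),
        };
        assert!(settings.get_enable_block_stream_write().unwrap());
    }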
