Skip to content

Commit 4181f1f

Browse files
committed
spill block build and block write
1 parent e25a455 commit 4181f1f

File tree

23 files changed

+355
-499
lines changed

23 files changed

+355
-499
lines changed

src/query/expression/src/utils/block_thresholds.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ impl BlockThresholds {
153153
let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
154154
// Adjust the number of blocks based on block size thresholds.
155155
let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
156-
let min_bytes_per_block = max_bytes_per_block / 2;
156+
let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
157157
let block_nums = if bytes_per_block > max_bytes_per_block {
158158
// Case 1: If the block size is too bigger.
159159
total_bytes.div_ceil(max_bytes_per_block)
@@ -201,7 +201,7 @@ impl BlockThresholds {
201201
// Adjust block count based on byte size thresholds.
202202
let bytes_per_block = total_bytes.div_ceil(by_compressed);
203203
let max_bytes = self.max_bytes_per_block.min(400 * 1024 * 1024);
204-
let min_bytes = max_bytes / 2;
204+
let min_bytes = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
205205
let total_partitions = if bytes_per_block > max_bytes {
206206
// Block size is too large.
207207
total_bytes / max_bytes

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,21 @@ use databend_common_catalog::table_context::TableContext;
2020
use databend_common_exception::Result;
2121
use databend_common_pipeline_core::processors::ProcessorPtr;
2222
use databend_common_pipeline_transforms::MemorySettings;
23+
use databend_common_pipeline_transforms::TransformPipelineHelper;
2324
use databend_common_sql::executor::physical_plans::HilbertPartition;
2425
use databend_common_sql::executor::physical_plans::MutationKind;
26+
use databend_common_storages_fuse::io::StreamBlockProperties;
2527
use databend_common_storages_fuse::operations::TransformBlockWriter;
2628
use databend_common_storages_fuse::operations::TransformSerializeBlock;
2729
use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
2830
use databend_common_storages_fuse::FuseTable;
2931
use databend_storages_common_cache::TempDirManager;
3032

3133
use crate::pipelines::memory_settings::MemorySettingsExt;
32-
use crate::pipelines::processors::transforms::CompactStrategy;
33-
use crate::pipelines::processors::transforms::HilbertPartitionExchange;
34-
use crate::pipelines::processors::transforms::TransformHilbertCollect;
35-
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
34+
use crate::pipelines::processors::transforms::CompactPartitionStrategy;
35+
use crate::pipelines::processors::transforms::ReclusterPartitionExchange;
36+
use crate::pipelines::processors::transforms::ReclusterPartitionStrategy;
37+
use crate::pipelines::processors::transforms::TransformPartitionCollect;
3638
use crate::pipelines::PipelineBuilder;
3739
use crate::spillers::SpillerDiskConfig;
3840

@@ -49,7 +51,7 @@ impl PipelineBuilder {
4951

5052
self.main_pipeline.exchange(
5153
num_processors,
52-
HilbertPartitionExchange::create(partition.range_start, partition.range_width),
54+
ReclusterPartitionExchange::create(partition.range_start, partition.range_width),
5355
);
5456

5557
let settings = self.ctx.get_settings();
@@ -66,9 +68,15 @@ impl PipelineBuilder {
6668
let processor_id = AtomicUsize::new(0);
6769

6870
if enable_stream_writer {
71+
let properties = StreamBlockProperties::try_create(
72+
self.ctx.clone(),
73+
table,
74+
partition.table_meta_timestamps,
75+
)?;
76+
6977
self.main_pipeline.add_transform(|input, output| {
7078
Ok(ProcessorPtr::create(Box::new(
71-
TransformHilbertCollect::new(
79+
TransformPartitionCollect::new(
7280
self.ctx.clone(),
7381
input,
7482
output,
@@ -78,28 +86,24 @@ impl PipelineBuilder {
7886
partition.range_width,
7987
window_spill_settings.clone(),
8088
disk_spill.clone(),
81-
partition.rows_per_block,
82-
partition.bytes_per_block,
89+
ReclusterPartitionStrategy::new(properties.clone()),
8390
)?,
8491
)))
8592
})?;
8693

87-
self.main_pipeline.add_transform(|input, output| {
88-
TransformBlockWriter::try_create(
94+
self.main_pipeline.add_async_accumulating_transformer(|| {
95+
TransformBlockWriter::create(
8996
self.ctx.clone(),
90-
input,
91-
output,
9297
MutationKind::Recluster,
9398
table,
94-
partition.table_meta_timestamps,
9599
false,
96-
Some(partition.bytes_per_block),
97100
)
98-
})
101+
});
102+
Ok(())
99103
} else {
100104
self.main_pipeline.add_transform(|input, output| {
101105
Ok(ProcessorPtr::create(Box::new(
102-
TransformWindowPartitionCollect::new(
106+
TransformPartitionCollect::new(
103107
self.ctx.clone(),
104108
input,
105109
output,
@@ -109,24 +113,26 @@ impl PipelineBuilder {
109113
partition.range_width,
110114
window_spill_settings.clone(),
111115
disk_spill.clone(),
112-
CompactStrategy::new(partition.rows_per_block, partition.bytes_per_block),
116+
CompactPartitionStrategy::new(
117+
partition.rows_per_block,
118+
partition.bytes_per_block,
119+
),
113120
)?,
114121
)))
115122
})?;
116123

117-
self.main_pipeline
118-
.add_transform(|transform_input_port, transform_output_port| {
119-
let proc = TransformSerializeBlock::try_create(
120-
self.ctx.clone(),
121-
transform_input_port,
122-
transform_output_port,
123-
table,
124-
ClusterStatsGenerator::default(),
125-
MutationKind::Recluster,
126-
partition.table_meta_timestamps,
127-
)?;
128-
proc.into_processor()
129-
})
124+
self.main_pipeline.add_transform(|input, output| {
125+
let proc = TransformSerializeBlock::try_create(
126+
self.ctx.clone(),
127+
input,
128+
output,
129+
table,
130+
ClusterStatsGenerator::default(),
131+
MutationKind::Recluster,
132+
partition.table_meta_timestamps,
133+
)?;
134+
proc.into_processor()
135+
})
130136
}
131137
}
132138
}

src/query/service/src/pipelines/builders/builder_window.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ use databend_storages_common_cache::TempDirManager;
3030

3131
use crate::pipelines::memory_settings::MemorySettingsExt;
3232
use crate::pipelines::processors::transforms::FrameBound;
33-
use crate::pipelines::processors::transforms::SortStrategy;
33+
use crate::pipelines::processors::transforms::TransformPartitionCollect;
3434
use crate::pipelines::processors::transforms::TransformWindow;
35-
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
3635
use crate::pipelines::processors::transforms::WindowFunctionInfo;
3736
use crate::pipelines::processors::transforms::WindowPartitionExchange;
37+
use crate::pipelines::processors::transforms::WindowPartitionStrategy;
3838
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
3939
use crate::pipelines::processors::transforms::WindowSortDesc;
4040
use crate::pipelines::PipelineBuilder;
@@ -203,14 +203,14 @@ impl PipelineBuilder {
203203

204204
let processor_id = AtomicUsize::new(0);
205205
self.main_pipeline.add_transform(|input, output| {
206-
let strategy = SortStrategy::try_create(
206+
let strategy = WindowPartitionStrategy::try_create(
207207
&settings,
208208
sort_desc.clone(),
209209
plan_schema.clone(),
210210
have_order_col,
211211
)?;
212212
Ok(ProcessorPtr::create(Box::new(
213-
TransformWindowPartitionCollect::new(
213+
TransformPartitionCollect::new(
214214
self.ctx.clone(),
215215
input,
216216
output,

src/query/service/src/pipelines/processors/transforms/recluster/compact_strategy.rs

Lines changed: 0 additions & 78 deletions
This file was deleted.

src/query/service/src/pipelines/processors/transforms/recluster/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod compact_strategy;
16-
mod hilbert_partition_exchange;
17-
mod transform_hilbert_collect;
15+
mod recluster_partition_exchange;
16+
mod recluster_partition_strategy;
1817

19-
pub use compact_strategy::CompactStrategy;
20-
pub use hilbert_partition_exchange::HilbertPartitionExchange;
21-
pub use transform_hilbert_collect::TransformHilbertCollect;
18+
pub use recluster_partition_exchange::ReclusterPartitionExchange;
19+
pub use recluster_partition_strategy::CompactPartitionStrategy;
20+
pub use recluster_partition_strategy::ReclusterPartitionStrategy;

src/query/service/src/pipelines/processors/transforms/recluster/hilbert_partition_exchange.rs renamed to src/query/service/src/pipelines/processors/transforms/recluster/recluster_partition_exchange.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@ use databend_common_pipeline_core::processors::Exchange;
2020

2121
use crate::pipelines::processors::transforms::WindowPartitionMeta;
2222

23-
pub struct HilbertPartitionExchange {
23+
pub struct ReclusterPartitionExchange {
2424
start: u64,
2525
width: usize,
2626
}
2727

28-
impl HilbertPartitionExchange {
29-
pub fn create(start: u64, width: usize) -> Arc<HilbertPartitionExchange> {
30-
Arc::new(HilbertPartitionExchange { start, width })
28+
impl ReclusterPartitionExchange {
29+
pub fn create(start: u64, width: usize) -> Arc<ReclusterPartitionExchange> {
30+
Arc::new(ReclusterPartitionExchange { start, width })
3131
}
3232
}
3333

34-
impl Exchange for HilbertPartitionExchange {
35-
const NAME: &'static str = "Hilbert";
34+
impl Exchange for ReclusterPartitionExchange {
35+
const NAME: &'static str = "Recluster";
3636
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
3737
let mut data_block = data_block;
3838
let range_ids = data_block
@@ -51,16 +51,10 @@ impl Exchange for HilbertPartitionExchange {
5151

5252
let scatter_indices = DataBlock::divide_indices_by_scatter_size(&indices, self.width);
5353
// Partition the data blocks to different processors.
54-
let base = self.width / n;
55-
let remainder = self.width % n;
5654
let mut output_data_blocks = vec![vec![]; n];
5755
for (partition_id, indices) in scatter_indices.into_iter().take(self.width).enumerate() {
5856
if !indices.is_empty() {
59-
let target = if partition_id < remainder * (base + 1) {
60-
partition_id / (base + 1)
61-
} else {
62-
(partition_id - remainder) / base
63-
};
57+
let target = (partition_id * n) / self.width;
6458
let block = data_block.take_with_optimize_size(&indices)?;
6559
output_data_blocks[target].push((partition_id, block));
6660
}

0 commit comments

Comments
 (0)