Skip to content

Commit 0f02fc6

Browse files
committed
linear recluster support block stream writer
1 parent 841e4cf commit 0f02fc6

File tree

13 files changed

+362
-300
lines changed

13 files changed

+362
-300
lines changed

src/query/functions/src/scalars/hilbert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ fn calc_range_partition_id(val: ScalarRef, arr: &Column) -> u64 {
267267
while low < high {
268268
let mid = low + ((high - low) / 2);
269269
let bound = unsafe { arr.index_unchecked(mid) };
270-
if val > bound {
270+
if val >= bound {
271271
low = mid + 1;
272272
} else {
273273
high = mid;

src/query/service/src/local/display.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ impl FormatDisplay<'_> {
259259
rows_str,
260260
self.start.elapsed().as_secs_f64(),
261261
humanize_count(stats.total_rows as f64),
262-
HumanBytes(stats.total_rows as u64),
262+
HumanBytes(stats.total_bytes as u64),
263263
humanize_count(stats.total_rows as f64 / self.start.elapsed().as_secs_f64()),
264264
HumanBytes((stats.total_bytes as f64 / self.start.elapsed().as_secs_f64()) as u64),
265265
);

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ impl PipelineBuilder {
7171
let properties = StreamBlockProperties::try_create(
7272
self.ctx.clone(),
7373
table,
74+
MutationKind::Recluster,
75+
None,
7476
partition.table_meta_timestamps,
7577
)?;
7678

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 269 additions & 64 deletions
Large diffs are not rendered by default.

src/query/service/src/pipelines/processors/transforms/recluster/builder.rs

Lines changed: 0 additions & 198 deletions
This file was deleted.

src/query/service/src/pipelines/processors/transforms/recluster/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod builder;
1615
mod range_bound_sampler;
1716
mod recluster_partition_exchange;
1817
mod recluster_partition_strategy;
@@ -25,7 +24,8 @@ pub use range_bound_sampler::RangeBoundSampler;
2524
pub use recluster_partition_exchange::ReclusterPartitionExchange;
2625
pub use recluster_partition_strategy::CompactPartitionStrategy;
2726
pub use recluster_partition_strategy::ReclusterPartitionStrategy;
28-
pub(crate) use recluster_sample_state::SampleState;
27+
pub use recluster_sample_state::SampleState;
28+
pub use transform_add_order_column::TransformAddOrderColumn;
2929
pub use transform_range_partition_indexer::TransformRangePartitionIndexer;
30-
pub(crate) use transform_recluster_collect::ReclusterSampleMeta;
30+
pub use transform_recluster_collect::ReclusterSampleMeta;
3131
pub use transform_recluster_collect::TransformReclusterCollect;

src/query/service/src/pipelines/processors/transforms/recluster/transform_range_partition_indexer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ where
137137
while low < high {
138138
let mid = low + ((high - low) / 2);
139139
let bound = unsafe { self.bounds.get_unchecked(mid) }.clone();
140-
if val > bound {
140+
if val >= bound {
141141
low = mid + 1;
142142
} else {
143143
high = mid;

src/query/service/src/pipelines/processors/transforms/recluster/transform_recluster_collect.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use databend_common_exception::Result;
1616
use databend_common_expression::local_block_meta_serde;
1717
use databend_common_expression::types::ArgType;
18-
use databend_common_expression::types::ValueType;
1918
use databend_common_expression::BlockMetaInfo;
2019
use databend_common_expression::DataBlock;
2120
use databend_common_expression::Scalar;

src/query/settings/src/settings_default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ impl DefaultSettings {
860860
desc: "Sets the maximum byte size of blocks for recluster",
861861
mode: SettingMode::Both,
862862
scope: SettingScope::Both,
863-
range: Some(SettingRange::Numeric(0..=u64::MAX)),
863+
range: Some(SettingRange::Numeric(0..=80 * 1024 * 1024 * 1024)),
864864
}),
865865
("compact_max_block_selection", DefaultSettingValue {
866866
value: UserSettingValue::UInt64(10000),

src/query/storages/fuse/src/io/write/stream/block_builder.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@ use databend_common_expression::Column;
2828
use databend_common_expression::ColumnId;
2929
use databend_common_expression::ComputedExpr;
3030
use databend_common_expression::DataBlock;
31+
use databend_common_expression::DataField;
3132
use databend_common_expression::FieldIndex;
3233
use databend_common_expression::TableField;
3334
use databend_common_expression::TableSchema;
3435
use databend_common_expression::TableSchemaRef;
3536
use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
3637
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
3738
use databend_common_native::write::NativeWriter;
39+
use databend_common_sql::evaluator::BlockOperator;
40+
use databend_common_sql::executor::physical_plans::MutationKind;
3841
use databend_storages_common_index::BloomIndex;
3942
use databend_storages_common_index::BloomIndexBuilder;
4043
use databend_storages_common_index::Index;
@@ -367,17 +370,24 @@ impl StreamBlockProperties {
367370
pub fn try_create(
368371
ctx: Arc<dyn TableContext>,
369372
table: &FuseTable,
373+
kind: MutationKind,
374+
level: Option<i32>,
370375
table_meta_timestamps: TableMetaTimestamps,
371376
) -> Result<Arc<Self>> {
372377
// remove virtual computed fields.
373-
let fields = table
378+
let mut fields = table
374379
.schema()
375380
.fields()
376381
.iter()
377382
.filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
378383
.cloned()
379384
.collect::<Vec<_>>();
380-
385+
if !matches!(kind, MutationKind::Insert | MutationKind::Replace) {
386+
// add stream fields.
387+
for stream_column in table.stream_columns().iter() {
388+
fields.push(stream_column.table_field());
389+
}
390+
}
381391
let source_schema = Arc::new(TableSchema {
382392
fields,
383393
..table.schema().as_ref().clone()
@@ -397,7 +407,7 @@ impl StreamBlockProperties {
397407
let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);
398408

399409
let cluster_stats_builder =
400-
ClusterStatisticsBuilder::try_create(table, ctx.clone(), &source_schema)?;
410+
ClusterStatisticsBuilder::try_create(table, ctx.clone(), &source_schema, level)?;
401411

402412
let mut stats_columns = vec![];
403413
let mut distinct_columns = vec![];
@@ -434,4 +444,16 @@ impl StreamBlockProperties {
434444
self.block_thresholds
435445
.check_large_enough(num_rows, data_size)
436446
}
447+
448+
pub fn cluster_operators(&self) -> Vec<BlockOperator> {
449+
self.cluster_stats_builder.operators()
450+
}
451+
452+
pub fn fields_with_cluster_key(&self) -> Vec<DataField> {
453+
self.cluster_stats_builder.out_fields()
454+
}
455+
456+
pub fn cluster_key_index(&self) -> &Vec<usize> {
457+
self.cluster_stats_builder.cluster_key_index()
458+
}
437459
}

0 commit comments

Comments
 (0)