Skip to content

Commit aab0f85

Browse files
authored
feat(query): TopN window operator (#16726)
* transform_window_partial_top_n Signed-off-by: coldWater <forsaken628@gmail.com> * test Signed-off-by: coldWater <forsaken628@gmail.com> * builder Signed-off-by: coldWater <forsaken628@gmail.com> * RulePushDownFilterWindowRank Signed-off-by: coldWater <forsaken628@gmail.com> * WindowPartitionTopNFunc Signed-off-by: coldWater <forsaken628@gmail.com> * SortCompareEquality Signed-off-by: coldWater <forsaken628@gmail.com> * name Signed-off-by: coldWater <forsaken628@gmail.com> * group_hash_value_spread Signed-off-by: coldWater <forsaken628@gmail.com> * group_hash_columns_slice Signed-off-by: coldWater <forsaken628@gmail.com> * WindowPartitionTopNExchange Signed-off-by: coldWater <forsaken628@gmail.com> * builder Signed-off-by: coldWater <forsaken628@gmail.com> * group_hash test Signed-off-by: coldWater <forsaken628@gmail.com> * fix Signed-off-by: coldWater <forsaken628@gmail.com> * test Signed-off-by: coldWater <forsaken628@gmail.com> * refine Signed-off-by: coldWater <forsaken628@gmail.com> --------- Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent aca2fe4 commit aab0f85

File tree

29 files changed

+1113
-149
lines changed

29 files changed

+1113
-149
lines changed

src/query/expression/src/aggregate/group_hash.rs

Lines changed: 350 additions & 11 deletions
Large diffs are not rendered by default.

src/query/expression/src/kernels/sort_compare.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct SortCompare {
3737
current_column_index: usize,
3838
validity: Option<Bitmap>,
3939
equality_index: Vec<u8>,
40+
force_equality: bool,
4041
}
4142

4243
macro_rules! do_sorter {
@@ -112,12 +113,25 @@ impl SortCompare {
112113
current_column_index: 0,
113114
validity: None,
114115
equality_index,
116+
force_equality: matches!(limit, LimitType::LimitRank(_)),
117+
}
118+
}
119+
120+
pub fn with_force_equality(ordering_descs: Vec<SortColumnDescription>, rows: usize) -> Self {
121+
Self {
122+
rows,
123+
limit: LimitType::None,
124+
permutation: (0..rows as u32).collect(),
125+
ordering_descs,
126+
current_column_index: 0,
127+
validity: None,
128+
equality_index: vec![1; rows as _],
129+
force_equality: true,
115130
}
116131
}
117132

118133
fn need_update_equality_index(&self) -> bool {
119-
self.current_column_index != self.ordering_descs.len() - 1
120-
|| matches!(self.limit, LimitType::LimitRank(_))
134+
self.force_equality || self.current_column_index != self.ordering_descs.len() - 1
121135
}
122136

123137
pub fn increment_column_index(&mut self) {
@@ -254,6 +268,11 @@ impl SortCompare {
254268
}
255269
}
256270
}
271+
272+
pub fn equality_index(&self) -> &[u8] {
273+
debug_assert!(self.force_equality);
274+
&self.equality_index
275+
}
257276
}
258277

259278
impl ValueVisitor for SortCompare {

src/query/expression/src/types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub use self::empty_map::EmptyMapType;
5757
pub use self::generic::GenericType;
5858
pub use self::geography::GeographyColumn;
5959
pub use self::geography::GeographyType;
60+
pub use self::geometry::GeometryType;
6061
pub use self::map::MapType;
6162
pub use self::null::NullType;
6263
pub use self::nullable::NullableColumn;

src/query/expression/src/types/geography.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ impl<'a> GeographyRef<'a> {
8383
}
8484
}
8585

86+
impl<'a> AsRef<[u8]> for GeographyRef<'a> {
87+
fn as_ref(&self) -> &[u8] {
88+
self.0
89+
}
90+
}
91+
8692
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8793
pub struct GeographyType;
8894

src/query/expression/src/utils/visitor.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,25 @@ pub trait ValueVisitor {
3636
self.visit_typed_column::<EmptyMapType>(len)
3737
}
3838

39+
fn visit_any_number(&mut self, column: NumberColumn) -> Result<()> {
40+
with_number_type!(|NUM_TYPE| match column {
41+
NumberColumn::NUM_TYPE(b) => self.visit_number(b),
42+
})
43+
}
44+
3945
fn visit_number<T: Number>(
4046
&mut self,
4147
column: <NumberType<T> as ValueType>::Column,
4248
) -> Result<()> {
4349
self.visit_typed_column::<NumberType<T>>(column)
4450
}
4551

52+
fn visit_any_decimal(&mut self, column: DecimalColumn) -> Result<()> {
53+
with_decimal_type!(|DECIMAL_TYPE| match column {
54+
DecimalColumn::DECIMAL_TYPE(b, size) => self.visit_decimal(b, size),
55+
})
56+
}
57+
4658
fn visit_decimal<T: Decimal>(&mut self, column: Buffer<T>, _size: DecimalSize) -> Result<()> {
4759
self.visit_typed_column::<DecimalType<T>>(column)
4860
}
@@ -113,16 +125,8 @@ pub trait ValueVisitor {
113125
Column::Null { len } => self.visit_null(len),
114126
Column::EmptyArray { len } => self.visit_empty_array(len),
115127
Column::EmptyMap { len } => self.visit_empty_map(len),
116-
Column::Number(column) => {
117-
with_number_type!(|NUM_TYPE| match column {
118-
NumberColumn::NUM_TYPE(b) => self.visit_number(b),
119-
})
120-
}
121-
Column::Decimal(column) => {
122-
with_decimal_type!(|DECIMAL_TYPE| match column {
123-
DecimalColumn::DECIMAL_TYPE(b, size) => self.visit_decimal(b, size),
124-
})
125-
}
128+
Column::Number(column) => self.visit_any_number(column),
129+
Column::Decimal(column) => self.visit_any_decimal(column),
126130
Column::Boolean(bitmap) => self.visit_boolean(bitmap),
127131
Column::Binary(column) => self.visit_binary(column),
128132
Column::String(column) => self.visit_string(column),

src/query/pipeline/core/src/processors/shuffle_processor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub enum MultiwayStrategy {
3131
}
3232

3333
pub trait Exchange: Send + Sync + 'static {
34+
const NAME: &'static str;
3435
const STRATEGY: MultiwayStrategy = MultiwayStrategy::Random;
3536

3637
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>>;
@@ -185,7 +186,7 @@ impl<T: Exchange> PartitionProcessor<T> {
185186

186187
impl<T: Exchange> Processor for PartitionProcessor<T> {
187188
fn name(&self) -> String {
188-
String::from("ShufflePartition")
189+
format!("ShufflePartition({})", T::NAME)
189190
}
190191

191192
fn as_any(&mut self) -> &mut dyn Any {
@@ -287,7 +288,7 @@ impl<T: Exchange> MergePartitionProcessor<T> {
287288

288289
impl<T: Exchange> Processor for MergePartitionProcessor<T> {
289290
fn name(&self) -> String {
290-
String::from("ShuffleMergePartition")
291+
format!("ShuffleMergePartition({})", T::NAME)
291292
}
292293

293294
fn as_any(&mut self) -> &mut dyn Any {

src/query/service/src/pipelines/builders/builder_window.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::pipelines::processors::transforms::TransformWindow;
3434
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
3535
use crate::pipelines::processors::transforms::WindowFunctionInfo;
3636
use crate::pipelines::processors::transforms::WindowPartitionExchange;
37+
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
3738
use crate::pipelines::processors::transforms::WindowSortDesc;
3839
use crate::pipelines::processors::transforms::WindowSpillSettings;
3940
use crate::pipelines::PipelineBuilder;
@@ -169,10 +170,23 @@ impl PipelineBuilder {
169170
})
170171
.collect::<Result<Vec<_>>>()?;
171172

172-
self.main_pipeline.exchange(
173-
num_processors,
174-
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
175-
);
173+
if let Some(top_n) = &window_partition.top_n {
174+
self.main_pipeline.exchange(
175+
num_processors,
176+
WindowPartitionTopNExchange::create(
177+
partition_by.clone(),
178+
sort_desc.clone(),
179+
top_n.top,
180+
top_n.func,
181+
num_partitions as u64,
182+
),
183+
)
184+
} else {
185+
self.main_pipeline.exchange(
186+
num_processors,
187+
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
188+
);
189+
}
176190

177191
let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?;
178192
let temp_dir_manager = TempDirManager::instance();

src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ mod transform_window_partition_collect;
1616
mod window_partition_buffer;
1717
mod window_partition_exchange;
1818
mod window_partition_meta;
19+
mod window_partition_partial_top_n_exchange;
1920

2021
pub use transform_window_partition_collect::*;
2122
pub use window_partition_buffer::*;
2223
pub use window_partition_exchange::*;
2324
pub use window_partition_meta::*;
25+
pub use window_partition_partial_top_n_exchange::*;

src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_exchange.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_exception::Result;
18-
use databend_common_expression::group_hash_columns_slice;
19-
use databend_common_expression::ColumnBuilder;
18+
use databend_common_expression::group_hash_columns;
2019
use databend_common_expression::DataBlock;
21-
use databend_common_expression::Value;
20+
use databend_common_expression::InputColumns;
2221
use databend_common_pipeline_core::processors::Exchange;
2322

2423
use super::WindowPartitionMeta;
@@ -38,27 +37,17 @@ impl WindowPartitionExchange {
3837
}
3938

4039
impl Exchange for WindowPartitionExchange {
40+
const NAME: &'static str = "Window";
4141
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
4242
let num_rows = data_block.num_rows();
4343

4444
// Extract the columns used for hash computation.
45-
let hash_cols = self
46-
.hash_keys
47-
.iter()
48-
.map(|&offset| {
49-
let entry = data_block.get_by_offset(offset);
50-
match &entry.value {
51-
Value::Scalar(s) => {
52-
ColumnBuilder::repeat(&s.as_ref(), num_rows, &entry.data_type).build()
53-
}
54-
Value::Column(c) => c.clone(),
55-
}
56-
})
57-
.collect::<Vec<_>>();
45+
let data_block = data_block.consume_convert_to_full();
46+
let hash_cols = InputColumns::new_block_proxy(&self.hash_keys, &data_block);
5847

5948
// Compute the hash value for each row.
6049
let mut hashes = vec![0u64; num_rows];
61-
group_hash_columns_slice(&hash_cols, &mut hashes);
50+
group_hash_columns(hash_cols, &mut hashes);
6251

6352
// Scatter the data block to different partitions.
6453
let indices = hashes

0 commit comments

Comments
 (0)