Skip to content

Commit adac7a0

Browse files
Dousir9sundy-li
andauthored
feat(query): support take_ranges (#13878)
* support take_ranges * add comments * rename * make lint * add debug_assert_eq for take_ranges * &Vec<(u32, u32)> -> &[Range<u32>] --------- Co-authored-by: sundyli <543950155@qq.com>
1 parent 639638f commit adac7a0

File tree

4 files changed

+323
-2
lines changed

4 files changed

+323
-2
lines changed

src/query/expression/src/kernels/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ impl Column {
444444
/// # Safety
445445
/// * `src` + `src_idx`(in bits) must be [valid] for reads of `len` bits.
446446
/// * `ptr` must be [valid] for writes of `len` bits.
447-
unsafe fn copy_continuous_bits(
447+
pub unsafe fn copy_continuous_bits(
448448
ptr: &mut *mut u8,
449449
src: &[u8],
450450
mut dst_idx: usize,

src/query/expression/src/kernels/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod sort;
2121
mod take;
2222
mod take_chunks;
2323
mod take_compact;
24+
mod take_ranges;
2425
mod topk;
2526
mod utils;
2627

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use core::ops::Range;
16+
use std::sync::Arc;
17+
18+
use common_arrow::arrow::bitmap::Bitmap;
19+
use common_arrow::arrow::buffer::Buffer;
20+
use common_exception::Result;
21+
22+
use crate::kernels::take::BIT_MASK;
23+
use crate::kernels::utils::copy_advance_aligned;
24+
use crate::kernels::utils::set_vec_len_by_ptr;
25+
use crate::kernels::utils::store_advance_aligned;
26+
use crate::types::array::ArrayColumn;
27+
use crate::types::array::ArrayColumnBuilder;
28+
use crate::types::decimal::DecimalColumn;
29+
use crate::types::map::KvColumnBuilder;
30+
use crate::types::nullable::NullableColumn;
31+
use crate::types::number::NumberColumn;
32+
use crate::types::string::StringColumn;
33+
use crate::types::AnyType;
34+
use crate::types::ArrayType;
35+
use crate::types::MapType;
36+
use crate::types::ValueType;
37+
use crate::with_decimal_type;
38+
use crate::with_number_type;
39+
use crate::BlockEntry;
40+
use crate::Column;
41+
use crate::ColumnBuilder;
42+
use crate::DataBlock;
43+
use crate::Value;
44+
45+
impl DataBlock {
46+
// Generate a new `DataBlock` by the specified indices ranges.
47+
pub fn take_ranges(self, ranges: &[Range<u32>], num_rows: usize) -> Result<DataBlock> {
48+
debug_assert_eq!(
49+
ranges
50+
.iter()
51+
.map(|range| range.end - range.start)
52+
.sum::<u32>() as usize,
53+
num_rows
54+
);
55+
56+
let columns = self
57+
.columns()
58+
.iter()
59+
.map(|entry| match &entry.value {
60+
Value::Column(c) => {
61+
let value = Value::Column(Column::take_ranges(c, ranges, num_rows));
62+
BlockEntry::new(entry.data_type.clone(), value)
63+
}
64+
_ => entry.clone(),
65+
})
66+
.collect();
67+
Ok(DataBlock::new(columns, num_rows))
68+
}
69+
}
70+
71+
impl Column {
72+
// Generate a new `Column` by the specified indices ranges.
73+
fn take_ranges(&self, ranges: &[Range<u32>], num_rows: usize) -> Column {
74+
match self {
75+
Column::Null { .. } => Column::Null { len: num_rows },
76+
Column::EmptyArray { .. } => Column::EmptyArray { len: num_rows },
77+
Column::EmptyMap { .. } => Column::EmptyMap { len: num_rows },
78+
Column::Number(column) => with_number_type!(|NUM_TYPE| match column {
79+
NumberColumn::NUM_TYPE(values) => {
80+
Column::Number(NumberColumn::NUM_TYPE(Self::take_ranges_primitive_types(
81+
values, ranges, num_rows,
82+
)))
83+
}
84+
}),
85+
Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column {
86+
DecimalColumn::DECIMAL_TYPE(values, size) => {
87+
Column::Decimal(DecimalColumn::DECIMAL_TYPE(
88+
Self::take_ranges_primitive_types(values, ranges, num_rows),
89+
*size,
90+
))
91+
}
92+
}),
93+
Column::Boolean(bm) => {
94+
let column = Self::take_ranges_boolean_types(bm, ranges, num_rows);
95+
Column::Boolean(column)
96+
}
97+
Column::String(column) => {
98+
let column = Self::take_ranges_string_types(column, ranges, num_rows);
99+
Column::String(column)
100+
}
101+
Column::Timestamp(column) => {
102+
let ts = Self::take_ranges_primitive_types(column, ranges, num_rows);
103+
Column::Timestamp(ts)
104+
}
105+
Column::Date(column) => {
106+
let d = Self::take_ranges_primitive_types(column, ranges, num_rows);
107+
Column::Date(d)
108+
}
109+
Column::Array(column) => {
110+
let mut offsets = Vec::with_capacity(num_rows + 1);
111+
offsets.push(0);
112+
let builder = ColumnBuilder::with_capacity(&column.values.data_type(), num_rows);
113+
let builder = ArrayColumnBuilder { builder, offsets };
114+
Self::take_ranges_scalar_types::<ArrayType<AnyType>>(
115+
column, builder, ranges, num_rows,
116+
)
117+
}
118+
Column::Map(column) => {
119+
let mut offsets = Vec::with_capacity(num_rows + 1);
120+
offsets.push(0);
121+
let builder = ColumnBuilder::from_column(
122+
ColumnBuilder::with_capacity(&column.values.data_type(), num_rows).build(),
123+
);
124+
let (key_builder, val_builder) = match builder {
125+
ColumnBuilder::Tuple(fields) => (fields[0].clone(), fields[1].clone()),
126+
_ => unreachable!(),
127+
};
128+
let builder = KvColumnBuilder {
129+
keys: key_builder,
130+
values: val_builder,
131+
};
132+
let builder = ArrayColumnBuilder { builder, offsets };
133+
let column = ArrayColumn::try_downcast(column).unwrap();
134+
Self::take_ranges_scalar_types::<MapType<AnyType, AnyType>>(
135+
&column, builder, ranges, num_rows,
136+
)
137+
}
138+
Column::Bitmap(column) => {
139+
let column = Self::take_ranges_string_types(column, ranges, num_rows);
140+
Column::Bitmap(column)
141+
}
142+
143+
Column::Nullable(c) => {
144+
let column = Self::take_ranges(&c.column, ranges, num_rows);
145+
let validity = Self::take_ranges_boolean_types(&c.validity, ranges, num_rows);
146+
Column::Nullable(Box::new(NullableColumn { column, validity }))
147+
}
148+
Column::Tuple(fields) => {
149+
let fields = fields
150+
.iter()
151+
.map(|c| c.take_ranges(ranges, num_rows))
152+
.collect();
153+
Column::Tuple(fields)
154+
}
155+
Column::Variant(column) => {
156+
let column = Self::take_ranges_string_types(column, ranges, num_rows);
157+
Column::Variant(column)
158+
}
159+
}
160+
}
161+
162+
fn take_ranges_scalar_types<T: ValueType>(
163+
col: &T::Column,
164+
mut builder: T::ColumnBuilder,
165+
ranges: &[Range<u32>],
166+
_num_rows: usize,
167+
) -> Column {
168+
for range in ranges {
169+
for index in range.start as usize..range.end as usize {
170+
T::push_item(&mut builder, unsafe {
171+
T::index_column_unchecked(col, index)
172+
});
173+
}
174+
}
175+
T::upcast_column(T::build_column(builder))
176+
}
177+
178+
fn take_ranges_primitive_types<T: Copy>(
179+
values: &Buffer<T>,
180+
ranges: &[Range<u32>],
181+
num_rows: usize,
182+
) -> Buffer<T> {
183+
let mut builder: Vec<T> = Vec::with_capacity(num_rows);
184+
for range in ranges {
185+
builder.extend(&values[range.start as usize..range.end as usize]);
186+
}
187+
builder.into()
188+
}
189+
190+
fn take_ranges_string_types(
191+
values: &StringColumn,
192+
ranges: &[Range<u32>],
193+
num_rows: usize,
194+
) -> StringColumn {
195+
let mut offsets: Vec<u64> = Vec::with_capacity(num_rows + 1);
196+
let mut offsets_len = 0;
197+
let mut data_size = 0;
198+
199+
let value_data = values.data().as_slice();
200+
let values_offset = values.offsets().as_slice();
201+
// Build [`offset`] and calculate `data_size` required by [`data`].
202+
unsafe {
203+
*offsets.get_unchecked_mut(offsets_len) = 0;
204+
offsets_len += 1;
205+
for range in ranges {
206+
let mut offset_start = values_offset[range.start as usize];
207+
for offset_end in
208+
values_offset[range.start as usize + 1..range.end as usize + 1].iter()
209+
{
210+
data_size += offset_end - offset_start;
211+
offset_start = *offset_end;
212+
*offsets.get_unchecked_mut(offsets_len) = data_size;
213+
offsets_len += 1;
214+
}
215+
}
216+
offsets.set_len(offsets_len);
217+
}
218+
219+
// Build [`data`].
220+
let mut data: Vec<u8> = Vec::with_capacity(data_size as usize);
221+
let mut data_ptr = data.as_mut_ptr();
222+
223+
unsafe {
224+
for range in ranges {
225+
let col_data = &value_data[values_offset[range.start as usize] as usize
226+
..values_offset[range.end as usize] as usize];
227+
copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len());
228+
}
229+
set_vec_len_by_ptr(&mut data, data_ptr);
230+
}
231+
232+
StringColumn::new(data.into(), offsets.into())
233+
}
234+
235+
fn take_ranges_boolean_types(
236+
bitmap: &Bitmap,
237+
ranges: &[Range<u32>],
238+
num_rows: usize,
239+
) -> Bitmap {
240+
let capacity = num_rows.saturating_add(7) / 8;
241+
let mut builder: Vec<u8> = Vec::with_capacity(capacity);
242+
let mut builder_ptr = builder.as_mut_ptr();
243+
let mut builder_idx = 0;
244+
let mut unset_bits = 0;
245+
let mut buf = 0;
246+
247+
let (bitmap_slice, bitmap_offset, _) = bitmap.as_slice();
248+
unsafe {
249+
for range in ranges {
250+
let mut start = range.start as usize;
251+
let end = range.end as usize;
252+
if builder_idx % 8 != 0 {
253+
while start < end {
254+
if bitmap.get_bit_unchecked(start) {
255+
buf |= BIT_MASK[builder_idx % 8];
256+
} else {
257+
unset_bits += 1;
258+
}
259+
builder_idx += 1;
260+
start += 1;
261+
if builder_idx % 8 == 0 {
262+
store_advance_aligned(buf, &mut builder_ptr);
263+
buf = 0;
264+
break;
265+
}
266+
}
267+
}
268+
let remaining = end - start;
269+
if remaining > 0 {
270+
let (cur_buf, cur_unset_bits) = Self::copy_continuous_bits(
271+
&mut builder_ptr,
272+
bitmap_slice,
273+
builder_idx,
274+
start + bitmap_offset,
275+
remaining,
276+
);
277+
builder_idx += remaining;
278+
unset_bits += cur_unset_bits;
279+
buf = cur_buf;
280+
}
281+
}
282+
283+
if builder_idx % 8 != 0 {
284+
store_advance_aligned(buf, &mut builder_ptr);
285+
}
286+
287+
set_vec_len_by_ptr(&mut builder, builder_ptr);
288+
Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits)
289+
.ok()
290+
.unwrap()
291+
}
292+
}
293+
}

src/query/expression/tests/it/kernel.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use core::ops::Range;
16+
1517
use common_expression::block_debug::assert_block_value_eq;
1618
use common_expression::types::number::*;
1719
use common_expression::types::DataType;
@@ -202,7 +204,23 @@ pub fn test_pass() {
202204
);
203205
}
204206

205-
/// This test covers take.rs, take_chunks.rs, take_compact.rs, filter.rs, concat.rs.
207+
// Build a range selection from a selection array.
208+
pub fn build_range_selection(selection: &[u32], count: usize) -> Vec<Range<u32>> {
209+
let mut range_selection = Vec::with_capacity(count);
210+
let mut start = selection[0];
211+
let mut idx = 1;
212+
while idx < count {
213+
if selection[idx] != selection[idx - 1] + 1 {
214+
range_selection.push(start..selection[idx - 1] + 1);
215+
start = selection[idx];
216+
}
217+
idx += 1;
218+
}
219+
range_selection.push(start..selection[count - 1] + 1);
220+
range_selection
221+
}
222+
223+
/// This test covers take.rs, take_chunks.rs, take_compact.rs, take_ranges.rs, filter.rs, concat.rs.
206224
#[test]
207225
pub fn test_take_and_filter_and_concat() -> common_exception::Result<()> {
208226
use common_expression::types::DataType;
@@ -286,25 +304,34 @@ pub fn test_take_and_filter_and_concat() -> common_exception::Result<()> {
286304
&mut None,
287305
);
288306
let block_4 = DataBlock::concat(&filtered_blocks)?;
307+
let block_5 = concated_blocks.take_ranges(
308+
&build_range_selection(&take_indices, take_indices.len()),
309+
take_indices.len(),
310+
)?;
289311

290312
assert_eq!(block_1.num_columns(), block_2.num_columns());
291313
assert_eq!(block_1.num_rows(), block_2.num_rows());
292314
assert_eq!(block_1.num_columns(), block_3.num_columns());
293315
assert_eq!(block_1.num_rows(), block_3.num_rows());
294316
assert_eq!(block_1.num_columns(), block_4.num_columns());
295317
assert_eq!(block_1.num_rows(), block_4.num_rows());
318+
assert_eq!(block_1.num_columns(), block_5.num_columns());
319+
assert_eq!(block_1.num_rows(), block_5.num_rows());
296320

297321
let columns_1 = block_1.columns();
298322
let columns_2 = block_2.columns();
299323
let columns_3 = block_3.columns();
300324
let columns_4 = block_4.columns();
325+
let columns_5 = block_5.columns();
301326
for idx in 0..columns_1.len() {
302327
assert_eq!(columns_1[idx].data_type, columns_2[idx].data_type);
303328
assert_eq!(columns_1[idx].value, columns_2[idx].value);
304329
assert_eq!(columns_1[idx].data_type, columns_3[idx].data_type);
305330
assert_eq!(columns_1[idx].value, columns_3[idx].value);
306331
assert_eq!(columns_1[idx].data_type, columns_4[idx].data_type);
307332
assert_eq!(columns_1[idx].value, columns_4[idx].value);
333+
assert_eq!(columns_1[idx].data_type, columns_5[idx].data_type);
334+
assert_eq!(columns_1[idx].value, columns_5[idx].value);
308335
}
309336

310337
Ok(())

0 commit comments

Comments
 (0)