Commit 4283b2d

chore(query): introduce ColumnView, a new abstraction that unifies access to Column and const Column (#18260)
Parent: 28dd7a3 · Commit: 4283b2d

86 files changed: +1049 / -868 lines. This is a large commit; only part of the changed content is shown below.
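
The commit title is the only description, so a short standalone sketch of the idea may help: a ColumnView-style abstraction is a typed view that lets callers iterate an entry the same way whether it is backed by a fully materialized column or by a single constant value. Everything below is illustrative Rust, not the databend-query implementation.

// Standalone sketch only: a "view" that unifies iteration over a fully
// materialized column and a constant value logically repeated `len` times.
// All names are illustrative, not the databend-query types.
enum ColumnViewSketch<T> {
    Const { value: T, len: usize },
    Column(Vec<T>),
}

impl<T> ColumnViewSketch<T> {
    fn len(&self) -> usize {
        match self {
            ColumnViewSketch::Const { len, .. } => *len,
            ColumnViewSketch::Column(values) => values.len(),
        }
    }

    // Iterate the logical rows without materializing the constant case.
    fn iter(&self) -> Box<dyn Iterator<Item = &T> + '_> {
        match self {
            ColumnViewSketch::Const { value, len } => {
                Box::new(std::iter::repeat(value).take(*len))
            }
            ColumnViewSketch::Column(values) => Box::new(values.iter()),
        }
    }
}

fn main() {
    let constant = ColumnViewSketch::Const { value: 42u64, len: 3 };
    let column = ColumnViewSketch::Column(vec![1u64, 2, 3]);
    // The caller's loop is identical for both representations.
    let sum_const: u64 = constant.iter().sum();
    let sum_col: u64 = column.iter().sum();
    assert_eq!((constant.len(), sum_const), (3, 126));
    assert_eq!((column.len(), sum_col), (3, 6));
}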

src/query/ee/tests/it/ngram_index/index_refresh.rs (5 additions, 6 deletions)

@@ -115,16 +115,15 @@ async fn get_block_filter(fixture: &TestFixture, columns: &[String]) -> Result<B
         .await
         .transpose()?
         .unwrap();
-    let path = block.columns()[0].to_column().remove_nullable();
-    let path_scalar = path.as_string().unwrap().index(0).unwrap();
-    let length = block.columns()[1].to_column();
-    let length_scalar = length.as_number().unwrap().index(0).unwrap();
+    let path = block.get_by_offset(0).index(0).unwrap();
+    let path = *path.as_string().unwrap();
+    let length = block.get_by_offset(1).index(0).unwrap();

     load_bloom_filter_by_columns(
         DataOperator::instance().operator(),
         columns,
-        path_scalar,
-        *length_scalar.as_u_int64().unwrap(),
+        path,
+        *length.as_number().unwrap().as_u_int64().unwrap(),
     )
     .await
 }

src/query/expression/src/aggregate/aggregate_function.rs (12 additions, 11 deletions)

@@ -22,11 +22,12 @@ use super::AggrState;
 use super::AggrStateLoc;
 use super::AggrStateRegistry;
 use super::StateAddr;
-use crate::types::BinaryColumn;
+use crate::types::BinaryType;
 use crate::types::DataType;
-use crate::Column;
+use crate::BlockEntry;
 use crate::ColumnBuilder;
-use crate::InputColumns;
+use crate::ColumnView;
+use crate::ProjectedBlock;
 use crate::Scalar;

 pub type AggregateFunctionRef = Arc<dyn AggregateFunction>;
@@ -46,7 +47,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
     fn accumulate(
         &self,
         place: AggrState,
-        columns: InputColumns,
+        columns: ProjectedBlock,
         validity: Option<&Bitmap>,
         input_rows: usize,
     ) -> Result<()>;
@@ -56,7 +57,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
         &self,
         addrs: &[StateAddr],
         loc: &[AggrStateLoc],
-        columns: InputColumns,
+        columns: ProjectedBlock,
         _input_rows: usize,
     ) -> Result<()> {
         for (row, addr) in addrs.iter().enumerate() {
@@ -66,7 +67,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
     }

     // Used in aggregate_null_adaptor
-    fn accumulate_row(&self, place: AggrState, columns: InputColumns, row: usize) -> Result<()>;
+    fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>;

     fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;

@@ -81,7 +82,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
         &self,
         places: &[StateAddr],
         loc: &[AggrStateLoc],
-        state: &BinaryColumn,
+        state: &ColumnView<BinaryType>,
     ) -> Result<()> {
         for (place, mut data) in places.iter().zip(state.iter()) {
             self.merge(AggrState::new(*place, loc), &mut data)?;
@@ -90,9 +91,9 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
         Ok(())
     }

-    fn batch_merge_single(&self, place: AggrState, state: &Column) -> Result<()> {
-        let c = state.as_binary().unwrap();
-        for mut data in c.iter() {
+    fn batch_merge_single(&self, place: AggrState, state: &BlockEntry) -> Result<()> {
+        let view = state.downcast::<BinaryType>().unwrap();
+        for mut data in view.iter() {
             self.merge(place, &mut data)?;
         }
         Ok(())
@@ -145,7 +146,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
         Ok(None)
     }

-    fn get_if_condition(&self, _columns: InputColumns) -> Option<Bitmap> {
+    fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> {
         None
     }

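
The new batch_merge and batch_merge_single signatures take a ColumnView<BinaryType> or a BlockEntry and downcast to a typed view before iterating. A standalone sketch of that downcast-to-typed-view pattern follows; the marker types and the slice-based "view" are simplifications, not the databend-query API.

// Standalone sketch of the downcast-to-typed-view pattern visible above
// (`state.downcast::<BinaryType>()`): a type-erased entry is turned into a
// typed view before iteration. Names are illustrative, and the "view" here is
// simplified to a plain slice.
enum EntrySketch {
    UInt64(Vec<u64>),
    Binary(Vec<Vec<u8>>),
}

// Marker types describing a logical column type, in the spirit of `BinaryType`.
trait TypeMarker {
    type Item;
    fn downcast(entry: &EntrySketch) -> Option<&[Self::Item]>;
}

struct UInt64Type;
struct BinaryType;

impl TypeMarker for UInt64Type {
    type Item = u64;
    fn downcast(entry: &EntrySketch) -> Option<&[u64]> {
        match entry {
            EntrySketch::UInt64(v) => Some(v.as_slice()),
            _ => None,
        }
    }
}

impl TypeMarker for BinaryType {
    type Item = Vec<u8>;
    fn downcast(entry: &EntrySketch) -> Option<&[Vec<u8>]> {
        match entry {
            EntrySketch::Binary(v) => Some(v.as_slice()),
            _ => None,
        }
    }
}

impl EntrySketch {
    // Turbofish-style entry point, e.g. `entry.downcast::<BinaryType>()`.
    fn downcast<T: TypeMarker>(&self) -> Option<&[T::Item]> {
        T::downcast(self)
    }
}

fn main() {
    let state = EntrySketch::Binary(vec![b"ab".to_vec(), b"cd".to_vec()]);
    let view = state.downcast::<BinaryType>().unwrap();
    for data in view {
        println!("{data:?}");
    }
    // A mismatched marker type simply yields None instead of a view.
    assert!(state.downcast::<UInt64Type>().is_none());
}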

src/query/expression/src/aggregate/aggregate_hashtable.rs (15 additions, 14 deletions)

@@ -27,13 +27,14 @@ use crate::aggregate::payload_row::row_match_columns;
 use crate::group_hash_columns;
 use crate::new_sel;
 use crate::read;
+use crate::types::BinaryType;
 use crate::types::DataType;
 use crate::AggregateFunctionRef;
-use crate::Column;
+use crate::BlockEntry;
 use crate::ColumnBuilder;
 use crate::HashTableConfig;
-use crate::InputColumns;
 use crate::Payload;
+use crate::ProjectedBlock;
 use crate::StateAddr;
 use crate::BATCH_SIZE;
 use crate::LOAD_FACTOR;
@@ -129,9 +130,9 @@ impl AggregateHashTable {
     pub fn add_groups(
         &mut self,
         state: &mut ProbeState,
-        group_columns: InputColumns,
-        params: &[InputColumns],
-        agg_states: InputColumns,
+        group_columns: ProjectedBlock,
+        params: &[ProjectedBlock],
+        agg_states: ProjectedBlock,
         row_count: usize,
     ) -> Result<usize> {
         if row_count <= BATCH_ADD_SIZE {
@@ -142,13 +143,13 @@
             let end = (start + BATCH_ADD_SIZE).min(row_count);
             let step_group_columns = group_columns
                 .iter()
-                .map(|c| c.slice(start..end))
+                .map(|entry| entry.slice(start..end))
                 .collect::<Vec<_>>();

-            let step_params: Vec<Vec<Column>> = params
+            let step_params: Vec<Vec<BlockEntry>> = params
                 .iter()
                 .map(|c| c.iter().map(|x| x.slice(start..end)).collect())
-                .collect::<Vec<_>>();
+                .collect();
             let step_params = step_params.iter().map(|v| v.into()).collect::<Vec<_>>();
             let agg_states = agg_states
                 .iter()
@@ -171,9 +172,9 @@
     fn add_groups_inner(
         &mut self,
         state: &mut ProbeState,
-        group_columns: InputColumns,
-        params: &[InputColumns],
-        agg_states: InputColumns,
+        group_columns: ProjectedBlock,
+        params: &[ProjectedBlock],
+        agg_states: ProjectedBlock,
         row_count: usize,
     ) -> Result<usize> {
         state.row_count = row_count;
@@ -218,7 +219,7 @@
                     .zip(agg_states.iter())
                     .zip(states_layout.states_loc.iter())
                 {
-                    func.batch_merge(state_places, loc, state.as_binary().unwrap())?;
+                    func.batch_merge(state_places, loc, &state.downcast::<BinaryType>().unwrap())?;
                 }
             }
         }
@@ -245,7 +246,7 @@
     fn probe_and_create(
         &mut self,
         state: &mut ProbeState,
-        group_columns: InputColumns,
+        group_columns: ProjectedBlock,
         row_count: usize,
     ) -> usize {
         // exceed capacity or should resize
@@ -444,7 +445,7 @@
                     loc,
                     &mut builder,
                 )?;
-                flush_state.aggregate_results.push(builder.build());
+                flush_state.aggregate_results.push(builder.build().into());
             }
         }
         Ok(true)
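
add_groups keeps its batching scheme: rows are processed in BATCH_ADD_SIZE chunks, and each entry is sliced to the chunk's range before add_groups_inner runs. A minimal standalone sketch of that loop follows, with an assumed batch size and plain slices standing in for block entries.

// Standalone sketch of the chunked slicing loop in `add_groups`; the constant
// value and the slice representation are illustrative, not databend's.
const BATCH_ADD_SIZE: usize = 2048; // hypothetical value, for the sketch only

fn add_groups_sketch(row_count: usize, group_columns: &[Vec<u64>]) -> usize {
    let mut added = 0;
    let mut start = 0;
    while start < row_count {
        let end = (start + BATCH_ADD_SIZE).min(row_count);
        // Slice each entry to the current chunk before handing it on,
        // mirroring `entry.slice(start..end)` in the diff above.
        let step_group_columns: Vec<&[u64]> = group_columns
            .iter()
            .map(|entry| &entry[start..end])
            .collect();
        added += add_groups_inner_sketch(&step_group_columns, end - start);
        start = end;
    }
    added
}

// Placeholder for the real probe/insert logic on one batch.
fn add_groups_inner_sketch(step_group_columns: &[&[u64]], row_count: usize) -> usize {
    debug_assert!(step_group_columns.iter().all(|c| c.len() == row_count));
    row_count
}

fn main() {
    let cols = vec![(0..5000u64).collect::<Vec<_>>()];
    assert_eq!(add_groups_sketch(5000, &cols), 5000);
}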

src/query/expression/src/aggregate/group_hash.rs (5 additions, 5 deletions)

@@ -51,20 +51,20 @@ use crate::with_decimal_mapped_type;
 use crate::with_number_mapped_type;
 use crate::with_number_type;
 use crate::Column;
-use crate::InputColumns;
+use crate::ProjectedBlock;
 use crate::Scalar;
 use crate::ScalarRef;
 use crate::Value;

 const NULL_HASH_VAL: u64 = 0xd1cefa08eb382d69;

-pub fn group_hash_columns(cols: InputColumns, values: &mut [u64]) {
+pub fn group_hash_columns(cols: ProjectedBlock, values: &mut [u64]) {
     debug_assert!(!cols.is_empty());
-    for (i, col) in cols.iter().enumerate() {
+    for (i, entry) in cols.iter().enumerate() {
         if i == 0 {
-            combine_group_hash_column::<true>(col, values);
+            combine_group_hash_column::<true>(&entry.to_column(), values);
         } else {
-            combine_group_hash_column::<false>(col, values);
+            combine_group_hash_column::<false>(&entry.to_column(), values);
         }
     }
 }
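
group_hash_columns still hashes group keys column by column: the first column seeds the per-row hash values and every later column is combined into them; the change above only converts each entry to a Column first. A standalone sketch of that combining loop, with a toy mixing function (not databend's):

// Standalone sketch of the per-column hash combining in `group_hash_columns`.
fn combine_group_hash_column<const IS_FIRST: bool>(col: &[u64], values: &mut [u64]) {
    for (acc, x) in values.iter_mut().zip(col) {
        let h = x.wrapping_mul(0x9E37_79B9_7F4A_7C15); // toy per-value hash
        *acc = if IS_FIRST { h } else { acc.rotate_left(31) ^ h };
    }
}

fn group_hash_columns_sketch(cols: &[Vec<u64>], values: &mut [u64]) {
    debug_assert!(!cols.is_empty());
    for (i, entry) in cols.iter().enumerate() {
        if i == 0 {
            combine_group_hash_column::<true>(entry, values);
        } else {
            combine_group_hash_column::<false>(entry, values);
        }
    }
}

fn main() {
    let cols = vec![vec![1u64, 2, 3], vec![10u64, 10, 30]];
    let mut hashes = vec![0u64; 3];
    group_hash_columns_sketch(&cols, &mut hashes);
    // Rows with different group keys end up with different hashes here.
    assert_ne!(hashes[0], hashes[1]);
}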

src/query/expression/src/aggregate/partitioned_payload.rs (2 additions, 2 deletions)

@@ -23,8 +23,8 @@ use crate::get_states_layout;
 use crate::read;
 use crate::types::DataType;
 use crate::AggregateFunctionRef;
-use crate::InputColumns;
 use crate::PayloadFlushState;
+use crate::ProjectedBlock;
 use crate::StatesLayout;
 use crate::BATCH_SIZE;

@@ -111,7 +111,7 @@ impl PartitionedPayload {
         &mut self,
         state: &mut ProbeState,
         new_group_rows: usize,
-        group_columns: InputColumns,
+        group_columns: ProjectedBlock,
     ) {
         if self.payloads.len() == 1 {
             self.payloads[0].reserve_append_rows(

src/query/expression/src/aggregate/payload.rs (17 additions, 14 deletions)

@@ -17,7 +17,6 @@ use std::sync::Arc;

 use bumpalo::Bump;
 use databend_common_base::runtime::drop_guard;
-use itertools::Itertools;
 use log::info;
 use strength_reduce::StrengthReducedU64;

@@ -31,8 +30,8 @@ use crate::AggregateFunctionRef;
 use crate::Column;
 use crate::ColumnBuilder;
 use crate::DataBlock;
-use crate::InputColumns;
 use crate::PayloadFlushState;
+use crate::ProjectedBlock;
 use crate::SelectVector;
 use crate::StateAddr;
 use crate::StatesLayout;
@@ -202,7 +201,7 @@ impl Payload {
         address: &mut [*const u8],
         page_index: &mut [usize],
         new_group_rows: usize,
-        group_columns: InputColumns,
+        group_columns: ProjectedBlock,
     ) {
         let tuple_size = self.tuple_size;
         let (mut page, mut page_index_value) = self.writable_page();
@@ -233,13 +232,13 @@
         address: &mut [*const u8],
         page_index: &mut [usize],
         new_group_rows: usize,
-        group_columns: InputColumns,
+        group_columns: ProjectedBlock,
     ) {
         let mut write_offset = 0;
         // write validity
-        for col in group_columns.iter() {
-            if let Column::Nullable(c) = col {
-                let bitmap = &c.validity;
+        for entry in group_columns.iter() {
+            if let Column::Nullable(c) = entry.to_column() {
+                let bitmap = c.validity();
                 if bitmap.null_count() == 0 || bitmap.null_count() == bitmap.len() {
                     let val: u8 = if bitmap.null_count() == 0 { 1 } else { 0 };
                     // faster path
@@ -262,13 +261,13 @@
         }

         let mut scratch = vec![];
-        for (idx, col) in group_columns.iter().enumerate() {
+        for (idx, entry) in group_columns.iter().enumerate() {
             debug_assert!(write_offset == self.group_offsets[idx]);

             unsafe {
                 serialize_column_to_rowformat(
                     &self.arena,
-                    col,
+                    &entry.to_column(),
                     select_vector,
                     new_group_rows,
                     address,
@@ -424,15 +423,19 @@

     pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
         let fake_rows = fake_rows.unwrap_or(0);
-        let columns = (0..self.aggrs.len())
-            .map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build())
+        let entries = (0..self.aggrs.len())
+            .map(|_| {
+                ColumnBuilder::repeat_default(&DataType::Binary, fake_rows)
+                    .build()
+                    .into()
+            })
             .chain(
                 self.group_types
                     .iter()
-                    .map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()),
+                    .map(|t| ColumnBuilder::repeat_default(t, fake_rows).build().into()),
             )
-            .collect_vec();
-        DataBlock::new_from_columns(columns)
+            .collect();
+        DataBlock::new(entries, fake_rows)
     }
 }

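
The validity-writing loop above keeps its fast path: when a nullable group column is entirely valid or entirely null, a single byte value is written for every row instead of consulting the bitmap per row. A standalone sketch of that branch, with a bool slice standing in for the bitmap and illustrative names:

// Standalone sketch of the validity fast path kept by the payload change above.
fn validity_bytes(validity: &[bool]) -> Vec<u8> {
    let null_count = validity.iter().filter(|v| !**v).count();
    if null_count == 0 || null_count == validity.len() {
        // faster path: the same byte for every row
        let val: u8 = if null_count == 0 { 1 } else { 0 };
        vec![val; validity.len()]
    } else {
        // general path: per-row lookup
        validity.iter().map(|&v| v as u8).collect()
    }
}

fn main() {
    assert_eq!(validity_bytes(&[true, true]), vec![1, 1]);
    assert_eq!(validity_bytes(&[false, false]), vec![0, 0]);
    assert_eq!(validity_bytes(&[true, false]), vec![1, 0]);
}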

src/query/expression/src/aggregate/payload_flush.rs (13 additions, 12 deletions)

@@ -40,6 +40,7 @@ use crate::types::NumberType;
 use crate::types::ReturnType;
 use crate::types::TimestampType;
 use crate::with_number_mapped_type;
+use crate::BlockEntry;
 use crate::Column;
 use crate::ColumnBuilder;
 use crate::DataBlock;
@@ -49,8 +50,8 @@ use crate::BATCH_SIZE;

 pub struct PayloadFlushState {
     pub probe_state: ProbeState,
-    pub group_columns: Vec<Column>,
-    pub aggregate_results: Vec<Column>,
+    pub group_columns: Vec<BlockEntry>,
+    pub(super) aggregate_results: Vec<BlockEntry>,
     pub row_count: usize,

     pub flush_partition: usize,
@@ -88,10 +89,10 @@ impl PayloadFlushState {
         self.flush_page_row = 0;
     }

-    pub fn take_group_columns(&mut self) -> Vec<Column> {
+    pub fn take_group_columns(&mut self) -> Vec<BlockEntry> {
         std::mem::take(&mut self.group_columns)
     }
-    pub fn take_aggregate_results(&mut self) -> Vec<Column> {
+    pub fn take_aggregate_results(&mut self) -> Vec<BlockEntry> {
         std::mem::take(&mut self.aggregate_results)
     }
 }
@@ -136,7 +137,7 @@ impl Payload {

         let row_count = state.row_count;

-        let mut cols = Vec::with_capacity(self.aggrs.len() + self.group_types.len());
+        let mut entries = Vec::with_capacity(self.aggrs.len() + self.group_types.len());
         if let Some(state_layout) = self.states_layout.as_ref() {
             let mut builders = state_layout.serialize_builders(row_count);

@@ -155,24 +156,24 @@
                 }
             }

-            cols.extend(
+            entries.extend(
                 builders
                     .into_iter()
-                    .map(|builder| Column::Binary(builder.build())),
+                    .map(|builder| Column::Binary(builder.build()).into()),
             );
         }

-        cols.extend_from_slice(&state.take_group_columns());
-        Ok(Some(DataBlock::new_from_columns(cols)))
+        entries.extend_from_slice(&state.take_group_columns());
+        Ok(Some(DataBlock::new(entries, row_count)))
     }

     pub fn group_by_flush_all(&self) -> Result<DataBlock> {
         let mut state = PayloadFlushState::default();
         let mut blocks = vec![];

         while self.flush(&mut state) {
-            let cols = state.take_group_columns();
-            blocks.push(DataBlock::new_from_columns(cols));
+            let entries = state.take_group_columns();
+            blocks.push(DataBlock::new(entries, state.row_count));
         }

         if blocks.is_empty() {
@@ -219,7 +220,7 @@

         for col_index in 0..self.group_types.len() {
             let col = self.flush_column(col_index, state);
-            state.group_columns.push(col);
+            state.group_columns.push(col.into());
         }

         state.flush_page_row = end;
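
The flush paths now build blocks with DataBlock::new(entries, row_count) instead of DataBlock::new_from_columns(cols), passing the row count explicitly. A standalone sketch of a container in that style follows; the types, and the assumption that a constant entry cannot report its own length, are illustrative only and not databend's DataBlock.

// Standalone sketch: a block that carries an explicit row count alongside its
// entries, since a constant entry (as assumed here) has no inherent length.
enum BlockEntrySketch {
    Const(u64),       // one value standing for every row
    Column(Vec<u64>), // materialized values, one per row
}

struct DataBlockSketch {
    entries: Vec<BlockEntrySketch>,
    num_rows: usize,
}

impl DataBlockSketch {
    fn new(entries: Vec<BlockEntrySketch>, num_rows: usize) -> Self {
        // A materialized column must agree with the declared row count;
        // a constant entry is valid for any row count.
        debug_assert!(entries.iter().all(|e| match e {
            BlockEntrySketch::Const(_) => true,
            BlockEntrySketch::Column(v) => v.len() == num_rows,
        }));
        Self { entries, num_rows }
    }
}

fn main() {
    let block = DataBlockSketch::new(
        vec![
            BlockEntrySketch::Const(7),
            BlockEntrySketch::Column(vec![1, 2, 3]),
        ],
        3,
    );
    assert_eq!(block.num_rows, 3);
    assert_eq!(block.entries.len(), 2);
}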
