Skip to content

Commit 38d8bab

Browse files
committed
refactor
1 parent fbb7592 commit 38d8bab

File tree

9 files changed

+266
-302
lines changed

9 files changed

+266
-302
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/expression/src/converts/arrow/to.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,7 @@ impl DataBlock {
170170
impl Column {
171171
pub fn into_arrow_rs(self) -> Arc<dyn arrow_array::Array> {
172172
let arrow2_array: Box<dyn databend_common_arrow::arrow::array::Array> = self.as_arrow();
173-
let arrow_array: Arc<dyn arrow_array::Array> = arrow2_array.into();
174-
arrow_array
173+
arrow2_array.into()
175174
}
176175
}
177176

@@ -200,7 +199,7 @@ fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField {
200199
Arrow2DataType::Struct(f) => {
201200
ArrowDataType::Struct(f.into_iter().map(arrow_field_from_arrow2_field).collect())
202201
}
203-
other => other.into(),
202+
other => ArrowDataType::from(other),
204203
};
205204

206205
ArrowField::new(field.name, data_type, field.is_nullable).with_metadata(metadata)

src/query/storages/common/blocks/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ doctest = false
1212
test = true
1313

1414
[dependencies]
15+
arrow-array = { workspace = true }
16+
arrow-buffer = { workspace = true }
17+
arrow-schema = { workspace = true }
1518
databend-common-exception = { path = "../../../../common/exception" }
1619
databend-common-expression = { path = "../../../expression" }
1720
databend-storages-common-table-meta = { path = "../table_meta" }
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use arrow_array::types::ByteArrayType;
18+
use arrow_array::types::LargeBinaryType;
19+
use arrow_array::types::LargeUtf8Type;
20+
use arrow_array::Array;
21+
use arrow_array::GenericByteArray;
22+
use arrow_schema::DataType as ArrowDataType;
23+
use databend_common_exception::Result;
24+
use databend_storages_common_table_meta::meta::ColumnStatistics;
25+
use parquet_rs::basic::Encoding;
26+
use parquet_rs::file::properties::WriterPropertiesBuilder;
27+
use parquet_rs::schema::types::ColumnPath;
28+
29+
const ROWS_PER_DISTINCT_THRESHOLD: f64 = 10.0;
30+
const SAMPLE_ROWS: usize = 1000;
31+
const AVERAGE_PREFIX_LEN_THRESHOLD: f64 = 8.0;
32+
33+
pub fn choose_byte_array_encoding(
34+
mut props: WriterPropertiesBuilder,
35+
stat: Option<&ColumnStatistics>,
36+
array: Arc<dyn Array>,
37+
column_name: &str,
38+
) -> Result<WriterPropertiesBuilder> {
39+
if array.is_empty() {
40+
return Ok(props);
41+
}
42+
let col_path = ColumnPath::new(vec![column_name.to_string()]);
43+
let ndv = stat.as_ref().and_then(|s| s.distinct_of_values);
44+
let num_rows = array.len();
45+
if let Some(ndv) = ndv {
46+
if num_rows as f64 / ndv as f64 > ROWS_PER_DISTINCT_THRESHOLD {
47+
props = props.set_column_dictionary_enabled(col_path, true);
48+
return Ok(props);
49+
}
50+
}
51+
let data_type = array.data_type();
52+
match data_type {
53+
ArrowDataType::LargeBinary => {
54+
if can_apply_delta_byte_array::<LargeBinaryType>(&array)? {
55+
props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY);
56+
return Ok(props);
57+
}
58+
}
59+
ArrowDataType::LargeUtf8 => {
60+
if can_apply_delta_byte_array::<LargeUtf8Type>(&array)? {
61+
props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY);
62+
return Ok(props);
63+
}
64+
}
65+
_ => {}
66+
};
67+
props = props.set_column_encoding(col_path, Encoding::DELTA_LENGTH_BYTE_ARRAY);
68+
Ok(props)
69+
}
70+
71+
fn can_apply_delta_byte_array<T: ByteArrayType>(array: &dyn Array) -> Result<bool> {
72+
let num_rows = array.len();
73+
let array = array
74+
.as_any()
75+
.downcast_ref::<GenericByteArray<T>>()
76+
.unwrap();
77+
let mut sum_prefix_len = 0;
78+
for i in 1..num_rows.min(SAMPLE_ROWS) {
79+
let last: &[u8] = array.value(i - 1).as_ref();
80+
let cur: &[u8] = array.value(i).as_ref();
81+
let prefix_len = last
82+
.iter()
83+
.zip(cur.iter())
84+
.take_while(|(a, b)| a == b)
85+
.count();
86+
sum_prefix_len += prefix_len;
87+
}
88+
let avg_prefix_len = sum_prefix_len as f64 / num_rows as f64;
89+
Ok(avg_prefix_len > AVERAGE_PREFIX_LEN_THRESHOLD)
90+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_expression::DataBlock;
17+
use databend_common_expression::TableSchema;
18+
use databend_storages_common_table_meta::meta::StatisticsOfColumns;
19+
use parquet_rs::basic::Type as PhysicalType;
20+
use parquet_rs::file::properties::WriterPropertiesBuilder;
21+
use parquet_rs::schema::types::Type;
22+
use parquet_rs::schema::types::TypePtr;
23+
24+
use super::byte_array::choose_byte_array_encoding;
25+
use super::int::choose_int_encoding;
26+
27+
pub fn choose_codec(
28+
mut props: WriterPropertiesBuilder,
29+
block: &DataBlock,
30+
parquet_fields: &[TypePtr],
31+
table_schema: &TableSchema,
32+
stat: &StatisticsOfColumns,
33+
) -> Result<WriterPropertiesBuilder> {
34+
for ((parquet_field, table_field), entry) in parquet_fields
35+
.iter()
36+
.zip(table_schema.fields.iter())
37+
.zip(block.columns())
38+
{
39+
let column = entry.to_column(block.num_rows());
40+
let array = column.into_arrow_rs();
41+
let stat = stat.get(&table_field.column_id);
42+
let column_name = table_field.name.as_str();
43+
match parquet_field.as_ref() {
44+
Type::PrimitiveType { physical_type, .. } => match physical_type {
45+
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
46+
props = choose_byte_array_encoding(props, stat, array, column_name)?;
47+
}
48+
PhysicalType::INT32 | PhysicalType::INT64 | PhysicalType::INT96 => {
49+
props = choose_int_encoding(props, stat, array, column_name)?
50+
}
51+
_ => {}
52+
},
53+
Type::GroupType {
54+
basic_info: _,
55+
fields: _,
56+
} => {} // TODO: handle nested fields
57+
}
58+
}
59+
Ok(props)
60+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use arrow_array::types::ArrowPrimitiveType;
18+
use arrow_array::types::Date32Type;
19+
use arrow_array::types::Decimal128Type;
20+
use arrow_array::types::Decimal256Type;
21+
use arrow_array::types::Int32Type;
22+
use arrow_array::types::Int64Type;
23+
use arrow_array::types::TimestampMicrosecondType;
24+
use arrow_array::types::UInt32Type;
25+
use arrow_array::types::UInt64Type;
26+
use arrow_array::Array;
27+
use arrow_array::ArrowNativeTypeOp;
28+
use arrow_array::PrimitiveArray;
29+
use arrow_buffer::ArrowNativeType;
30+
use arrow_schema::DataType as ArrowDataType;
31+
use arrow_schema::TimeUnit;
32+
use databend_common_exception::Result;
33+
use databend_storages_common_table_meta::meta::ColumnStatistics;
34+
use parquet_rs::basic::Encoding;
35+
use parquet_rs::file::properties::WriterPropertiesBuilder;
36+
use parquet_rs::schema::types::ColumnPath;
37+
38+
const MAX_WIDTH_THRESHOLD: i64 = 3;
39+
40+
pub fn choose_int_encoding(
41+
mut props: WriterPropertiesBuilder,
42+
_stat: Option<&ColumnStatistics>,
43+
array: Arc<dyn Array>,
44+
column_name: &str,
45+
) -> Result<WriterPropertiesBuilder> {
46+
if array.is_empty() {
47+
return Ok(props);
48+
}
49+
let col_path = ColumnPath::new(vec![column_name.to_string()]);
50+
let data_type = array.data_type();
51+
let apply_delta = match data_type {
52+
ArrowDataType::Int32 => can_apply_delta_binary_pack::<Int32Type>(&array)?,
53+
ArrowDataType::Int64 => can_apply_delta_binary_pack::<Int64Type>(&array)?,
54+
ArrowDataType::UInt32 => can_apply_delta_binary_pack::<UInt32Type>(&array)?,
55+
ArrowDataType::UInt64 => can_apply_delta_binary_pack::<UInt64Type>(&array)?,
56+
ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
57+
can_apply_delta_binary_pack::<TimestampMicrosecondType>(&array)?
58+
}
59+
ArrowDataType::Date32 => can_apply_delta_binary_pack::<Date32Type>(&array)?,
60+
ArrowDataType::Decimal128(_, _) => can_apply_delta_binary_pack::<Decimal128Type>(&array)?,
61+
ArrowDataType::Decimal256(_, _) => can_apply_delta_binary_pack::<Decimal256Type>(&array)?,
62+
_ => false,
63+
};
64+
if apply_delta {
65+
props = props.set_column_encoding(col_path, Encoding::DELTA_BINARY_PACKED);
66+
}
67+
Ok(props)
68+
}
69+
70+
fn can_apply_delta_binary_pack<T: ArrowPrimitiveType>(array: &dyn Array) -> Result<bool> {
71+
let mut max_delta = T::Native::MIN_TOTAL_ORDER;
72+
let mut min_delta = T::Native::MAX_TOTAL_ORDER;
73+
let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
74+
for i in 1..array.len() {
75+
let delta = array.value(i).sub_wrapping(array.value(i - 1));
76+
if delta.is_gt(max_delta) {
77+
max_delta = delta;
78+
}
79+
if delta.is_lt(min_delta) {
80+
min_delta = delta;
81+
}
82+
}
83+
let x = max_delta.sub_wrapping(min_delta).as_usize();
84+
Ok(x <= (1 << MAX_WIDTH_THRESHOLD))
85+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod byte_array;
16+
mod choose;
17+
mod int;
18+
19+
pub use choose::choose_codec;

src/query/storages/common/blocks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@
1414

1515
#![allow(clippy::uninlined_format_args)]
1616

17+
mod codec;
1718
mod parquet_rs;
1819
pub use parquet_rs::blocks_to_parquet;

0 commit comments

Comments
 (0)