
Commit d32bc16

feat: new settings fuse_parquet_read_batch_size
This controls the batch size used when deserializing data blocks of fuse tables stored in the parquet format. The default value of this setting is 8192.
1 parent 407e425 commit d32bc16
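To make the default concrete: the reader now produces record batches of at most `batch_size` rows per data block, so the number of batches per block is roughly the row count divided by 8192. A back-of-envelope sketch (the row count here is made up for illustration):

    // Illustrative arithmetic only: a 100_000-row block with the default batch size.
    let num_rows = 100_000usize;
    let batch_size = 8192usize;
    let num_batches = num_rows.div_ceil(batch_size); // 13 record batches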

10 files changed: +229 −140 lines changed

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
@@ -1264,6 +1264,13 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
+            ("fuse_parquet_read_batch_size", DefaultSettingValue {
+                value: UserSettingValue::UInt64(8192),
+                desc: "The batch size while deserializing fuse table with parquet storage format",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(0..=1_000_000)),
+            }),
         ]);

         Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
@@ -932,4 +932,8 @@ impl Settings {
     pub fn get_enable_block_stream_write(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_block_stream_write")? == 1)
     }
+
+    pub fn get_fuse_parquet_read_batch_size(&self) -> Result<usize> {
+        Ok(self.try_get_u64("fuse_parquet_read_batch_size")? as usize)
+    }
 }
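For orientation, a minimal sketch of how a read path could consume this getter; the `ctx` binding (an `Arc<dyn TableContext>`) and the surrounding wiring are illustrative assumptions, not part of this diff:

    // Illustrative only: fetch the batch size once per read task from the session
    // settings, then hand it to the parquet deserialization path shown below.
    let batch_size = ctx.get_settings().get_fuse_parquet_read_batch_size()?;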

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs

Lines changed: 8 additions & 1 deletion
@@ -97,7 +97,7 @@ impl AggIndexReader {
         self.index_id
     }
 
-    pub(super) fn apply_agg_info(&self, block: DataBlock) -> Result<DataBlock> {
+    pub(super) fn apply_agg_info_to_block(&self, block: DataBlock) -> Result<DataBlock> {
         let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
 
         // 1. Filter the block if there is a filter.
@@ -145,4 +145,11 @@ impl AggIndexReader {
             )),
         ))
     }
+
+    pub(super) fn apply_agg_info(&self, block: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
+        block
+            .into_iter()
+            .map(|block| self.apply_agg_info_to_block(block))
+            .collect::<Result<_>>()
+    }
 }
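The renamed per-block helper plus the new Vec-level wrapper above rely on the standard collect-into-`Result` idiom. A self-contained sketch of the same pattern, unrelated to the reader types:

    // The first Err short-circuits the whole collection; otherwise a Vec of results is built.
    fn double_all(xs: Vec<i64>) -> Result<Vec<i64>, String> {
        xs.into_iter()
            .map(|x| x.checked_mul(2).ok_or_else(|| format!("{x} overflows i64")))
            .collect()
    }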

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 5 additions & 5 deletions
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::vec;
 
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
@@ -138,7 +139,7 @@ impl AggIndexReader {
         }
     }
 
-    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<DataBlock> {
+    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<Vec<DataBlock>> {
         let mut all_columns_arrays = vec![];
 
         for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() {
@@ -148,9 +149,9 @@
             all_columns_arrays.push(arrays);
         }
         if all_columns_arrays.is_empty() {
-            return Ok(DataBlock::empty_with_schema(Arc::new(
+            return Ok(vec![DataBlock::empty_with_schema(Arc::new(
                 self.reader.data_schema(),
-            )));
+            ))]);
         }
         debug_assert!(all_columns_arrays
             .iter()
@@ -166,7 +167,6 @@
             let block = DataBlock::new_from_columns(columns);
             blocks.push(block);
         }
-        let block = DataBlock::concat(&blocks)?;
-        self.apply_agg_info(block)
+        self.apply_agg_info(blocks)
     }
 }

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs

Lines changed: 4 additions & 2 deletions
@@ -113,15 +113,17 @@ impl AggIndexReader {
         &self,
         part: PartInfoPtr,
         data: BlockReadResult,
-    ) -> Result<DataBlock> {
+        batch_size: usize,
+    ) -> Result<Vec<DataBlock>> {
         let columns_chunks = data.columns_chunks()?;
         let part = FuseBlockPartInfo::from_part(&part)?;
-        let block = self.reader.deserialize_parquet_chunks(
+        let block = self.reader.deserialize_parquet_to_blocks(
             part.nums_rows,
             &part.columns_meta,
             columns_chunks,
             &part.compression,
             &part.location,
+            batch_size,
         )?;
 
         self.apply_agg_info(block)

src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs

Lines changed: 12 additions & 6 deletions
@@ -19,6 +19,7 @@ use arrow_schema::Schema;
 use databend_common_expression::ColumnId;
 use databend_common_expression::TableSchema;
 use databend_storages_common_table_meta::meta::Compression;
+use itertools::Itertools;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 use parquet::arrow::parquet_to_arrow_field_levels;
 use parquet::arrow::ArrowSchemaConverter;
@@ -34,7 +35,8 @@ pub fn column_chunks_to_record_batch(
     num_rows: usize,
     column_chunks: &HashMap<ColumnId, DataItem>,
     compression: &Compression,
-) -> databend_common_exception::Result<RecordBatch> {
+    batch_size: usize,
+) -> databend_common_exception::Result<Vec<RecordBatch>> {
     let arrow_schema = Schema::from(original_schema);
     let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
 
@@ -66,13 +68,17 @@
         ProjectionMask::leaves(&parquet_schema, projection_mask),
         Some(arrow_schema.fields()),
     )?;
-    let mut record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
+    let record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
         &field_levels,
         row_group.as_ref(),
-        num_rows,
+        batch_size,
        None,
    )?;
-    let record = record_reader.next().unwrap()?;
-    assert!(record_reader.next().is_none());
-    Ok(record)
+
+    let records: Vec<_> = record_reader.try_collect()?;
+    assert_eq!(
+        num_rows,
+        records.iter().map(|r| r.num_rows()).sum::<usize>()
+    );
+    Ok(records)
 }
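The record reader is now constructed with `batch_size` instead of `num_rows`, so it yields a sequence of record batches rather than a single one, and the assertion checks that the batch row counts still add up to `num_rows`. Outside the Databend code path, the same batch-size semantics can be sketched with the stock `parquet` file reader; a minimal, self-contained example (the file path, error handling, and the `read_in_batches` helper are illustrative assumptions, not part of this commit):

    use std::fs::File;

    use itertools::Itertools;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    // Each yielded RecordBatch holds at most `batch_size` rows; only the final batch may be shorter.
    fn read_in_batches(path: &str, batch_size: usize) -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open(path)?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
            .with_batch_size(batch_size)
            .build()?;
        let batches: Vec<_> = reader.try_collect()?;
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        println!("{total_rows} rows across {} batches", batches.len());
        Ok(())
    }

Note that the compatibility wrapper kept for existing callers (see the mod.rs diff below) passes `num_rows` as the batch size, so exactly one block is produced and the old single-block contract is preserved.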

src/query/storages/fuse/src/io/read/block/parquet/mod.rs

Lines changed: 90 additions & 52 deletions
@@ -35,6 +35,7 @@ mod adapter;
 mod deserialize;
 
 pub use adapter::RowGroupImplBuilder;
+use databend_common_exception::Result;
 pub use deserialize::column_chunks_to_record_batch;
 
 use crate::io::read::block::block_reader_merge_io::DataItem;
@@ -48,17 +49,41 @@ impl BlockReader {
         column_chunks: HashMap<ColumnId, DataItem>,
         compression: &Compression,
         block_path: &str,
-    ) -> databend_common_exception::Result<DataBlock> {
+    ) -> Result<DataBlock> {
+        let mut blocks = self.deserialize_parquet_to_blocks(
+            num_rows,
+            column_metas,
+            column_chunks,
+            compression,
+            block_path,
+            num_rows,
+        )?;
+        // Defensive check: using `num_rows` as batch_size, expects only one block
+        assert_eq!(blocks.len(), 1);
+        Ok(blocks.pop().unwrap())
+    }
+
+    pub(crate) fn deserialize_parquet_to_blocks(
+        &self,
+        num_rows: usize,
+        column_metas: &HashMap<ColumnId, ColumnMeta>,
+        column_chunks: HashMap<ColumnId, DataItem>,
+        compression: &Compression,
+        block_path: &str,
+        batch_size: usize,
+    ) -> Result<Vec<DataBlock>> {
         if column_chunks.is_empty() {
-            return self.build_default_values_block(num_rows);
+            return Ok(vec![self.build_default_values_block(num_rows)?]);
         }
-        let record_batch = column_chunks_to_record_batch(
+
+        let record_batches = column_chunks_to_record_batch(
             &self.original_schema,
             num_rows,
             &column_chunks,
             compression,
+            batch_size,
         )?;
-        let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
+
         let name_paths = column_name_paths(&self.projection, &self.original_schema);
 
         let array_cache = if self.put_cache {
@@ -67,58 +92,71 @@
             None
         };
 
-        for ((i, field), column_node) in self
-            .projected_schema
-            .fields
-            .iter()
-            .enumerate()
-            .zip(self.project_column_nodes.iter())
-        {
-            let data_type = field.data_type().into();
-
-            // NOTE, there is something tricky here:
-            // - `column_chunks` always contains data of leaf columns
-            // - here we may processing a nested type field
-            // - But, even if the field being processed is a field with multiple leaf columns
-            //   `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
-            //   even if we are getting data from `column_chunks` using a non-leaf
-            //   `column_id` of `projected_schema.fields`
-            //
-            //   [^1]: Except in the current block, there is no data stored for the
-            //   corresponding field, and a default value has been declared for
-            //   the corresponding field.
-            //
-            //   Yes, it is too obscure, we need to polish it later.
-
-            let value = match column_chunks.get(&field.column_id) {
-                Some(DataItem::RawData(data)) => {
-                    // get the deserialized arrow array, which may be a nested array
-                    let arrow_array = column_by_name(&record_batch, &name_paths[i]);
-                    if !column_node.is_nested {
-                        if let Some(cache) = &array_cache {
-                            let meta = column_metas.get(&field.column_id).unwrap();
-                            let (offset, len) = meta.offset_length();
-                            let key =
-                                TableDataCacheKey::new(block_path, field.column_id, offset, len);
-                            cache.insert(key.into(), (arrow_array.clone(), data.len()));
+        let mut blocks = Vec::with_capacity(record_batches.len());
+
+        for record_batch in record_batches {
+            let num_rows_record_batch = record_batch.num_rows();
+            let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
+            for ((i, field), column_node) in self
+                .projected_schema
+                .fields
+                .iter()
+                .enumerate()
+                .zip(self.project_column_nodes.iter())
+            {
+                let data_type = field.data_type().into();
+
+                // NOTE, there is something tricky here:
+                // - `column_chunks` always contains data of leaf columns
+                // - here we may processing a nested type field
+                // - But, even if the field being processed is a field with multiple leaf columns
+                //   `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
+                //   even if we are getting data from `column_chunks` using a non-leaf
+                //   `column_id` of `projected_schema.fields`
+                //
+                //   [^1]: Except in the current block, there is no data stored for the
+                //   corresponding field, and a default value has been declared for
+                //   the corresponding field.
+                //
+                //   Yes, it is too obscure, we need to polish it later.
+
+                let value = match column_chunks.get(&field.column_id) {
+                    Some(DataItem::RawData(data)) => {
+                        // get the deserialized arrow array, which may be a nested array
+                        let arrow_array = column_by_name(&record_batch, &name_paths[i]);
+                        if !column_node.is_nested {
+                            if let Some(cache) = &array_cache {
+                                let meta = column_metas.get(&field.column_id).unwrap();
+                                let (offset, len) = meta.offset_length();
+                                let key = TableDataCacheKey::new(
+                                    block_path,
+                                    field.column_id,
+                                    offset,
+                                    len,
+                                );
+                                cache.insert(key.into(), (arrow_array.clone(), data.len()));
+                            }
                         }
+                        Value::from_arrow_rs(arrow_array, &data_type)?
                     }
-                    Value::from_arrow_rs(arrow_array, &data_type)?
-                }
-                Some(DataItem::ColumnArray(cached)) => {
-                    if column_node.is_nested {
-                        // a defensive check, should never happen
-                        return Err(ErrorCode::StorageOther(
-                            "unexpected nested field: nested leaf field hits cached",
-                        ));
+                    Some(DataItem::ColumnArray(cached)) => {
+                        // TODO this is NOT correct!
+                        if column_node.is_nested {
+                            // a defensive check, should never happen
+                            return Err(ErrorCode::StorageOther(
+                                "unexpected nested field: nested leaf field hits cached",
+                            ));
+                        }
+                        Value::from_arrow_rs(cached.0.clone(), &data_type)?
                     }
-                    Value::from_arrow_rs(cached.0.clone(), &data_type)?
-                }
-                None => Value::Scalar(self.default_vals[i].clone()),
-            };
-            columns.push(BlockEntry::new(data_type, value));
+                    None => Value::Scalar(self.default_vals[i].clone()),
+                };
+                columns.push(BlockEntry::new(data_type, value));
+            }
+            blocks.push(DataBlock::new(columns, num_rows_record_batch));
         }
-        Ok(DataBlock::new(columns, num_rows))
+
+        Ok(blocks)
     }
 }

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 3 additions & 0 deletions
@@ -15,13 +15,16 @@
 use std::collections::HashSet;
 use std::sync::Arc;
 
+use arrow_array::RecordBatch;
+use databend_common_catalog::plan::VirtualColumnField;
 use databend_common_exception::Result;
 use databend_common_expression::eval_function;
 use databend_common_expression::types::DataType;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::Column;
 use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
+use databend_common_expression::FunctionContext;
 use databend_common_expression::TableSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_expression::Value;

src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs

Lines changed: 6 additions & 6 deletions
@@ -191,7 +191,7 @@ pub struct NativeDeserializeDataTransform {
     // Structures for driving the pipeline:
     input: Arc<InputPort>,
     output: Arc<OutputPort>,
-    output_data: Option<DataBlock>,
+    output_data: Vec<DataBlock>,
     parts: VecDeque<PartInfoPtr>,
     columns: VecDeque<NativeDataSource>,
     scan_progress: Arc<Progress>,
@@ -309,7 +309,7 @@ impl NativeDeserializeDataTransform {
             block_reader,
             input,
             output,
-            output_data: None,
+            output_data: vec![],
             parts: VecDeque::new(),
             columns: VecDeque::new(),
             prewhere_columns,
@@ -353,7 +353,7 @@ impl NativeDeserializeDataTransform {
         };
         self.scan_progress.incr(&progress_values);
         Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size());
-        self.output_data = Some(data_block);
+        self.output_data = vec![data_block];
     }
 
     /// Check if can skip the whole block by default values.
@@ -846,7 +846,7 @@ impl Processor for NativeDeserializeDataTransform {
             return Ok(Event::NeedConsume);
        }
 
-        if let Some(data_block) = self.output_data.take() {
+        if let Some(data_block) = self.output_data.pop() {
             self.output.push_data(Ok(data_block));
             return Ok(Event::NeedConsume);
         }
@@ -891,8 +891,8 @@
         let columns = match columns {
             NativeDataSource::AggIndex(data) => {
                 let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap();
-                let block = agg_index_reader.deserialize_native_data(data)?;
-                self.output_data = Some(block);
+                let blocks = agg_index_reader.deserialize_native_data(data)?;
+                self.output_data = blocks;
                 self.finish_partition();
                 return Ok(());
             }
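The deserializer's single-block output slot becomes a small output queue here. A generic sketch of that buffer pattern in isolation (the type and method names are illustrative, not from this commit):

    // A processor-style output buffer: a batch of ready items is stored, and one
    // item is handed downstream per scheduling event.
    struct OutputBuffer<T> {
        pending: Vec<T>,
    }

    impl<T> OutputBuffer<T> {
        fn fill(&mut self, items: Vec<T>) {
            self.pending = items;
        }

        fn next_to_emit(&mut self) -> Option<T> {
            // Vec::pop takes from the back, so items are emitted in reverse insertion order.
            self.pending.pop()
        }
    }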
