Commit 8189ebf

feat: new setting fuse_parquet_read_batch_size
It controls the batch size used when deserializing data blocks of fuse tables stored in the parquet format. The default value of this setting is 8192.
1 parent ba2d1d3 commit 8189ebf
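
The intended wiring, roughly, is that the setting is read once per query and handed to the parquet read path as the deserialization batch size. The sketch below is illustrative only and is not part of this commit; the `settings`, `reader`, and part metadata bindings are assumed to be in scope, while the getter and the reader method are the ones added in the diffs that follow.

    // Hypothetical caller inside the fuse storage crate (a sketch, not committed code).
    // 8192 rows per deserialized DataBlock unless the user overrides the setting.
    let batch_size = settings.get_fuse_parquet_read_batch_size()?;
    let blocks: Vec<DataBlock> = reader.deserialize_parquet_to_blocks(
        part.nums_rows,
        &part.columns_meta,
        column_chunks,
        &part.compression,
        &part.location,
        batch_size,
    )?;

Like other settings declared in settings_default.rs, this one is normally adjusted per session or globally through Databend's usual SET mechanism, within the declared numeric range.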

9 files changed: +270 -151 lines changed

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
@@ -1242,6 +1242,13 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
+            ("fuse_parquet_read_batch_size", DefaultSettingValue {
+                value: UserSettingValue::UInt64(8192),
+                desc: "The batch size while deserializing fuse table with parquet storage format",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(0..=1_0000_000)),
+            }),
         ]);

         Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
@@ -924,4 +924,8 @@ impl Settings {
     pub fn get_enable_use_vacuum2_to_purge_transient_table_data(&self) -> Result<bool> {
         Ok(self.try_get_u64("use_vacuum2_to_purge_transient_table_data")? == 1)
     }
+
+    pub fn get_fuse_parquet_read_batch_size(&self) -> Result<usize> {
+        Ok(self.try_get_u64("fuse_parquet_read_batch_size")? as usize)
+    }
 }

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs

Lines changed: 8 additions & 1 deletion
@@ -97,7 +97,7 @@ impl AggIndexReader {
         self.index_id
     }

-    pub(super) fn apply_agg_info(&self, block: DataBlock) -> Result<DataBlock> {
+    pub(super) fn apply_agg_info_to_block(&self, block: DataBlock) -> Result<DataBlock> {
         let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);

         // 1. Filter the block if there is a filter.
@@ -145,4 +145,11 @@ impl AggIndexReader {
             )),
         ))
     }
+
+    pub(super) fn apply_agg_info(&self, block: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
+        block
+            .into_iter()
+            .map(|block| self.apply_agg_info_to_block(block))
+            .collect::<Result<_>>()
+    }
 }

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 4 additions & 2 deletions
@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::sync::Arc;
+use std::vec;

 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
@@ -166,7 +167,8 @@ impl AggIndexReader {
             let block = DataBlock::new_from_columns(columns);
             blocks.push(block);
         }
-        let block = DataBlock::concat(&blocks)?;
-        self.apply_agg_info(block)
+        let blocks = self.apply_agg_info(blocks)?;
+
+        DataBlock::concat(blocks.as_slice())
     }
 }

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs

Lines changed: 4 additions & 2 deletions
@@ -113,15 +113,17 @@ impl AggIndexReader {
         &self,
         part: PartInfoPtr,
         data: BlockReadResult,
-    ) -> Result<DataBlock> {
+        batch_size: usize,
+    ) -> Result<Vec<DataBlock>> {
         let columns_chunks = data.columns_chunks()?;
         let part = FuseBlockPartInfo::from_part(&part)?;
-        let block = self.reader.deserialize_parquet_chunks(
+        let block = self.reader.deserialize_parquet_to_blocks(
             part.nums_rows,
             &part.columns_meta,
             columns_chunks,
             &part.compression,
             &part.location,
+            batch_size,
         )?;

         self.apply_agg_info(block)

src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs

Lines changed: 8 additions & 6 deletions
@@ -19,6 +19,7 @@ use arrow_schema::Schema;
 use databend_common_expression::ColumnId;
 use databend_common_expression::TableSchema;
 use databend_storages_common_table_meta::meta::Compression;
+use itertools::Itertools;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 use parquet::arrow::parquet_to_arrow_field_levels;
 use parquet::arrow::ArrowSchemaConverter;
@@ -34,7 +35,8 @@ pub fn column_chunks_to_record_batch(
     num_rows: usize,
     column_chunks: &HashMap<ColumnId, DataItem>,
     compression: &Compression,
-) -> databend_common_exception::Result<RecordBatch> {
+    batch_size: usize,
+) -> databend_common_exception::Result<Vec<RecordBatch>> {
     let arrow_schema = Schema::from(original_schema);
     let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;

@@ -66,13 +68,13 @@ pub fn column_chunks_to_record_batch(
         ProjectionMask::leaves(&parquet_schema, projection_mask),
         Some(arrow_schema.fields()),
     )?;
-    let mut record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
+    let record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
         &field_levels,
         row_group.as_ref(),
-        num_rows,
+        batch_size,
         None,
     )?;
-    let record = record_reader.next().unwrap()?;
-    assert!(record_reader.next().is_none());
-    Ok(record)
+    let records = record_reader.try_collect()?;
+    // TODO assert the row numbers?
+    Ok(records)
 }
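
The switch from a single `next()` call plus an assert to `try_collect` is what allows more than one record batch per column chunk. Under the old code the reader was created with `num_rows` as the batch size, so one `next()` was guaranteed to return everything; with a smaller `batch_size` the reader can legitimately yield several batches. As a minimal standalone illustration of the itertools behaviour relied on here (a sketch, not code from this commit), `try_collect` drains an iterator of `Result` items into a `Result<Vec<_>, _>`, short-circuiting on the first error:

    use itertools::Itertools;

    fn drain_batches() -> Result<Vec<u32>, String> {
        // Stand-in for ParquetRecordBatchReader, which yields one Result per batch.
        let reader = vec![Ok::<u32, String>(1), Ok(2), Ok(3)].into_iter();
        let batches: Vec<u32> = reader.try_collect()?;
        Ok(batches)
    }

    fn main() {
        assert_eq!(drain_batches().unwrap(), vec![1, 2, 3]);
    }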

src/query/storages/fuse/src/io/read/block/parquet/mod.rs

Lines changed: 88 additions & 52 deletions
@@ -35,6 +35,7 @@ mod adapter;
 mod deserialize;

 pub use adapter::RowGroupImplBuilder;
+use databend_common_exception::Result;
 pub use deserialize::column_chunks_to_record_batch;

 use crate::io::read::block::block_reader_merge_io::DataItem;
@@ -48,17 +49,41 @@ impl BlockReader {
         column_chunks: HashMap<ColumnId, DataItem>,
         compression: &Compression,
         block_path: &str,
-    ) -> databend_common_exception::Result<DataBlock> {
+    ) -> Result<DataBlock> {
+        let mut blocks = self.deserialize_parquet_to_blocks(
+            num_rows,
+            column_metas,
+            column_chunks,
+            compression,
+            block_path,
+            num_rows,
+        )?;
+        // Defensive check: using `num_rows` as batch_size, expects only one block
+        assert_eq!(blocks.len(), 1);
+        Ok(blocks.pop().unwrap())
+    }
+
+    pub(crate) fn deserialize_parquet_to_blocks(
+        &self,
+        num_rows: usize,
+        column_metas: &HashMap<ColumnId, ColumnMeta>,
+        column_chunks: HashMap<ColumnId, DataItem>,
+        compression: &Compression,
+        block_path: &str,
+        batch_size: usize,
+    ) -> Result<Vec<DataBlock>> {
         if column_chunks.is_empty() {
-            return self.build_default_values_block(num_rows);
+            return Ok(vec![self.build_default_values_block(num_rows)?]);
         }
-        let record_batch = column_chunks_to_record_batch(
+
+        let record_batches = column_chunks_to_record_batch(
             &self.original_schema,
             num_rows,
             &column_chunks,
             compression,
+            batch_size,
         )?;
-        let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
+
         let name_paths = column_name_paths(&self.projection, &self.original_schema);

         let array_cache = if self.put_cache {
@@ -67,58 +92,69 @@ impl BlockReader {
             None
         };

-        for ((i, field), column_node) in self
-            .projected_schema
-            .fields
-            .iter()
-            .enumerate()
-            .zip(self.project_column_nodes.iter())
-        {
-            let data_type = field.data_type().into();
-
-            // NOTE, there is something tricky here:
-            // - `column_chunks` always contains data of leaf columns
-            // - here we may processing a nested type field
-            // - But, even if the field being processed is a field with multiple leaf columns
-            //   `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
-            //   even if we are getting data from `column_chunks` using a non-leaf
-            //   `column_id` of `projected_schema.fields`
-            //
-            // [^1]: Except in the current block, there is no data stored for the
-            //       corresponding field, and a default value has been declared for
-            //       the corresponding field.
-            //
-            // Yes, it is too obscure, we need to polish it later.
-
-            let value = match column_chunks.get(&field.column_id) {
-                Some(DataItem::RawData(data)) => {
-                    // get the deserialized arrow array, which may be a nested array
-                    let arrow_array = column_by_name(&record_batch, &name_paths[i]);
-                    if !column_node.is_nested {
-                        if let Some(cache) = &array_cache {
-                            let meta = column_metas.get(&field.column_id).unwrap();
-                            let (offset, len) = meta.offset_length();
-                            let key =
-                                TableDataCacheKey::new(block_path, field.column_id, offset, len);
-                            cache.insert(key.into(), (arrow_array.clone(), data.len()));
+        let mut blocks = Vec::with_capacity(record_batches.len());
+
+        for record_batch in record_batches {
+            let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
+            for ((i, field), column_node) in self
+                .projected_schema
+                .fields
+                .iter()
+                .enumerate()
+                .zip(self.project_column_nodes.iter())
+            {
+                let data_type = field.data_type().into();
+
+                // NOTE, there is something tricky here:
+                // - `column_chunks` always contains data of leaf columns
+                // - here we may processing a nested type field
+                // - But, even if the field being processed is a field with multiple leaf columns
+                //   `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
+                //   even if we are getting data from `column_chunks` using a non-leaf
+                //   `column_id` of `projected_schema.fields`
+                //
+                // [^1]: Except in the current block, there is no data stored for the
+                //       corresponding field, and a default value has been declared for
+                //       the corresponding field.
+                //
+                // Yes, it is too obscure, we need to polish it later.
+
+                let value = match column_chunks.get(&field.column_id) {
+                    Some(DataItem::RawData(data)) => {
+                        // get the deserialized arrow array, which may be a nested array
+                        let arrow_array = column_by_name(&record_batch, &name_paths[i]);
+                        if !column_node.is_nested {
+                            if let Some(cache) = &array_cache {
+                                let meta = column_metas.get(&field.column_id).unwrap();
+                                let (offset, len) = meta.offset_length();
+                                let key = TableDataCacheKey::new(
+                                    block_path,
+                                    field.column_id,
+                                    offset,
+                                    len,
+                                );
+                                cache.insert(key.into(), (arrow_array.clone(), data.len()));
+                            }
                         }
+                        Value::from_arrow_rs(arrow_array, &data_type)?
                     }
-                    Value::from_arrow_rs(arrow_array, &data_type)?
-                }
-                Some(DataItem::ColumnArray(cached)) => {
-                    if column_node.is_nested {
-                        // a defensive check, should never happen
-                        return Err(ErrorCode::StorageOther(
-                            "unexpected nested field: nested leaf field hits cached",
-                        ));
+                    Some(DataItem::ColumnArray(cached)) => {
+                        if column_node.is_nested {
+                            // a defensive check, should never happen
+                            return Err(ErrorCode::StorageOther(
+                                "unexpected nested field: nested leaf field hits cached",
+                            ));
+                        }
+                        Value::from_arrow_rs(cached.0.clone(), &data_type)?
                     }
-                    Value::from_arrow_rs(cached.0.clone(), &data_type)?
-                }
-                None => Value::Scalar(self.default_vals[i].clone()),
-            };
-            columns.push(BlockEntry::new(data_type, value));
+                    None => Value::Scalar(self.default_vals[i].clone()),
+                };
+                columns.push(BlockEntry::new(data_type, value));
+            }
+            blocks.push(DataBlock::new(columns, num_rows));
         }
-        Ok(DataBlock::new(columns, num_rows))
+
+        Ok(blocks)
     }
 }
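
The wrapper keeps the old single-block contract by passing `num_rows` as the batch size: assuming the record batch reader emits full batches of `batch_size` rows, one column chunk yields ceil(num_rows / batch_size) batches, which is exactly 1 when batch_size equals num_rows. A hypothetical helper (not part of the commit) makes the arithmetic concrete:

    // Not committed code; just illustrates the expected batch count.
    // `usize::div_ceil` needs Rust 1.73 or newer.
    fn expected_batch_count(num_rows: usize, batch_size: usize) -> usize {
        num_rows.div_ceil(batch_size)
    }

    fn main() {
        assert_eq!(expected_batch_count(100_000, 8192), 13); // default setting value
        assert_eq!(expected_batch_count(100_000, 100_000), 1); // num_rows used as batch_size
    }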

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 54 additions & 20 deletions
@@ -14,13 +14,17 @@

 use std::collections::HashSet;

+use arrow_array::RecordBatch;
+use databend_common_catalog::plan::VirtualColumnInfo;
 use databend_common_exception::Result;
 use databend_common_expression::eval_function;
 use databend_common_expression::types::DataType;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::Column;
 use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
+use databend_common_expression::FunctionContext;
+use databend_common_expression::TableSchemaRef;
 use databend_common_expression::Value;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_storages_common_io::MergeIOReader;
@@ -128,30 +132,60 @@ impl VirtualColumnReader {
         ))
     }

-    pub fn deserialize_virtual_columns(
+    pub fn try_create_paster(
         &self,
-        mut data_block: DataBlock,
         virtual_data: Option<VirtualBlockReadResult>,
-    ) -> Result<DataBlock> {
-        let record_batch = virtual_data
-            .map(|virtual_data| {
-                let columns_chunks = virtual_data.data.columns_chunks()?;
-                column_chunks_to_record_batch(
-                    &self.virtual_column_info.schema,
-                    virtual_data.num_rows,
-                    &columns_chunks,
-                    &virtual_data.compression,
-                )
-            })
-            .transpose()?;
-
-        // If the virtual column has already generated, add it directly,
-        // otherwise extract it from the source column
+        batch_size: usize,
+    ) -> Result<VirtualColumnDataModifier> {
+        let chunks = if let Some(virtual_data) = virtual_data {
+            let columns_chunks = virtual_data.data.columns_chunks()?;
+            let chunks = column_chunks_to_record_batch(
+                &self.virtual_column_info.schema,
+                virtual_data.num_rows,
+                &columns_chunks,
+                &virtual_data.compression,
+                batch_size,
+            )?;
+            Some(chunks)
+        } else {
+            None
+        };
+
         let func_ctx = self.ctx.get_function_context()?;
+
+        Ok(VirtualColumnDataModifier {
+            record_batches: chunks,
+            function_context: func_ctx,
+            next_record_batch_index: 0,
+            virtual_column_info: self.virtual_column_info.clone(),
+            source_schema: self.source_schema.clone(),
+        })
+    }
+}
+
+pub struct VirtualColumnDataModifier {
+    record_batches: Option<Vec<RecordBatch>>,
+    next_record_batch_index: usize,
+    function_context: FunctionContext,
+    virtual_column_info: VirtualColumnInfo,
+    source_schema: TableSchemaRef,
+}
+
+impl VirtualColumnDataModifier {
+    pub fn paste_virtual_column(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
+        let record_batch = if let Some(record_batches) = &self.record_batches {
+            assert!(record_batches.len() > self.next_record_batch_index);
+            Some(&record_batches[self.next_record_batch_index])
+        } else {
+            None
+        };
+
+        self.next_record_batch_index += 1;
+
+        let func_ctx = &self.function_context;
         for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() {
-            if let Some(arrow_array) = record_batch
-                .as_ref()
-                .and_then(|r| r.column_by_name(&virtual_column_field.name).cloned())
+            if let Some(arrow_array) =
+                record_batch.and_then(|r| r.column_by_name(&virtual_column_field.name).cloned())
             {
                 let data_type: DataType = virtual_column_field.data_type.as_ref().into();
                 let value = Value::Column(Column::from_arrow_rs(arrow_array, &data_type)?);
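
The new `VirtualColumnDataModifier` is stateful: it deserializes the virtual-column data into per-batch record batches up front and advances `next_record_batch_index` on every call, so it has to be created once per part and applied to the deserialized blocks in order. A hypothetical caller might look like the sketch below; only `try_create_paster` and `paste_virtual_column` come from this diff, the surrounding `virtual_reader`, `virtual_data`, `batch_size`, and `blocks` bindings are assumed.

    // Sketch only, not committed code.
    let mut paster = virtual_reader.try_create_paster(virtual_data, batch_size)?;
    for block in blocks {
        // Each call consumes the next cached RecordBatch and appends the
        // virtual columns to the corresponding DataBlock.
        let block = paster.paste_virtual_column(block)?;
        // ... hand the enriched block downstream
    }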
