Skip to content

Commit d1bb641

Browse files
committed
Fix bugs.
- Parquet schema mismatched. Databend doesn't support dictionary types, so don't parse the schema from the meta key-value data. - The output schema should be pruned by the prewhere info.
1 parent 241d6dd commit d1bb641

File tree

2 files changed

+30
-15
lines changed
  • src/query/storages/parquet/src

2 files changed

+30
-15
lines changed

src/query/storages/parquet/src/parquet_reader/meta.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use common_arrow::arrow::io::parquet::read as pread;
1818
use common_arrow::parquet::metadata::ColumnChunkMetaData;
1919
use common_arrow::parquet::metadata::FileMetaData;
2020
use common_arrow::parquet::metadata::RowGroupMetaData;
21+
use common_datavalues::DataField;
2122
use common_datavalues::DataSchema;
2223
use common_exception::ErrorCode;
2324
use common_exception::Result;
@@ -39,12 +40,19 @@ impl ParquetReader {
3940

4041
#[inline]
4142
pub fn infer_schema(meta: &FileMetaData) -> Result<DataSchema> {
42-
let mut arrow_schema = pread::infer_schema(meta)?;
43-
// Need to change all the field name to lowercase.
44-
for field in &mut arrow_schema.fields {
45-
field.name = field.name.to_lowercase();
46-
}
47-
Ok(DataSchema::from(arrow_schema))
43+
// Do not use `pread::infer_schema(meta)` because it will use metadata `ARROW:schema`.
44+
// There may be dictionary types in the schema, which are not supported by Databend.
45+
// So we need to convert the primitive schema directly.
46+
let field = pread::schema::parquet_to_arrow_schema(meta.schema().fields())
47+
.into_iter()
48+
.map(|mut f| {
49+
// Need to change all the field name to lowercase.
50+
f.name = f.name.to_lowercase();
51+
DataField::from(&f)
52+
})
53+
.collect::<Vec<_>>();
54+
55+
Ok(DataSchema::new(field))
4856
}
4957

5058
pub fn get_column_metas<'a>(

src/query/storages/parquet/src/table_function/read.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,7 @@ impl ParquetTable {
8181
)
8282
}
8383

84-
fn adjust_io_request(
85-
&self,
86-
ctx: &Arc<dyn TableContext>,
87-
projection: &Projection,
88-
) -> Result<usize> {
84+
fn adjust_io_request(&self, ctx: &Arc<dyn TableContext>, num_columns: usize) -> Result<usize> {
8985
let conf = GlobalConfig::instance();
9086
let mut max_memory_usage = ctx.get_settings().get_max_memory_usage()? as usize;
9187
if conf.query.table_cache_enabled {
@@ -97,7 +93,7 @@ impl ParquetTable {
9793
let block_file_size = 300 * 1024 * 1024_usize;
9894
let table_column_len = self.table_info.schema().fields().len();
9995
let per_column_bytes = block_file_size / table_column_len;
100-
let scan_column_bytes = per_column_bytes * projection.len();
96+
let scan_column_bytes = per_column_bytes * num_columns;
10197
let estimate_io_requests = max_memory_usage / scan_column_bytes;
10298

10399
let setting_io_requests = std::cmp::max(
@@ -115,9 +111,20 @@ impl ParquetTable {
115111
plan: &DataSourcePlan,
116112
pipeline: &mut Pipeline,
117113
) -> Result<()> {
118-
let projection = PushDownInfo::projection_of_push_downs(&plan.schema(), &plan.push_downs);
119-
let output_schema = Arc::new(projection.project_schema(&plan.source_info.schema()));
120-
let max_io_requests = self.adjust_io_request(&ctx, &projection)?;
114+
let columns_to_read =
115+
PushDownInfo::projection_of_push_downs(&plan.schema(), &plan.push_downs);
116+
let max_io_requests = self.adjust_io_request(&ctx, columns_to_read.len())?;
117+
118+
// If there is a `PrewhereInfo`, the final output should be `PrewhereInfo.output_columns`.
119+
// `PrewhereInfo.output_columns` should be a subset of `PushDownInfo.projection`.
120+
let output_projection = match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) {
121+
None => {
122+
PushDownInfo::projection_of_push_downs(&self.table_info.schema(), &plan.push_downs)
123+
}
124+
Some(v) => v.output_columns.clone(),
125+
};
126+
let output_schema = Arc::new(output_projection.project_schema(&plan.source_info.schema()));
127+
121128
let prewhere_reader = self.build_prewhere_reader(plan)?;
122129
let prewhere_filter =
123130
self.build_prewhere_filter_executor(ctx.clone(), plan, prewhere_reader.schema())?;

0 commit comments

Comments
 (0)