Commit d12105a

Prune columns of row groups.
And fix clippy.
1 parent 4d4d0dc commit d12105a

7 files changed: +47 -46 lines changed

src/query/storages/parquet/src/parquet_part.rs

Lines changed: 6 additions & 5 deletions

@@ -14,6 +14,7 @@
 
 use std::any::Any;
 use std::collections::hash_map::DefaultHasher;
+use std::collections::HashMap;
 use std::hash::Hash;
 use std::hash::Hasher;
 use std::sync::Arc;

@@ -91,9 +92,9 @@ impl From<Compression> for ParquetCompression {
     }
 }
 
-impl Into<Compression> for ParquetCompression {
-    fn into(self) -> Compression {
-        match self {
+impl From<ParquetCompression> for Compression {
+    fn from(value: ParquetCompression) -> Self {
+        match value {
             ParquetCompression::Uncompressed => Compression::Uncompressed,
             ParquetCompression::Snappy => Compression::Snappy,
             ParquetCompression::Gzip => Compression::Gzip,

@@ -117,7 +118,7 @@ pub struct ColumnMeta {
 pub struct ParquetRowGroupPart {
     pub location: String,
     pub num_rows: usize,
-    pub column_metas: Vec<ColumnMeta>,
+    pub column_metas: HashMap<usize, ColumnMeta>,
 }
 
 #[typetag::serde(name = "parquet_row_group")]

@@ -144,7 +145,7 @@ impl ParquetRowGroupPart {
     pub fn create(
         location: String,
         num_rows: usize,
-        column_metas: Vec<ColumnMeta>,
+        column_metas: HashMap<usize, ColumnMeta>,
     ) -> Arc<Box<dyn PartInfo>> {
         Arc::new(Box::new(ParquetRowGroupPart {
             location,
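Note on the clippy part of this commit: the `Into` → `From` rewrite above matches clippy's `from_over_into` lint. Implementing `From` is preferred because the standard library's blanket `impl<T, U> Into<U> for T where U: From<T>` then provides `Into` for free, while the reverse does not hold. A minimal, self-contained sketch (the types are invented for illustration, not Databend's):

    struct Feet(f64);
    struct Meters(f64);

    // Only `From` is written by hand, mirroring the change in this hunk.
    impl From<Feet> for Meters {
        fn from(value: Feet) -> Self {
            Meters(value.0 * 0.3048)
        }
    }

    fn main() {
        // `Into` comes for free from the std blanket impl.
        let a = Meters::from(Feet(10.0));
        let b: Meters = Feet(10.0).into();
        assert!((a.0 - b.0).abs() < f64::EPSILON);
    }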

src/query/storages/parquet/src/parquet_reader/deserialize.rs

Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ impl ParquetReader {
         let mut metas = Vec::with_capacity(indices.len());
         let mut chunks = Vec::with_capacity(indices.len());
         for index in indices {
-            let column_meta = &part.column_metas[*index];
+            let column_meta = &part.column_metas[index];
             let cnt = cnt_map.get_mut(index).unwrap();
             *cnt -= 1;
             let column_chunk = if cnt > &mut 0 {
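The `*index` → `index` change here (and the matching one in `read.rs` below) falls out of switching `column_metas` from `Vec<ColumnMeta>` to `HashMap<usize, ColumnMeta>`: a `Vec` is indexed by a `usize` value, while a `HashMap` implements `Index<&Q>` and is indexed by a reference to the key. A small self-contained illustration (toy data, not the real `ColumnMeta`):

    use std::collections::HashMap;

    fn main() {
        let by_position = vec!["c0", "c1", "c2"];
        let by_key: HashMap<usize, &str> =
            by_position.iter().copied().enumerate().collect();

        let index: &usize = &1; // e.g. an item borrowed while iterating a set of indices
        assert_eq!(by_position[*index], "c1"); // Vec needs the dereferenced usize
        assert_eq!(by_key[index], "c1");       // HashMap indexes by &usize directly
    }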

src/query/storages/parquet/src/parquet_reader/meta.rs

Lines changed: 5 additions & 14 deletions

@@ -16,7 +16,6 @@ use std::fs::File;
 
 use common_arrow::arrow::io::parquet::read as pread;
 use common_arrow::parquet::metadata::FileMetaData;
-use common_datavalues::DataField;
 use common_datavalues::DataSchema;
 use common_exception::ErrorCode;
 use common_exception::Result;

@@ -38,18 +37,10 @@ impl ParquetReader {
 
     #[inline]
     pub fn infer_schema(meta: &FileMetaData) -> Result<DataSchema> {
-        // Do not use `pread::infer_schema(meta)` becuase it will use metadata `ARROW:schema`.
-        // There maybe dictionary types in the schema, which is not supported by Databend.
-        // So we need to convert the primitive schema directly.
-        let field = pread::schema::parquet_to_arrow_schema(meta.schema().fields())
-            .into_iter()
-            .map(|mut f| {
-                // Need to change all the field name to lowercase.
-                f.name = f.name.to_lowercase();
-                DataField::from(&f)
-            })
-            .collect::<Vec<_>>();
-
-        Ok(DataSchema::new(field))
+        let mut arrow_schema = pread::infer_schema(meta)?;
+        arrow_schema.fields.iter_mut().for_each(|f| {
+            f.name = f.name.to_lowercase();
+        });
+        Ok(DataSchema::from(arrow_schema))
     }
 }
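For orientation, the two associated functions touched in this commit chain naturally; a hedged sketch of how they might be combined inside this crate (the wrapper `lowercased_schema_of` is invented for illustration, and error handling is simplified):

    use common_datavalues::DataSchema;
    use common_exception::Result;

    // Hypothetical helper; assumes it sits in the same crate as `ParquetReader`.
    fn lowercased_schema_of(location: &str) -> Result<DataSchema> {
        // Read the parquet footer, then infer a DataSchema whose field names
        // are lowercased by the new `infer_schema` body above.
        let file_meta = ParquetReader::read_meta(location)?;
        ParquetReader::infer_schema(&file_meta)
    }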

src/query/storages/parquet/src/parquet_reader/mod.rs

Lines changed: 9 additions & 6 deletions

@@ -28,6 +28,10 @@ use common_storage::ColumnLeaves;
 use opendal::Operator;
 pub use read::IndexedChunk;
 
+/// The reader to parquet files with a projected schema.
+///
+/// **ALERT**: dictionary type is not supported yet.
+/// If there are dictionary pages in the parquet file, the reading process may fail.
 #[derive(Clone)]
 pub struct ParquetReader {
     operator: Operator,

@@ -62,12 +66,7 @@ impl ParquetReader {
         let schema_descriptors = to_parquet_schema(&arrow_schema)?;
 
         // Project schema
-        let projected_schema = match projection {
-            Projection::Columns(ref indices) => DataSchemaRef::new(schema.project(indices)),
-            Projection::InnerColumns(ref path_indices) => {
-                DataSchemaRef::new(schema.inner_project(path_indices))
-            }
-        };
+        let projected_schema = DataSchemaRef::new(projection.project_schema(&schema));
         // Project column leaves
         let projected_column_leaves = ColumnLeaves {
             column_leaves: projection

@@ -101,4 +100,8 @@ impl ParquetReader {
     pub fn schema(&self) -> DataSchemaRef {
         self.projected_schema.clone()
     }
+
+    pub fn columns_to_read(&self) -> &HashSet<usize> {
+        &self.columns_to_read
+    }
 }
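The `project_schema` call that replaces the removed match above is a helper on `Projection`. Judging only from the removed branches, it plausibly looks like the sketch below; this is a reconstruction from this diff (it would live next to `Projection`'s definition in Databend's plan types), not necessarily the actual implementation:

    impl Projection {
        pub fn project_schema(&self, schema: &DataSchema) -> DataSchema {
            match self {
                Projection::Columns(indices) => schema.project(indices),
                Projection::InnerColumns(path_indices) => schema.inner_project(path_indices),
            }
        }
    }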

src/query/storages/parquet/src/parquet_reader/read.rs

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@ impl ParquetReader {
         let mut chunks = Vec::with_capacity(self.columns_to_read.len());
 
         for index in &self.columns_to_read {
-            let meta = &part.column_metas[*index];
+            let meta = &part.column_metas[index];
             let op = self.operator.clone();
             let chunk =
                 Self::sync_read_column(op.object(&part.location), meta.offset, meta.length)?;

src/query/storages/parquet/src/parquet_source.rs

Lines changed: 1 addition & 1 deletion

@@ -248,7 +248,7 @@ impl Processor for ParquetSource {
         match std::mem::replace(&mut self.state, State::Finish) {
             State::ReadDataPrewhere(Some(part)) => {
                 let rg_part = ParquetRowGroupPart::from_part(&part)?;
-                let chunks = self.prewhere_reader.sync_read_columns_data(&rg_part)?;
+                let chunks = self.prewhere_reader.sync_read_columns_data(rg_part)?;
                 if self.prewhere_filter.is_some() {
                     self.state = State::PrewhereFilter(part, chunks);
                 } else {

src/query/storages/parquet/src/table_function/read.rs

Lines changed: 24 additions & 18 deletions

@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_catalog::plan::DataSourcePlan;

@@ -126,24 +127,33 @@ impl ParquetTable {
                 part.location.clone()
             })
             .collect::<Vec<_>>();
+
+        let columns_to_read =
+            PushDownInfo::projection_of_push_downs(&plan.source_info.schema(), &plan.push_downs);
+        let max_io_requests = self.adjust_io_request(&ctx, columns_to_read.len())?;
         let ctx_ref = ctx.clone();
+        // `dummy_reader` is only used for prune columns in row groups.
+        let dummy_reader = ParquetReader::create(
+            self.operator.clone(),
+            plan.source_info.schema(),
+            columns_to_read,
+        )?;
         pipeline.set_on_init(move || {
             let mut partitions = Vec::with_capacity(locations.len());
             for location in &locations {
-                let file_meta = ParquetReader::read_meta(&location)?;
+                let file_meta = ParquetReader::read_meta(location)?;
                 for rg in &file_meta.row_groups {
-                    let column_metas = rg
-                        .columns()
-                        .iter()
-                        .map(|c| {
-                            let (offset, length) = c.byte_range();
-                            ColumnMeta {
-                                offset,
-                                length,
-                                compression: c.compression().into(),
-                            }
-                        })
-                        .collect();
+                    let mut column_metas =
+                        HashMap::with_capacity(dummy_reader.columns_to_read().len());
+                    for index in dummy_reader.columns_to_read() {
+                        let c = &rg.columns()[*index];
+                        let (offset, length) = c.byte_range();
+                        column_metas.insert(*index, ColumnMeta {
+                            offset,
+                            length,
+                            compression: c.compression().into(),
+                        });
+                    }
 
                     partitions.push(ParquetRowGroupPart::create(
                         location.clone(),

@@ -157,17 +167,13 @@ impl ParquetTable {
             Ok(())
         });
 
-        let columns_to_read =
-            PushDownInfo::projection_of_push_downs(&plan.schema(), &plan.push_downs);
-        let max_io_requests = self.adjust_io_request(&ctx, columns_to_read.len())?;
-
         // If there is a `PrewhereInfo`, the final output should be `PrehwereInfo.output_columns`.
         // `PrewhereInfo.output_columns` should be a subset of `PushDownInfo.projection`.
         let output_projection = match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) {
             None => {
                 PushDownInfo::projection_of_push_downs(&self.table_info.schema(), &plan.push_downs)
             }
-            Some(v) => v.output_columns.clone(),
+            Some(v) => v.output_columns,
         };
         let output_schema = Arc::new(output_projection.project_schema(&plan.source_info.schema()));
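The second hunk above is the core of "prune columns of row groups": instead of collecting a `ColumnMeta` for every column of the row group into a positional `Vec`, only the projected columns are kept, keyed by their original column index so later lookups still work. A self-contained toy illustration of that shape (plain integers stand in for `ColumnMeta`):

    use std::collections::{HashMap, HashSet};

    fn main() {
        // Four columns in a row group, but the query only projects columns 0 and 2.
        let column_byte_sizes = vec![100u64, 250, 40, 900];
        let columns_to_read: HashSet<usize> = [0, 2].into_iter().collect();

        // Keep metadata only for the projected columns, keyed by original index.
        let pruned: HashMap<usize, u64> = columns_to_read
            .iter()
            .map(|&i| (i, column_byte_sizes[i]))
            .collect();

        assert_eq!(pruned.len(), 2);
        assert_eq!(pruned[&2], 40);
        assert!(pruned.get(&1).is_none()); // unprojected columns carry no metadata
    }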
