
Commit 352d865

feat(parquet): prune row groups before reading.
1 parent 6e7f922 commit 352d865

6 files changed: 118 additions & 3 deletions

Cargo.lock

Lines changed: 2 additions & 0 deletions
Generated file; diff not rendered by default.

src/query/catalog/src/plan/pushdown.rs

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ use crate::plan::Projection;
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
 pub struct PrewhereInfo {
-    /// columns to be ouput be prewhere scan
+    /// columns to be output by prewhere scan
     pub output_columns: Projection,
     /// columns used for prewhere
     pub prewhere_columns: Projection,

src/query/storages/parquet/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,8 @@ common-meta-app = { path = "../../../meta/app" }
 common-pipeline-core = { path = "../../pipeline/core" }
 common-sql = { path = "../../sql" }
 common-storage = { path = "../../../common/storage" }
+common-storages-pruner = { path = "../pruner" }
+common-storages-table-meta = { path = "../table-meta" }
 
 async-trait = { version = "0.1.57", package = "async-trait-fn" }
 chrono = { workspace = true }

src/query/storages/parquet/src/parquet_reader/meta.rs

Lines changed: 76 additions & 0 deletions
@@ -12,13 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+use std::collections::HashSet;
 use std::fs::File;
 
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::read as pread;
 use common_arrow::parquet::metadata::FileMetaData;
+use common_arrow::parquet::metadata::RowGroupMetaData;
+use common_datavalues::Column;
+use common_datavalues::ColumnRef;
+use common_datavalues::IntoColumn;
+use common_datavalues::UInt64Column;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_storages_table_meta::meta::ColumnStatistics;
+use common_storages_table_meta::meta::StatisticsOfColumns;
 
 use crate::ParquetReader;
 
@@ -43,4 +52,71 @@ impl ParquetReader {
         });
         Ok(arrow_schema)
     }
+
+    /// Collect statistics of a batch of row groups of the specified columns.
+    ///
+    /// The returned vector's length is the same as `rgs`.
+    pub fn collect_row_group_stats(
+        schema: &ArrowSchema,
+        rgs: &[RowGroupMetaData],
+        indices: &HashSet<usize>,
+    ) -> Result<Vec<StatisticsOfColumns>> {
+        let mut stats = Vec::with_capacity(rgs.len());
+        let mut stats_of_row_groups = HashMap::with_capacity(rgs.len());
+
+        for index in indices {
+            let field = &schema.fields[*index];
+            let column_stats = pread::statistics::deserialize(field, rgs)?;
+            stats_of_row_groups.insert(*index, BatchStatistics::from(column_stats));
+        }
+
+        for (rg_idx, _) in rgs.iter().enumerate() {
+            let mut cols_stats = HashMap::new();
+            cols_stats.reserve(stats.capacity());
+            for index in indices {
+                let col_stats = stats_of_row_groups[index].get(rg_idx);
+                cols_stats.insert(*index as u32, col_stats);
+            }
+            stats.push(cols_stats);
+        }
+
+        Ok(stats)
+    }
+}
+
+/// A temporary struct to present [`pread::statistics::Statistics`].
+///
+/// Convert the inner fields into Databend data structures.
+pub struct BatchStatistics {
+    pub null_count: UInt64Column,
+    pub distinct_count: UInt64Column,
+    pub min_values: ColumnRef,
+    pub max_values: ColumnRef,
+}
+
+impl BatchStatistics {
+    pub fn get(&self, index: usize) -> ColumnStatistics {
+        ColumnStatistics {
+            min: self.min_values.get(index),
+            max: self.max_values.get(index),
+            null_count: self.null_count.get_u64(index).unwrap(),
+            in_memory_size: 0, // this field is not used.
+            distinct_of_values: self.distinct_count.get_u64(index).ok(),
+        }
+    }
+}
+
+impl From<pread::statistics::Statistics> for BatchStatistics {
+    fn from(stats: pread::statistics::Statistics) -> Self {
+        let null_count = UInt64Column::from_arrow_array(&*stats.null_count);
+        let distinct_count = UInt64Column::from_arrow_array(&*stats.distinct_count);
+        let min_values = stats.min_value.clone().into_column();
+        let max_values = stats.max_value.clone().into_column();
+        Self {
+            null_count,
+            distinct_count,
+            min_values,
+            max_values,
+        }
+    }
 }
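The core of `collect_row_group_stats` above is a transpose: arrow2's `pread::statistics::deserialize` hands back statistics column by column, with one array entry per row group, and the helper regroups them into one column-indexed map per row group. A minimal, self-contained sketch of that regrouping with plain std types (the names and the i64 min/max values here are illustrative, not Databend's actual types):

use std::collections::HashMap;

/// Simplified stand-in for the per-column statistics arrow2 returns:
/// entry `i` of each vector describes row group `i`.
struct ColumnStats {
    mins: Vec<i64>,
    maxs: Vec<i64>,
}

/// Regroup column-major stats into one `column index -> (min, max)` map per row group,
/// mirroring the `Vec<StatisticsOfColumns>` produced above.
fn transpose(
    per_column: &HashMap<usize, ColumnStats>,
    num_row_groups: usize,
) -> Vec<HashMap<usize, (i64, i64)>> {
    (0..num_row_groups)
        .map(|rg| {
            per_column
                .iter()
                .map(|(col, s)| (*col, (s.mins[rg], s.maxs[rg])))
                .collect()
        })
        .collect()
}

fn main() {
    let mut per_column = HashMap::new();
    per_column.insert(0, ColumnStats { mins: vec![1, 8], maxs: vec![5, 42] });

    let per_row_group = transpose(&per_column, 2);
    assert_eq!(per_row_group[1][&0], (8, 42));
}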

src/query/storages/parquet/src/parquet_reader/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ pub struct ParquetReader {
     /// The actual schema used to read parquet. It will be converted to [`common_datavalues::DataSchema`] when output [`common_datablocks::DataBlock`].
     ///
     /// The reason of using [`ArrowSchema`] to read parquet is that
-    /// There are some types that Databend not support such as Timestmap of nanoseconds.
+    /// There are some types that Databend not support such as Timestamp of nanoseconds.
     /// Such types will be convert to supported types after deserialization.
     projected_arrow_schema: ArrowSchema,
     /// [`ColumnLeaves`] corresponding to the `projected_schema`.

src/query/storages/parquet/src/table_function/read.rs

Lines changed: 36 additions & 1 deletion
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use common_arrow::arrow::io::parquet::read as pread;
 use common_catalog::plan::DataSourcePlan;
 use common_catalog::plan::Partitions;
 use common_catalog::plan::PartitionsShuffleKind;
@@ -26,6 +27,7 @@ use common_exception::Result;
 use common_pipeline_core::Pipeline;
 use common_sql::evaluator::EvalNode;
 use common_sql::evaluator::Evaluator;
+use common_storages_pruner::range_pruner;
 
 use super::ParquetTable;
 use super::TableContext;
@@ -134,11 +136,44 @@ impl ParquetTable {
         // `dummy_reader` is only used for prune columns in row groups.
         let (_, _, _, columns_to_read) =
             ParquetReader::do_projection(&plan.source_info.schema().to_arrow(), &columns_to_read)?;
+
+        // Do partitioning at the beginning of the whole pipeline.
+        let push_downs = plan.push_downs.clone();
+        let schema = plan.schema();
         pipeline.set_on_init(move || {
             let mut partitions = Vec::with_capacity(locations.len());
+
+            // Build the row group pruner.
+
+            let filter_expr = push_downs.as_ref().map(|extra| extra.filters.as_slice());
+            let row_group_pruner = range_pruner::new_range_pruner(&ctx_ref, filter_expr, &schema)?;
+
             for location in &locations {
                 let file_meta = ParquetReader::read_meta(location)?;
-                for rg in &file_meta.row_groups {
+                let arrow_schema = pread::infer_schema(&file_meta)?;
+                let mut row_group_pruned = vec![false; file_meta.row_groups.len()];
+
+                // If collecting stats fails or `should_keep` is true, we still read the row group.
+                // Otherwise, the row group will be pruned.
+                if let Ok(row_group_stats) = ParquetReader::collect_row_group_stats(
+                    &arrow_schema,
+                    &file_meta.row_groups,
+                    &columns_to_read,
+                ) {
+                    for (idx, (stats, rg)) in row_group_stats
+                        .iter()
+                        .zip(file_meta.row_groups.iter())
+                        .enumerate()
+                    {
+                        row_group_pruned[idx] =
+                            !row_group_pruner.should_keep(stats, rg.num_rows() as u64);
+                    }
+                }
+
+                for (idx, rg) in file_meta.row_groups.iter().enumerate() {
+                    if row_group_pruned[idx] {
+                        continue;
+                    }
                     let mut column_metas = HashMap::with_capacity(columns_to_read.len());
                     for index in &columns_to_read {
                         let c = &rg.columns()[*index];
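For intuition, the range pruning wired up above boils down to a per-row-group min/max check against the filter: a row group is skipped only when its statistics prove the predicate cannot match any row in it. A hedged, standalone sketch of that decision (plain Rust; this is not the `common-storages-pruner` API, and the `col > threshold` predicate is a made-up example):

/// Min/max of one column within a single row group, as gathered from parquet metadata.
struct ColumnRange {
    min: i64,
    max: i64,
}

/// Keep the row group only if `col > threshold` could match some row,
/// i.e. prune it when even the column maximum fails the predicate.
fn should_keep(stats: &ColumnRange, threshold: i64) -> bool {
    stats.max > threshold
}

fn main() {
    // One entry per row group, analogous to the stats collected per file above.
    let row_groups = vec![
        ColumnRange { min: 1, max: 5 },  // pruned: no value can exceed 10
        ColumnRange { min: 8, max: 42 }, // kept: may contain matching rows
    ];
    println!("rg0 range: [{}, {}]", row_groups[0].min, row_groups[0].max);

    let kept: Vec<usize> = row_groups
        .iter()
        .enumerate()
        .filter(|(_, s)| should_keep(s, 10))
        .map(|(idx, _)| idx)
        .collect();
    assert_eq!(kept, vec![1]);
}

Note that the pruning stays conservative: when statistics cannot be collected for a file, the commit keeps all of its row groups rather than risk dropping rows.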
