Skip to content

Commit 91be296

Browse files
authored
Merge pull request #9228 from RinChanNOWWW/prune-row-group
feat(parquet): prune row groups before reading.
2 parents 3baa946 + 2980c30 commit 91be296

File tree

15 files changed

+227
-40
lines changed

15 files changed

+227
-40
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ members = [
6060
"src/query/storages/system",
6161
"src/query/storages/view",
6262
"src/query/storages/parquet",
63+
"src/query/storages/pruner",
6364
"src/query/users",
6465
# databend-query
6566
"src/query/service",

src/query/catalog/src/plan/pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::plan::Projection;
2121

2222
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
2323
pub struct PrewhereInfo {
24-
/// columns to be ouput be prewhere scan
24+
/// columns to be output by prewhere scan
2525
pub output_columns: Projection,
2626
/// columns used for prewhere
2727
pub prewhere_columns: Projection,

src/query/storages/fuse/fuse/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ common-sql = { path = "../../../sql" }
3131
common-storage = { path = "../../../../common/storage" }
3232
common-storages-cache = { path = "../../cache" }
3333
common-storages-index = { path = "../../index" }
34+
common-storages-pruner = { path = "../../pruner" }
3435
common-storages-table-meta = { path = "../../table-meta" }
3536

3637
async-trait = { version = "0.1.57", package = "async-trait-fn" }

src/query/storages/fuse/fuse/src/pruning/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod limiter;
1615
mod pruner;
1716
mod pruning_executor;
18-
mod range_pruner;
1917
mod topn_pruner;
2018

2119
pub use pruning_executor::BlockIndex;

src/query/storages/fuse/fuse/src/pruning/pruning_executor.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ use common_catalog::table_context::TableContext;
2525
use common_datavalues::DataSchemaRef;
2626
use common_exception::ErrorCode;
2727
use common_exception::Result;
28+
use common_storages_pruner::LimiterPruner;
29+
use common_storages_pruner::LimiterPrunerCreator;
30+
use common_storages_pruner::RangePruner;
31+
use common_storages_pruner::RangePrunerCreator;
2832
use common_storages_table_meta::meta::BlockMeta;
2933
use common_storages_table_meta::meta::Location;
3034
use common_storages_table_meta::meta::SegmentInfo;
@@ -35,11 +39,7 @@ use tracing::Instrument;
3539

3640
use super::pruner;
3741
use crate::io::MetaReaders;
38-
use crate::pruning::limiter;
39-
use crate::pruning::limiter::LimiterPruner;
4042
use crate::pruning::pruner::Pruner;
41-
use crate::pruning::range_pruner;
42-
use crate::pruning::range_pruner::RangePruner;
4343
use crate::pruning::topn_pruner;
4444

4545
pub type BlockIndex = (usize, usize);
@@ -79,11 +79,11 @@ impl BlockPruner {
7979
.and_then(|p| p.limit);
8080

8181
// prepare the limiter. in case that limit is none, an unlimited limiter will be returned
82-
let limiter = limiter::new_limiter(limit);
82+
let limiter = LimiterPrunerCreator::create(limit);
8383

8484
// prepare the range filter.
8585
// if filter_expression is none, a dummy pruner will be returned, which prunes nothing
86-
let range_pruner = range_pruner::new_range_pruner(ctx, filter_expressions, &schema)?;
86+
let range_pruner = RangePrunerCreator::try_create(ctx, filter_expressions, &schema)?;
8787

8888
// prepare the filter.
8989
// None will be returned, if filter is not applicable (e.g. unsuitable filter expression, index not available, etc.)

src/query/storages/fuse/fuse/src/pruning/topn_pruner.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,14 @@ use common_storages_table_meta::meta::ColumnStatistics;
2424

2525
use crate::pruning::BlockIndex;
2626

27-
pub(crate) struct TopNPrunner {
27+
pub struct TopNPrunner {
2828
schema: DataSchemaRef,
2929
sort: Vec<(Expression, bool, bool)>,
3030
limit: usize,
3131
}
3232

3333
impl TopNPrunner {
34-
pub(crate) fn new(
35-
schema: DataSchemaRef,
36-
sort: Vec<(Expression, bool, bool)>,
37-
limit: usize,
38-
) -> Self {
34+
pub fn new(schema: DataSchemaRef, sort: Vec<(Expression, bool, bool)>, limit: usize) -> Self {
3935
Self {
4036
schema,
4137
sort,
@@ -45,7 +41,7 @@ impl TopNPrunner {
4541
}
4642

4743
impl TopNPrunner {
48-
pub(crate) fn prune(
44+
pub fn prune(
4945
&self,
5046
metas: Vec<(BlockIndex, Arc<BlockMeta>)>,
5147
) -> Result<Vec<(BlockIndex, Arc<BlockMeta>)>> {

src/query/storages/parquet/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ common-meta-app = { path = "../../../meta/app" }
2424
common-pipeline-core = { path = "../../pipeline/core" }
2525
common-sql = { path = "../../sql" }
2626
common-storage = { path = "../../../common/storage" }
27+
common-storages-pruner = { path = "../pruner" }
28+
common-storages-table-meta = { path = "../table-meta" }
2729

2830
async-trait = { version = "0.1.57", package = "async-trait-fn" }
2931
chrono = { workspace = true }

src/query/storages/parquet/src/parquet_reader/meta.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,22 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
use std::collections::HashSet;
1517
use std::fs::File;
1618

1719
use common_arrow::arrow::datatypes::Schema as ArrowSchema;
1820
use common_arrow::arrow::io::parquet::read as pread;
1921
use common_arrow::parquet::metadata::FileMetaData;
22+
use common_arrow::parquet::metadata::RowGroupMetaData;
23+
use common_datavalues::Column;
24+
use common_datavalues::ColumnRef;
25+
use common_datavalues::IntoColumn;
26+
use common_datavalues::UInt64Column;
2027
use common_exception::ErrorCode;
2128
use common_exception::Result;
29+
use common_storages_table_meta::meta::ColumnStatistics;
30+
use common_storages_table_meta::meta::StatisticsOfColumns;
2231

2332
use crate::ParquetReader;
2433

@@ -43,4 +52,79 @@ impl ParquetReader {
4352
});
4453
Ok(arrow_schema)
4554
}
55+
56+
/// Collect statistics of a batch of row groups of the specified columns.
57+
///
58+
/// The returned vector's length is the same as `rgs`.
59+
pub fn collect_row_group_stats(
60+
schema: &ArrowSchema,
61+
rgs: &[RowGroupMetaData],
62+
indices: &HashSet<usize>,
63+
) -> Result<Vec<StatisticsOfColumns>> {
64+
let mut stats = Vec::with_capacity(rgs.len());
65+
let mut stats_of_row_groups = HashMap::with_capacity(rgs.len());
66+
67+
for index in indices {
68+
if rgs
69+
.iter()
70+
.any(|rg| rg.columns()[*index].metadata().statistics.is_none())
71+
{
72+
return Err(ErrorCode::InvalidArgument(
73+
"Some columns of the row groups have no statistics",
74+
));
75+
}
76+
77+
let field = &schema.fields[*index];
78+
let column_stats = pread::statistics::deserialize(field, rgs)?;
79+
stats_of_row_groups.insert(*index, BatchStatistics::from(column_stats));
80+
}
81+
82+
for (rg_idx, _) in rgs.iter().enumerate() {
83+
let mut cols_stats = HashMap::with_capacity(stats.capacity());
84+
for index in indices {
85+
let col_stats = stats_of_row_groups[index].get(rg_idx);
86+
cols_stats.insert(*index as u32, col_stats);
87+
}
88+
stats.push(cols_stats);
89+
}
90+
91+
Ok(stats)
92+
}
93+
}
94+
95+
/// A temporary struct to present [`pread::statistics::Statistics`].
96+
///
97+
/// Convert the inner fields into Databend data structures.
98+
pub struct BatchStatistics {
99+
pub null_count: UInt64Column,
100+
pub distinct_count: UInt64Column,
101+
pub min_values: ColumnRef,
102+
pub max_values: ColumnRef,
103+
}
104+
105+
impl BatchStatistics {
106+
pub fn get(&self, index: usize) -> ColumnStatistics {
107+
ColumnStatistics {
108+
min: self.min_values.get(index),
109+
max: self.max_values.get(index),
110+
null_count: self.null_count.get_u64(index).unwrap(),
111+
in_memory_size: 0, // this field is not used.
112+
distinct_of_values: self.distinct_count.get_u64(index).ok(),
113+
}
114+
}
115+
}
116+
117+
impl From<pread::statistics::Statistics> for BatchStatistics {
118+
fn from(stats: pread::statistics::Statistics) -> Self {
119+
let null_count = UInt64Column::from_arrow_array(&*stats.null_count);
120+
let distinct_count = UInt64Column::from_arrow_array(&*stats.distinct_count);
121+
let min_values = stats.min_value.clone().into_column();
122+
let max_values = stats.max_value.clone().into_column();
123+
Self {
124+
null_count,
125+
distinct_count,
126+
min_values,
127+
max_values,
128+
}
129+
}
46130
}

src/query/storages/parquet/src/parquet_reader/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct ParquetReader {
5858
/// The actual schema used to read parquet. It will be converted to [`common_datavalues::DataSchema`] when output [`common_datablocks::DataBlock`].
5959
///
6060
/// The reason of using [`ArrowSchema`] to read parquet is that
61-
/// There are some types that Databend not support such as Timestmap of nanoseconds.
61+
/// There are some types that Databend not support such as Timestamp of nanoseconds.
6262
/// Such types will be converted to supported types after deserialization.
6363
projected_arrow_schema: ArrowSchema,
6464
/// [`ColumnLeaves`] corresponding to the `projected_schema`.

0 commit comments

Comments
 (0)