Skip to content

Commit d74b8ac

Browse files
committed
Merge remote-tracking branch 'origin/main' into column-orders-parquet-stats
2 parents 56eb96a + 6cf74d6 commit d74b8ac

File tree

26 files changed

+532
-292
lines changed

26 files changed

+532
-292
lines changed

datafusion-examples/examples/parquet_index.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow::datatypes::{Int32Type, SchemaRef};
2323
use arrow::util::pretty::pretty_format_batches;
2424
use async_trait::async_trait;
2525
use datafusion::catalog::Session;
26+
use datafusion::common::pruning::PruningStatistics;
2627
use datafusion::common::{
2728
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
2829
};
@@ -39,9 +40,7 @@ use datafusion::parquet::arrow::{
3940
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
4041
};
4142
use datafusion::physical_expr::PhysicalExpr;
42-
use datafusion::physical_optimizer::pruning::{
43-
ColumnOrdering, PruningPredicate, PruningStatistics,
44-
};
43+
use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
4544
use datafusion::physical_plan::ExecutionPlan;
4645
use datafusion::prelude::*;
4746
use std::any::Any;

datafusion-examples/examples/pruning.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@ use std::sync::Arc;
2020

2121
use arrow::array::{ArrayRef, BooleanArray, Int32Array};
2222
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
23+
use datafusion::common::pruning::PruningStatistics;
2324
use datafusion::common::{DFSchema, ScalarValue};
2425
use datafusion::execution::context::ExecutionProps;
2526
use datafusion::physical_expr::create_physical_expr;
26-
use datafusion::physical_optimizer::pruning::{
27-
ColumnOrdering, PruningPredicate, PruningStatistics,
28-
};
27+
use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
2928
use datafusion::prelude::*;
3029

3130
/// This example shows how to use DataFusion's `PruningPredicate` to prove

datafusion/common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub mod format;
4747
pub mod hash_utils;
4848
pub mod instant;
4949
pub mod parsers;
50+
pub mod pruning;
5051
pub mod rounding;
5152
pub mod scalar;
5253
pub mod spans;

datafusion/common/src/pruning.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, BooleanArray};
19+
use std::collections::HashSet;
20+
21+
use crate::Column;
22+
use crate::ScalarValue;
23+
24+
/// A source of runtime statistical information to [`PruningPredicate`]s.
25+
///
26+
/// # Supported Information
27+
///
28+
/// 1. Minimum and maximum values for columns
29+
///
30+
/// 2. Null counts and row counts for columns
31+
///
32+
/// 3. Whether the values in a column are contained in a set of literals
33+
///
34+
/// # Vectorized Interface
35+
///
36+
/// Information for containers / files is returned as Arrow [`ArrayRef`], so
37+
/// the evaluation happens once on a single `RecordBatch`, which amortizes the
38+
/// overhead of evaluating the predicate. This is important when pruning 1000s
39+
/// of containers which often happens in analytic systems that have 1000s of
40+
/// potential files to consider.
41+
///
42+
/// For example, for the following three files with a single column `a`:
43+
/// ```text
44+
/// file1: column a: min=5, max=10
45+
/// file2: column a: No stats
46+
/// file3: column a: min=20, max=30
47+
/// ```
48+
///
49+
/// PruningStatistics would return:
50+
///
51+
/// ```text
52+
/// min_values("a") -> Some([5, Null, 20])
53+
/// max_values("a") -> Some([10, Null, 30])
54+
/// min_values("X") -> None
55+
/// ```
56+
///
57+
/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html
58+
pub trait PruningStatistics {
59+
/// Return the minimum values for the named column, if known.
60+
///
61+
/// If the minimum value for a particular container is not known, the
62+
/// returned array should have `null` in that row. If the minimum value is
63+
/// not known for any row, return `None`.
64+
///
65+
/// Note: the returned array must contain [`Self::num_containers`] rows
66+
fn min_values(&self, column: &Column) -> Option<ArrayRef>;
67+
68+
/// Return the maximum values for the named column, if known.
69+
///
70+
/// See [`Self::min_values`] for when to return `None` and null values.
71+
///
72+
/// Note: the returned array must contain [`Self::num_containers`] rows
73+
fn max_values(&self, column: &Column) -> Option<ArrayRef>;
74+
75+
/// Return the number of containers (e.g. Row Groups) being pruned with
76+
/// these statistics.
77+
///
78+
/// This value corresponds to the size of the [`ArrayRef`] returned by
79+
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
80+
/// and [`Self::row_counts`].
81+
fn num_containers(&self) -> usize;
82+
83+
/// Return the number of null values for the named column as an
84+
/// [`UInt64Array`]
85+
///
86+
/// See [`Self::min_values`] for when to return `None` and null values.
87+
///
88+
/// Note: the returned array must contain [`Self::num_containers`] rows
89+
///
90+
/// [`UInt64Array`]: arrow::array::UInt64Array
91+
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
92+
93+
/// Return the number of rows for the named column in each container
94+
/// as a [`UInt64Array`].
95+
///
96+
/// See [`Self::min_values`] for when to return `None` and null values.
97+
///
98+
/// Note: the returned array must contain [`Self::num_containers`] rows
99+
///
100+
/// [`UInt64Array`]: arrow::array::UInt64Array
101+
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
102+
103+
/// Returns [`BooleanArray`] where each row represents information known
104+
/// about specific literal `values` in a column.
105+
///
106+
/// For example, Parquet Bloom Filters implement this API to communicate
107+
/// that `values` are known not to be present in a Row Group.
108+
///
109+
/// The returned array has one row for each container, with the following
110+
/// meanings:
111+
/// * `true` if the values in `column` ONLY contain values from `values`
112+
/// * `false` if the values in `column` are NOT ANY of `values`
113+
/// * `null` if neither of the above holds or it is unknown.
114+
///
115+
/// If these statistics can not determine column membership for any
116+
/// container, return `None`.
117+
///
118+
/// Note: the returned array must contain [`Self::num_containers`] rows
119+
fn contained(
120+
&self,
121+
column: &Column,
122+
values: &HashSet<ScalarValue>,
123+
) -> Option<BooleanArray>;
124+
}

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
550550

551551
ListingOptions::new(Arc::new(file_format))
552552
.with_file_extension(self.file_extension)
553-
.with_target_partitions(config.target_partitions())
553+
.with_session_config_options(config)
554554
.with_table_partition_cols(self.table_partition_cols.clone())
555555
.with_file_sort_order(self.file_sort_order.clone())
556556
}
@@ -585,9 +585,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
585585

586586
ListingOptions::new(Arc::new(file_format))
587587
.with_file_extension(self.file_extension)
588-
.with_target_partitions(config.target_partitions())
589588
.with_table_partition_cols(self.table_partition_cols.clone())
590589
.with_file_sort_order(self.file_sort_order.clone())
590+
.with_session_config_options(config)
591591
}
592592

593593
async fn get_resolved_schema(
@@ -615,7 +615,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
615615

616616
ListingOptions::new(Arc::new(file_format))
617617
.with_file_extension(self.file_extension)
618-
.with_target_partitions(config.target_partitions())
618+
.with_session_config_options(config)
619619
.with_table_partition_cols(self.table_partition_cols.clone())
620620
.with_file_sort_order(self.file_sort_order.clone())
621621
}
@@ -643,7 +643,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
643643

644644
ListingOptions::new(Arc::new(file_format))
645645
.with_file_extension(self.file_extension)
646-
.with_target_partitions(config.target_partitions())
646+
.with_session_config_options(config)
647647
.with_table_partition_cols(self.table_partition_cols.clone())
648648
}
649649

@@ -669,7 +669,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {
669669

670670
ListingOptions::new(Arc::new(file_format))
671671
.with_file_extension(self.file_extension)
672-
.with_target_partitions(config.target_partitions())
672+
.with_session_config_options(config)
673673
.with_table_partition_cols(self.table_partition_cols.clone())
674674
}
675675

datafusion/core/src/datasource/listing/table.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider;
3232
use datafusion_common::{config_err, DataFusionError, Result};
3333
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
3434
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
35+
use datafusion_execution::config::SessionConfig;
3536
use datafusion_expr::dml::InsertOp;
3637
use datafusion_expr::{Expr, TableProviderFilterPushDown};
3738
use datafusion_expr::{SortExpr, TableType};
@@ -195,7 +196,8 @@ impl ListingTableConfig {
195196

196197
let listing_options = ListingOptions::new(file_format)
197198
.with_file_extension(listing_file_extension)
198-
.with_target_partitions(state.config().target_partitions());
199+
.with_target_partitions(state.config().target_partitions())
200+
.with_collect_stat(state.config().collect_statistics());
199201

200202
Ok(Self {
201203
table_paths: self.table_paths,
@@ -313,18 +315,29 @@ impl ListingOptions {
313315
/// - use default file extension filter
314316
/// - no input partition to discover
315317
/// - one target partition
316-
/// - stat collection
318+
/// - do not collect statistics
317319
pub fn new(format: Arc<dyn FileFormat>) -> Self {
318320
Self {
319321
file_extension: format.get_ext(),
320322
format,
321323
table_partition_cols: vec![],
322-
collect_stat: true,
324+
collect_stat: false,
323325
target_partitions: 1,
324326
file_sort_order: vec![],
325327
}
326328
}
327329

330+
/// Set options from [`SessionConfig`] and returns self.
331+
///
332+
/// Currently this sets `target_partitions` and `collect_stat`
333+
/// but if more options are added in the future that need to be coordinated
334+
/// they will be synchronized through this method.
335+
pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
336+
self = self.with_target_partitions(config.target_partitions());
337+
self = self.with_collect_stat(config.collect_statistics());
338+
self
339+
}
340+
328341
/// Set file extension on [`ListingOptions`] and returns self.
329342
///
330343
/// # Example
@@ -1282,7 +1295,9 @@ mod tests {
12821295

12831296
#[tokio::test]
12841297
async fn read_single_file() -> Result<()> {
1285-
let ctx = SessionContext::new();
1298+
let ctx = SessionContext::new_with_config(
1299+
SessionConfig::new().with_collect_statistics(true),
1300+
);
12861301

12871302
let table = load_table(&ctx, "alltypes_plain.parquet").await?;
12881303
let projection = None;
@@ -1309,7 +1324,7 @@ mod tests {
13091324

13101325
#[cfg(feature = "parquet")]
13111326
#[tokio::test]
1312-
async fn load_table_stats_by_default() -> Result<()> {
1327+
async fn do_not_load_table_stats_by_default() -> Result<()> {
13131328
use crate::datasource::file_format::parquet::ParquetFormat;
13141329

13151330
let testdata = crate::test_util::parquet_test_data();
@@ -1321,6 +1336,22 @@ mod tests {
13211336

13221337
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
13231338
let schema = opt.infer_schema(&state, &table_path).await?;
1339+
let config = ListingTableConfig::new(table_path.clone())
1340+
.with_listing_options(opt)
1341+
.with_schema(schema);
1342+
let table = ListingTable::try_new(config)?;
1343+
1344+
let exec = table.scan(&state, None, &[], None).await?;
1345+
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
1346+
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
1347+
assert_eq!(
1348+
exec.partition_statistics(None)?.total_byte_size,
1349+
Precision::Absent
1350+
);
1351+
1352+
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
1353+
.with_collect_stat(true);
1354+
let schema = opt.infer_schema(&state, &table_path).await?;
13241355
let config = ListingTableConfig::new(table_path)
13251356
.with_listing_options(opt)
13261357
.with_schema(schema);

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,8 @@ impl TableProviderFactory for ListingTableFactory {
111111
let table_path = ListingTableUrl::parse(&cmd.location)?;
112112

113113
let options = ListingOptions::new(file_format)
114-
.with_collect_stat(state.config().collect_statistics())
115114
.with_file_extension(file_extension)
116-
.with_target_partitions(state.config().target_partitions())
115+
.with_session_config_options(session_state.config())
117116
.with_table_partition_cols(table_partition_cols);
118117

119118
options

0 commit comments

Comments
 (0)