
Commit cd678e7

Add test for ordering of predicate pushdown into parquet

1 parent 17fe504

3 files changed, +83 -31 lines

datafusion/datasource-parquet/src/source.rs

Lines changed: 7 additions & 31 deletions
@@ -41,7 +41,6 @@ use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
-use datafusion_physical_plan::filter_pushdown::PredicateSupport;
 use datafusion_physical_plan::filter_pushdown::PredicateSupports;
 use datafusion_physical_plan::metrics::Count;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -613,22 +612,16 @@ impl FileSource for ParquetSource {
         let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;

         let mut source = self.clone();
-        let mut allowed_filters = vec![];
-        let mut remaining_filters = vec![];
-        for filter in &filters {
-            if can_expr_be_pushed_down_with_schemas(filter, &file_schema) {
-                // This filter can be pushed down
-                allowed_filters.push(Arc::clone(filter));
-            } else {
-                // This filter cannot be pushed down
-                remaining_filters.push(Arc::clone(filter));
-            }
-        }
-        if allowed_filters.is_empty() {
+        let filters = PredicateSupports::new_with_supported_check(
+            filters,
+            |filter| pushdown_filters && can_expr_be_pushed_down_with_schemas(filter, &file_schema),
+        );
+        if filters.is_all_unsupported() {
             // No filters can be pushed down, so we can just return the remaining filters
             // and avoid replacing the source in the physical plan.
-            return Ok(FilterPushdownPropagation::unsupported(filters));
+            return Ok(FilterPushdownPropagation::with_filters(filters));
         }
+        let allowed_filters = filters.collect_supported();
         let predicate = match source.predicate {
             Some(predicate) => conjunction(
                 std::iter::once(predicate).chain(allowed_filters.iter().cloned()),
@@ -637,23 +630,6 @@ impl FileSource for ParquetSource {
         };
         source.predicate = Some(predicate);
         let source = Arc::new(source);
-        let filters = PredicateSupports::new(
-            allowed_filters
-                .into_iter()
-                .map(|f| {
-                    if pushdown_filters {
-                        PredicateSupport::Supported(f)
-                    } else {
-                        PredicateSupport::Unsupported(f)
-                    }
-                })
-                .chain(
-                    remaining_filters
-                        .into_iter()
-                        .map(PredicateSupport::Unsupported),
-                )
-                .collect(),
-        );
         Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
     }
 }
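
The behavioral point of this change is ordering. The old code partitioned the incoming filters into allowed_filters and remaining_filters and then chained the two lists back together, supported first, so filters could come back in a different order than the caller passed them in. new_with_supported_check tags each filter in a single pass and preserves the input order, which is what the new sqllogictest below exercises. A minimal standalone sketch of the difference, with plain Strings and a made-up check standing in for Arc<dyn PhysicalExpr> and can_expr_be_pushed_down_with_schemas (an illustration, not DataFusion code):

// Standalone model of the ordering difference; not the DataFusion API.
#[derive(Debug)]
enum Support {
    Supported(String),
    Unsupported(String),
}

// Old shape: partition, then chain supported before unsupported.
// The output order no longer matches the input order.
fn partition_then_chain(
    filters: Vec<String>,
    supported: impl Fn(&str) -> bool,
) -> Vec<Support> {
    let (allowed, remaining): (Vec<String>, Vec<String>) =
        filters.into_iter().partition(|f| supported(f.as_str()));
    allowed
        .into_iter()
        .map(Support::Supported)
        .chain(remaining.into_iter().map(Support::Unsupported))
        .collect()
}

// New shape: tag each filter in a single pass; input order is preserved.
fn single_pass(filters: Vec<String>, supported: impl Fn(&str) -> bool) -> Vec<Support> {
    filters
        .into_iter()
        .map(|f| {
            if supported(f.as_str()) {
                Support::Supported(f)
            } else {
                Support::Unsupported(f)
            }
        })
        .collect()
}

fn main() {
    let filters = vec!["val = part".to_string(), "part = 'a'".to_string()];
    // Pretend only comparisons against literals can be pushed down.
    let check = |f: &str| f.contains('\'');

    println!("{:?}", partition_then_chain(filters.clone(), check));
    // [Supported("part = 'a'"), Unsupported("val = part")]  <- reordered
    println!("{:?}", single_pass(filters, check));
    // [Unsupported("val = part"), Supported("part = 'a'")]  <- input order kept
}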

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 41 additions & 0 deletions
@@ -60,6 +60,25 @@ impl PredicateSupports {
         Self::new(pushdowns)
     }

+    /// Create a new [`PredicateSupports`] with filters marked as supported if
+    /// `f` returns true and unsupported otherwise.
+    pub fn new_with_supported_check(
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        check: impl Fn(&Arc<dyn PhysicalExpr>) -> bool,
+    ) -> Self {
+        let pushdowns = filters
+            .into_iter()
+            .map(|f| {
+                if check(&f) {
+                    PredicateSupport::Supported(f)
+                } else {
+                    PredicateSupport::Unsupported(f)
+                }
+            })
+            .collect();
+        Self::new(pushdowns)
+    }
+
     /// Transform all filters to supported, returning a new [`PredicateSupports`]
     /// with all filters as [`PredicateSupport::Supported`].
     /// This does not modify the original [`PredicateSupport`].
@@ -102,6 +121,18 @@ impl PredicateSupports {
             .collect()
     }

+    /// Collect supported filters into a Vec, without removing them from the
+    /// original [`PredicateSupports`].
+    pub fn collect_supported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.0
+            .iter()
+            .filter_map(|f| match f {
+                PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
+                PredicateSupport::Unsupported(_) => None,
+            })
+            .collect()
+    }
+
     /// Collect all filters into a Vec, without removing them from the original
     /// FilterPushdowns.
     pub fn collect_all(self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -132,6 +163,16 @@ impl PredicateSupports {
     pub fn is_empty(&self) -> bool {
         self.0.is_empty()
     }
+
+    /// Check if all filters are supported.
+    pub fn is_all_supported(&self) -> bool {
+        self.0.iter().all(|f| matches!(f, PredicateSupport::Supported(_)))
+    }
+
+    /// Check if all filters are unsupported.
+    pub fn is_all_unsupported(&self) -> bool {
+        self.0.iter().all(|f| matches!(f, PredicateSupport::Unsupported(_)))
+    }
 }

 impl IntoIterator for PredicateSupports {
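
A small usage sketch tying the new helpers together the way the ParquetSource change above does. Hedged assumptions: the lit constructor from datafusion_physical_expr::expressions and the Display output "true" for a boolean literal are what current DataFusion provides, but exact crate paths can differ by version.

use std::sync::Arc;

use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PredicateSupports;

fn main() {
    // Two placeholder predicates; a real caller passes the filters it
    // received in FileSource::try_pushdown_filters.
    let filters: Vec<Arc<dyn PhysicalExpr>> = vec![lit(true), lit(false)];

    // Tag each filter in input order. The closure is a stand-in for a real
    // check such as can_expr_be_pushed_down_with_schemas.
    let supports =
        PredicateSupports::new_with_supported_check(filters, |f| format!("{f}") == "true");

    // Mixed support: neither all-supported nor all-unsupported.
    assert!(!supports.is_all_supported());
    assert!(!supports.is_all_unsupported());

    // Clone out only the supported filters; `supports` is left intact and
    // can still be handed to FilterPushdownPropagation::with_filters.
    let supported = supports.collect_supported();
    assert_eq!(supported.len(), 1);
}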

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 35 additions & 0 deletions
@@ -246,3 +246,38 @@ physical_plan
 02)--FilterExec: val@0 != part@1
 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)]
+
+# The order of filters should not matter
+query TT
+EXPLAIN select val, part from t_pushdown where part = 'a' AND part = val;
+----
+logical_plan
+01)Filter: t_pushdown.val = t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 = part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+
+query TT
+select val, part from t_pushdown where part = 'a' AND part = val;
+----
+a a
+
+query TT
+EXPLAIN select val, part from t_pushdown where part = val AND part = 'a';
+----
+logical_plan
+01)Filter: t_pushdown.val = t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 = part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+
+query TT
+select val, part from t_pushdown where part = val AND part = 'a';
+----
+a a
