
Commit d89a29e

Chore: Add basic filter pushdown tests
1 parent: ecc89f9

File tree

4 files changed: +241 −15 lines

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 213 additions & 0 deletions

@@ -0,0 +1,213 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
    datasource::object_store::ObjectStoreUrl,
    logical_expr::Operator,
    physical_plan::{
        expressions::{BinaryExpr, Column, Literal},
        PhysicalExpr,
    },
    scalar::ScalarValue,
};
use datafusion_common::config::{ConfigOptions, TableParquetOptions};
use datafusion_common::internal_err;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::{displayable, ExecutionPlan};
use insta;
use std::fmt::{Display, Formatter};
use std::sync::{Arc, OnceLock};

#[test]
fn test_pushdown_into_scan() {
    let scan = parquet_scan();
    let predicate = col_lit_predicate("a", "foo", schema());
    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());

    // expect the predicate to be pushed down into the DataSource
    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}),
        @r"
    OptimizationTest:
      input:
        - FilterExec: a@0 = foo
        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet
      output:
        Ok:
          - FilterExec: a@0 = foo
          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet, predicate=a@0 = foo
    "
    );
}

#[test]
fn test_parquet_pushdown() {
    // filter should be pushed down into the parquet scan with two filters
    let scan = parquet_scan();
    let predicate1 = col_lit_predicate("a", "foo", schema());
    let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
    let predicate2 = col_lit_predicate("b", "bar", schema());
    let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());

    insta::assert_snapshot!(
        OptimizationTest::new(plan, FilterPushdown{}),
        @r"
    OptimizationTest:
      input:
        - FilterExec: b@1 = bar
        -   FilterExec: a@0 = foo
        -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet
      output:
        Ok:
          - FilterExec: b@1 = bar
          -   FilterExec: a@0 = foo AND b@1 = bar
          -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet, predicate=b@1 = bar AND a@0 = foo
    "
    );
}

/// Schema:
/// a: String
/// b: String
/// c: f64
static TEST_SCHEMA: OnceLock<SchemaRef> = OnceLock::new();

fn schema() -> &'static SchemaRef {
    TEST_SCHEMA.get_or_init(|| {
        let fields = vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Float64, false),
        ];
        Arc::new(Schema::new(fields))
    })
}

/// Return an execution plan that reads from a parquet file
fn parquet_scan() -> Arc<dyn ExecutionPlan> {
    let schema = schema();
    let source = ParquetSource::new(TableParquetOptions::default())
        .with_schema(Arc::clone(schema));
    let base_config = FileScanConfigBuilder::new(
        ObjectStoreUrl::parse("test://").unwrap(),
        Arc::clone(schema),
        source,
    )
    .build();
    DataSourceExec::from_data_source(base_config)
}

/// Returns a predicate that is a binary expression col = lit
fn col_lit_predicate(
    column_name: &str,
    scalar_value: impl Into<ScalarValue>,
    schema: &Schema,
) -> Arc<dyn PhysicalExpr> {
    let scalar_value = scalar_value.into();
    Arc::new(BinaryExpr::new(
        Arc::new(Column::new_with_schema(column_name, schema).unwrap()),
        Operator::Eq,
        Arc::new(Literal::new(scalar_value)),
    ))
}

/// A harness for testing physical optimizers.
///
/// You can use this to test the output of a physical optimizer rule using insta snapshots
#[derive(Debug)]
pub struct OptimizationTest {
    input: Vec<String>,
    output: Result<Vec<String>, String>,
}

impl OptimizationTest {
    pub fn new<O>(input_plan: Arc<dyn ExecutionPlan>, opt: O) -> Self
    where
        O: PhysicalOptimizerRule,
    {
        Self::new_with_config(input_plan, opt, &ConfigOptions::default())
    }

    pub fn new_with_config<O>(
        input_plan: Arc<dyn ExecutionPlan>,
        opt: O,
        config: &ConfigOptions,
    ) -> Self
    where
        O: PhysicalOptimizerRule,
    {
        let input = format_execution_plan(&input_plan);

        let input_schema = input_plan.schema();

        let output_result = opt.optimize(input_plan, config);
        let output = output_result
            .and_then(|plan| {
                if opt.schema_check() && (plan.schema() != input_schema) {
                    internal_err!(
                        "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
                        input_schema,
                        plan.schema()
                    )
                } else {
                    Ok(plan)
                }
            })
            .map(|plan| format_execution_plan(&plan))
            .map_err(|e| e.to_string());

        Self { input, output }
    }
}

impl Display for OptimizationTest {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "OptimizationTest:")?;
        writeln!(f, "  input:")?;
        for line in &self.input {
            writeln!(f, "    - {line}")?;
        }
        writeln!(f, "  output:")?;
        match &self.output {
            Ok(output) => {
                writeln!(f, "    Ok:")?;
                for line in output {
                    writeln!(f, "      - {line}")?;
                }
            }
            Err(err) => {
                writeln!(f, "    Err: {err}")?;
            }
        }
        Ok(())
    }
}

pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
    format_lines(&displayable(plan.as_ref()).indent(false).to_string())
}

fn format_lines(s: &str) -> Vec<String> {
    s.trim().split('\n').map(|s| s.to_string()).collect()
}
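
The tests above only exercise `OptimizationTest::new`, which runs the rule under `ConfigOptions::default()`. A minimal sketch of driving the harness through `new_with_config` instead; the `pushdown_filters` Parquet option is a standard DataFusion config knob, but treating it as relevant to this particular rule is my assumption, not something these tests assert:

// Sketch: same harness, explicit (non-default) configuration.
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true; // assumption: relevant to this rule
let scan = parquet_scan();
let predicate = col_lit_predicate("a", "foo", schema());
let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
let test = OptimizationTest::new_with_config(plan, FilterPushdown{}, &config);
// Render the before/after plans; in a real test you would snapshot this instead.
println!("{test}");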

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ mod aggregate_statistics;
 mod combine_partial_final_agg;
 mod enforce_distribution;
 mod enforce_sorting;
+mod filter_pushdown;
 mod join_selection;
 mod limit_pushdown;
 mod limited_distinct_aggregation;

datafusion/datasource/src/source.rs

Lines changed: 10 additions & 0 deletions
@@ -287,3 +287,13 @@ impl DataSourceExec {
         })
     }
 }
+
+/// Create a new `DataSourceExec` from a `DataSource`
+impl<S> From<S> for DataSourceExec
+where
+    S: DataSource + 'static,
+{
+    fn from(source: S) -> Self {
+        Self::new(Arc::new(source))
+    }
+}
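
With this blanket impl, any concrete `DataSource` converts directly into the exec node instead of being wrapped in an `Arc` by hand. A hedged sketch of what that buys callers, assuming (as the test file above suggests) that the config produced by `FileScanConfigBuilder` implements `DataSource`:

// Before: wrap manually. After: rely on the new From impl (or .into()).
let exec_a = DataSourceExec::new(Arc::new(base_config.clone()));
let exec_b = DataSourceExec::from(base_config); // equivalent under the impl above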

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 17 additions & 15 deletions
@@ -173,21 +173,21 @@ fn pushdown_filters(
 /// For example, consider the following plan:
 ///
 /// ```text
-// ┌──────────────────────┐
-// │ CoalesceBatchesExec  │
-// └──────────────────────┘
-//            │
-//            ▼
-// ┌──────────────────────┐
-// │      FilterExec      │
-// │  filters = [ id=1]   │
-// └──────────────────────┘
-//            │
-//            ▼
-// ┌──────────────────────┐
-// │    DataSourceExec    │
-// │    projection = *    │
-// └──────────────────────┘
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │  filters = [ id=1]   │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
 /// ```
 ///
 /// Our goal is to move the `id = 1` filter from the `FilterExec` node to the `DataSourceExec` node.

@@ -444,6 +444,8 @@ fn pushdown_filters(
 /// Now as we fill our `TopK` heap we can push down the state of the heap to the `DataSourceExec` node
 /// to avoid reading files / row groups / pages / rows that could not possibly be in the top 10.
 /// This is implemented in datafusion/physical-plan/src/sorts/sort_filters.rs.
+///
+// TODO potentially rename this to align with logical optimizer `PushdownFilter`
 #[derive(Debug)]
 pub struct FilterPushdown {}
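
For orientation: applying `FilterPushdown` outside the snapshot harness is a plain `PhysicalOptimizerRule` call, which is exactly what `OptimizationTest` does internally. A minimal sketch reusing the helpers from the new test file:

// Sketch: FilterPushdown applied directly, as OptimizationTest does internally.
let plan: Arc<dyn ExecutionPlan> = Arc::new(
    FilterExec::try_new(col_lit_predicate("a", "foo", schema()), parquet_scan()).unwrap(),
);
let optimized = FilterPushdown{}
    .optimize(plan, &ConfigOptions::default())
    .expect("pushdown should not fail");
// After the rule runs, the predicate appears on the DataSourceExec node.
println!("{}", displayable(optimized.as_ref()).indent(false));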
