Commit 9c1395f

fix query results for predicates referencing partition columns and data columns (apache#15935)
* fix query results for predicates referencing partition columns and data columns
* fmt
* add e2e test
* newline
1 parent 6cc4953 commit 9c1395f
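
To illustrate the fix: the pushdown check now consults only the Parquet file schema, so a predicate that mixes a data column with a partition column (which is never stored in the data files) is kept out of the Parquet row filter and is evaluated above the scan instead. The Rust sketch below shows the intended behavior of the reworked check; it is not part of the commit, and the schema and column names are illustrative only.

// `can_expr_be_pushed_down_with_schemas` is the function reworked in
// datafusion/datasource-parquet/src/row_filter.rs below.
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::col;

fn partition_column_predicates_are_not_pushed_down() {
    // The Parquet file contains only `val`; `part` is a Hive-style partition
    // column that exists in the table schema but not in the file schema.
    let file_schema = Schema::new(vec![Field::new("val", DataType::Utf8, true)]);

    // `part = val` references a column that is missing from the file, so the
    // whole predicate must be rejected for row-filter pushdown.
    let mixed = col("part").eq(col("val"));
    assert!(!can_expr_be_pushed_down_with_schemas(&mixed, &file_schema));

    // A predicate that touches only file columns can still be pushed down.
    let data_only = col("val").is_not_null();
    assert!(can_expr_be_pushed_down_with_schemas(&data_only, &file_schema));
}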

File tree: 3 files changed, +79 −49 lines

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 4 additions & 4 deletions
@@ -464,16 +464,16 @@ impl FileFormat for ParquetFormat {
     fn supports_filters_pushdown(
         &self,
         file_schema: &Schema,
-        table_schema: &Schema,
+        _table_schema: &Schema,
         filters: &[&Expr],
     ) -> Result<FilePushdownSupport> {
         if !self.options().global.pushdown_filters {
             return Ok(FilePushdownSupport::NoSupport);
         }
 
-        let all_supported = filters.iter().all(|filter| {
-            can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema)
-        });
+        let all_supported = filters
+            .iter()
+            .all(|filter| can_expr_be_pushed_down_with_schemas(filter, file_schema));
 
         Ok(if all_supported {
             FilePushdownSupport::Supported

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 8 additions & 45 deletions
@@ -299,6 +299,7 @@ struct PushdownChecker<'schema> {
     non_primitive_columns: bool,
     /// Does the expression reference any columns that are in the table
     /// schema but not in the file schema?
+    /// This includes partition columns and projected columns.
     projected_columns: bool,
     // Indices into the table schema of the columns required to evaluate the expression
     required_columns: BTreeSet<usize>,
@@ -387,13 +388,12 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bool {
 /// Otherwise, true.
 pub fn can_expr_be_pushed_down_with_schemas(
     expr: &datafusion_expr::Expr,
-    _file_schema: &Schema,
-    table_schema: &Schema,
+    file_schema: &Schema,
 ) -> bool {
     let mut can_be_pushed = true;
     expr.apply(|expr| match expr {
         datafusion_expr::Expr::Column(column) => {
-            can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema);
+            can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema);
             Ok(if can_be_pushed {
                 TreeNodeRecursion::Jump
             } else {
@@ -649,8 +649,6 @@ mod test {
 
     #[test]
     fn nested_data_structures_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
         let file_schema = Schema::new(vec![Field::new(
             "list_col",
             DataType::Struct(Fields::empty()),
@@ -659,49 +657,31 @@ mod test {
 
         let expr = col("list_col").is_not_null();
 
-        assert!(!can_expr_be_pushed_down_with_schemas(
-            &expr,
-            &file_schema,
-            &table_schema
-        ));
+        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
     }
 
     #[test]
-    fn projected_columns_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
+    fn projected_or_partition_columns_prevent_pushdown() {
         let file_schema =
             Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]);
 
         let expr = col("nonexistent_column").is_null();
 
-        assert!(!can_expr_be_pushed_down_with_schemas(
-            &expr,
-            &file_schema,
-            &table_schema
-        ));
+        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
     }
 
     #[test]
     fn basic_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
         let file_schema =
             Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);
 
         let expr = col("string_col").is_null();
 
-        assert!(can_expr_be_pushed_down_with_schemas(
-            &expr,
-            &file_schema,
-            &table_schema
-        ));
+        assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
     }
 
     #[test]
     fn complex_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
         let file_schema = Schema::new(vec![
             Field::new("string_col", DataType::Utf8, true),
             Field::new("bigint_col", DataType::Int64, true),
@@ -711,23 +691,6 @@ mod test {
         .is_not_null()
         .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)))));
 
-        assert!(can_expr_be_pushed_down_with_schemas(
-            &expr,
-            &file_schema,
-            &table_schema
-        ));
-    }
-
-    fn get_basic_table_schema() -> Schema {
-        let testdata = datafusion_common::test_util::parquet_test_data();
-        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
-            .expect("opening file");
-
-        let reader = SerializedFileReader::new(file).expect("creating reader");
-
-        let metadata = reader.metadata();
-
-        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
-            .expect("parsing schema")
+        assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
     }
 }
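
For reference, the per-column rule that these tests exercise can be summarized as below. This is an illustrative approximation written for this commit page, not the actual body of would_column_prevent_pushdown; the real function may apply additional checks.

use arrow_schema::Schema;

// Approximation: a column blocks Parquet row-filter pushdown if it is absent
// from the file schema (e.g. a partition or projected column) or if its type
// is nested (structs, lists, maps) rather than primitive.
fn column_prevents_pushdown(column_name: &str, file_schema: &Schema) -> bool {
    match file_schema.field_with_name(column_name) {
        // Not stored in the Parquet file: partition or projected column.
        Err(_) => true,
        // Nested types cannot be evaluated by the row filter.
        Ok(field) => field.data_type().is_nested(),
    }
}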

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 67 additions & 0 deletions
@@ -156,3 +156,70 @@ DROP TABLE t;
 
 statement ok
 DROP TABLE t_pushdown;
+
+## Test filter pushdown with a predicate that references both a partition column and a file column
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+## Create table
+statement ok
+CREATE EXTERNAL TABLE t_pushdown(part text, val text)
+STORED AS PARQUET
+PARTITIONED BY (part)
+LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/';
+
+statement ok
+COPY (
+  SELECT arrow_cast('a', 'Utf8') AS val
+) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (
+  SELECT arrow_cast('b', 'Utf8') AS val
+) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet'
+STORED AS PARQUET;
+
+statement ok
+COPY (
+  SELECT arrow_cast('xyz', 'Utf8') AS val
+) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet'
+STORED AS PARQUET;
+
+query TT
+select * from t_pushdown where part == val
+----
+a a
+b b
+
+query TT
+select * from t_pushdown where part != val
+----
+xyz c
+
+# If we reference both a file and partition column the predicate cannot be pushed down
+query TT
+EXPLAIN select * from t_pushdown where part != val
+----
+logical_plan
+01)Filter: t_pushdown.val != t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 != part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1
+
+# If we reference only a partition column it gets evaluated during the listing phase
+query TT
+EXPLAIN select * from t_pushdown where part != 'a';
+----
+logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part != Utf8("a")]
+physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet
+
+# And if we reference only a file column it gets pushed down
+query TT
+EXPLAIN select * from t_pushdown where val != 'c';
+----
+logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.val != Utf8("c")]
+physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)]
