From 0fc2da6c1e60e5f8081388f4c65b34e3a16320dd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 3 May 2025 06:06:47 -0500 Subject: [PATCH 1/4] fix query results for predicates referencing partition columns and data columns --- .../datasource-parquet/src/file_format.rs | 4 +-- .../datasource-parquet/src/row_filter.rs | 33 +++---------------- 2 files changed, 6 insertions(+), 31 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index a0ca46422786..392bb502fc13 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -464,7 +464,7 @@ impl FileFormat for ParquetFormat { fn supports_filters_pushdown( &self, file_schema: &Schema, - table_schema: &Schema, + _table_schema: &Schema, filters: &[&Expr], ) -> Result { if !self.options().global.pushdown_filters { @@ -472,7 +472,7 @@ impl FileFormat for ParquetFormat { } let all_supported = filters.iter().all(|filter| { - can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema) + can_expr_be_pushed_down_with_schemas(filter, file_schema) }); Ok(if all_supported { diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 2d2993c29a6f..e03aaa135dbf 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -299,6 +299,7 @@ struct PushdownChecker<'schema> { non_primitive_columns: bool, /// Does the expression reference any columns that are in the table /// schema but not in the file schema? + /// This includes partition columns and projected columns. projected_columns: bool, // Indices into the table schema of the columns required to evaluate the expression required_columns: BTreeSet, @@ -387,13 +388,12 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bo /// Otherwise, true. pub fn can_expr_be_pushed_down_with_schemas( expr: &datafusion_expr::Expr, - _file_schema: &Schema, - table_schema: &Schema, + file_schema: &Schema, ) -> bool { let mut can_be_pushed = true; expr.apply(|expr| match expr { datafusion_expr::Expr::Column(column) => { - can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema); + can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema); Ok(if can_be_pushed { TreeNodeRecursion::Jump } else { @@ -649,8 +649,6 @@ mod test { #[test] fn nested_data_structures_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new( "list_col", DataType::Struct(Fields::empty()), @@ -662,14 +660,11 @@ mod test { assert!(!can_expr_be_pushed_down_with_schemas( &expr, &file_schema, - &table_schema )); } #[test] - fn projected_columns_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - + fn projected_or_partition_columns_prevent_pushdown() { let file_schema = Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]); @@ -678,14 +673,11 @@ mod test { assert!(!can_expr_be_pushed_down_with_schemas( &expr, &file_schema, - &table_schema )); } #[test] fn basic_expr_doesnt_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]); @@ -694,14 +686,11 @@ mod test { assert!(can_expr_be_pushed_down_with_schemas( &expr, &file_schema, - &table_schema )); } #[test] fn complex_expr_doesnt_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![ Field::new("string_col", DataType::Utf8, true), Field::new("bigint_col", DataType::Int64, true), @@ -714,20 +703,6 @@ mod test { assert!(can_expr_be_pushed_down_with_schemas( &expr, &file_schema, - &table_schema )); } - - fn get_basic_table_schema() -> Schema { - let testdata = datafusion_common::test_util::parquet_test_data(); - let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) - .expect("opening file"); - - let reader = SerializedFileReader::new(file).expect("creating reader"); - - let metadata = reader.metadata(); - - parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) - .expect("parsing schema") - } } From 854c264dbef354fa080cbfd26c94305557202f53 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 3 May 2025 06:09:47 -0500 Subject: [PATCH 2/4] fmt --- .../datasource-parquet/src/file_format.rs | 6 +++--- .../datasource-parquet/src/row_filter.rs | 20 ++++--------------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 392bb502fc13..e1d393caa8f3 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -471,9 +471,9 @@ impl FileFormat for ParquetFormat { return Ok(FilePushdownSupport::NoSupport); } - let all_supported = filters.iter().all(|filter| { - can_expr_be_pushed_down_with_schemas(filter, file_schema) - }); + let all_supported = filters + .iter() + .all(|filter| can_expr_be_pushed_down_with_schemas(filter, file_schema)); Ok(if all_supported { FilePushdownSupport::Supported diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index e03aaa135dbf..d7bbe30c8943 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -657,10 +657,7 @@ mod test { let expr = col("list_col").is_not_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] @@ -670,10 +667,7 @@ mod test { let expr = col("nonexistent_column").is_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] @@ -683,10 +677,7 @@ mod test { let expr = col("string_col").is_null(); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - )); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] @@ -700,9 +691,6 @@ mod test { .is_not_null() .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5))))); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - )); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } } From 938e8ad9e8181b756352cccb33307b995f038f18 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 3 May 2025 06:24:22 -0500 Subject: [PATCH 3/4] add e2e test --- .../test_files/parquet_filter_pushdown.slt | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 758113b70835..ecb24eb10078 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -156,3 +156,71 @@ DROP TABLE t; statement ok DROP TABLE t_pushdown; + +## Test filter pushdown with a predicate that references both a partition column and a file column +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +## Create table +statement ok +CREATE EXTERNAL TABLE t_pushdown(part text, val text) +STORED AS PARQUET +PARTITIONED BY (part) +LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/'; + +statement ok +COPY ( + SELECT arrow_cast('a', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet' +STORED AS PARQUET; + +statement ok +COPY ( + SELECT arrow_cast('b', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet' +STORED AS PARQUET; + +statement ok +COPY ( + SELECT arrow_cast('xyz', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet' +STORED AS PARQUET; + +query TT +select * from t_pushdown where part == val +---- +a a +b b + +query TT +select * from t_pushdown where part != val +---- +xyz c + +# If we reference both a file and partition column the predicate cannot be pushed down +query TT +EXPLAIN select * from t_pushdown where part != val +---- +logical_plan +01)Filter: t_pushdown.val != t_pushdown.part +02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != t_pushdown.part] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: val@0 != part@1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1 + + +# If we reference only a partition column it gets evaluted during the listing phase +query TT +EXPLAIN select * from t_pushdown where part != 'a'; +---- +logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part != Utf8("a")] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet + +# And if we reference only a file column it gets pushed down +query TT +EXPLAIN select * from t_pushdown where val != 'c'; +---- +logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.val != Utf8("c")] +physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)] From 59f8d1bcd781b1ede788a2c8ee2931d63322263c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 3 May 2025 06:24:58 -0500 Subject: [PATCH 4/4] newline --- datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index ecb24eb10078..a32db2ff0524 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -210,7 +210,6 @@ physical_plan 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1 - # If we reference only a partition column it gets evaluted during the listing phase query TT EXPLAIN select * from t_pushdown where part != 'a';