diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 4efb67a37c99..8ab7f06dc7b5 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -614,6 +614,13 @@ config_namespace! { /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true + /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase. + /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer + /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. + /// This means that if we already have 10 timestamps in the year 2025 + /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. + pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side. This /// filter can add additional overhead when the file format does not fully support diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index 8ccc2a5bc131..9e01621c0257 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -21,6 +21,7 @@ mod join_fuzz; mod merge_fuzz; mod sort_fuzz; mod sort_query_fuzz; +mod topk_filter_pushdown; mod aggregation_fuzzer; mod equivalence; diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs new file mode 100644 index 000000000000..a5934882cbcc --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::collections::HashMap;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder};
+use arrow::datatypes::Int32Type;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use itertools::Itertools;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+use parquet::arrow::ArrowWriter;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;
+
+#[derive(Clone)]
+struct TestDataSet {
+    store: Arc<dyn ObjectStore>,
+    schema: Arc<Schema>,
+}
+
+/// List of in memory parquet files with UTF8 data
+// Use a mutex rather than LazyLock to allow for async initialization
+static TESTFILES: LazyLock<Mutex<Vec<TestDataSet>>> =
+    LazyLock::new(|| Mutex::new(vec![]));
+
+async fn test_files() -> Vec<TestDataSet> {
+    let files_mutex = &TESTFILES;
+    let mut files = files_mutex.lock().await;
+    if !files.is_empty() {
+        return (*files).clone();
+    }
+
+    let mut rng = StdRng::seed_from_u64(0);
+
+    for nulls_in_ids in [false, true] {
+        for nulls_in_names in [false, true] {
+            for nulls_in_departments in [false, true] {
+                let store = Arc::new(InMemory::new());
+
+                let schema = Arc::new(Schema::new(vec![
+                    Field::new("id", DataType::Int32, nulls_in_ids),
+                    Field::new("name", DataType::Utf8, nulls_in_names),
+                    Field::new(
+                        "department",
+                        DataType::Dictionary(
+                            Box::new(DataType::Int32),
+                            Box::new(DataType::Utf8),
+                        ),
+                        nulls_in_departments,
+                    ),
+                ]));
+
+                let name_choices = if nulls_in_names {
+                    [Some("Alice"), Some("Bob"), None, Some("David"), None]
+                } else {
+                    [
+                        Some("Alice"),
+                        Some("Bob"),
+                        Some("Charlie"),
+                        Some("David"),
+                        Some("Eve"),
+                    ]
+                };
+
+                let department_choices = if nulls_in_departments {
+                    [
+                        Some("Theater"),
+                        Some("Engineering"),
+                        None,
+                        Some("Arts"),
+                        None,
+                    ]
+                } else {
+                    [
+                        Some("Theater"),
+                        Some("Engineering"),
+                        Some("Healthcare"),
+                        Some("Arts"),
+                        Some("Music"),
+                    ]
+                };
+
+                // Generate 5 files, some with overlapping or repeated ids some without
+                for i in 0..5 {
+                    let num_batches = rng.random_range(1..3);
+                    let mut batches = Vec::with_capacity(num_batches);
+                    for _ in 0..num_batches {
+                        let num_rows = 25;
+                        let ids = Int32Array::from_iter((0..num_rows).map(|file| {
+                            if nulls_in_ids {
+                                if rng.random_bool(1.0 / 10.0) {
+                                    None
+                                } else {
+                                    Some(rng.random_range(file..file + 5))
+                                }
+                            } else {
+                                Some(rng.random_range(file..file + 5))
+                            }
+                        }));
+                        let names = StringArray::from_iter((0..num_rows).map(|_| {
+                            // randomly select a name
+                            let idx = rng.random_range(0..name_choices.len());
+                            name_choices[idx].map(|s| s.to_string())
+                        }));
+                        let mut departments = StringDictionaryBuilder::<Int32Type>::new();
+                        for _ in 0..num_rows {
+                            // randomly select a department
+                            let idx = rng.random_range(0..department_choices.len());
+                            departments.append_option(department_choices[idx].as_ref());
+                        }
+                        let batch = RecordBatch::try_new(
+                            schema.clone(),
+                            vec![
+                                Arc::new(ids),
+                                Arc::new(names),
+                                Arc::new(departments.finish()),
+                            ],
+                        )
+                        .unwrap();
+                        batches.push(batch);
+                    }
+                    let mut buf = vec![];
+                    {
+                        let mut writer =
+                            ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+                        for batch in batches {
+                            writer.write(&batch).unwrap();
+                            writer.flush().unwrap();
+                        }
+                        writer.flush().unwrap();
+                        writer.finish().unwrap();
+                    }
+                    let payload = PutPayload::from(buf);
+                    let path = Path::from(format!("file_{i}.parquet"));
+                    store.put(&path, payload).await.unwrap();
+                }
+                files.push(TestDataSet { store, schema });
+            }
+        }
+    }
+    (*files).clone()
+}
+
+struct RunResult {
+    results: Vec<RecordBatch>,
+    explain_plan: String,
+}
+
+async fn run_query_with_config(
+    query: &str,
+    config: SessionConfig,
+    dataset: TestDataSet,
+) -> RunResult {
+    let store = dataset.store;
+    let schema = dataset.schema;
+    let ctx = SessionContext::new_with_config(config);
+    let url = ObjectStoreUrl::parse("memory://").unwrap();
+    ctx.register_object_store(url.as_ref(), store.clone());
+
+    let format = Arc::new(
+        ParquetFormat::default()
+            .with_options(ctx.state().table_options().parquet.clone()),
+    );
+    let options = ListingOptions::new(format);
+    let table_path = ListingTableUrl::parse("memory:///").unwrap();
+    let config = ListingTableConfig::new(table_path)
+        .with_listing_options(options)
+        .with_schema(schema);
+    let table = Arc::new(ListingTable::try_new(config).unwrap());
+
+    ctx.register_table("test_table", table).unwrap();
+
+    let results = ctx.sql(query).await.unwrap().collect().await.unwrap();
+    let explain_batches = ctx
+        .sql(&format!("EXPLAIN ANALYZE {query}"))
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let explain_plan = pretty_format_batches(&explain_batches).unwrap().to_string();
+    RunResult {
+        results,
+        explain_plan,
+    }
+}
+
+#[derive(Debug)]
+struct RunQueryResult {
+    query: String,
+    result: Vec<RecordBatch>,
+    expected: Vec<RecordBatch>,
+}
+
+impl RunQueryResult {
+    fn expected_formated(&self) -> String {
+        format!("{}", pretty_format_batches(&self.expected).unwrap())
+    }
+
+    fn result_formated(&self) -> String {
+        format!("{}", pretty_format_batches(&self.result).unwrap())
+    }
+
+    fn is_ok(&self) -> bool {
+        self.expected_formated() == self.result_formated()
+    }
+}
+
+/// Iterate over each line in the plan and check that one of them has `DataSourceExec` and `DynamicFilterPhysicalExpr` in the same line.
+fn has_dynamic_filter_expr_pushdown(plan: &str) -> bool { + for line in plan.lines() { + if line.contains("DataSourceExec") && line.contains("DynamicFilterPhysicalExpr") { + return true; + } + } + false +} + +async fn run_query( + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +) -> RunQueryResult { + let cfg_with_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + let cfg_without_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false); + + let expected_result = + run_query_with_config(&query, cfg_without_dynamic_filters, dataset.clone()).await; + let result = + run_query_with_config(&query, cfg_with_dynamic_filters, dataset.clone()).await; + // Check that dynamic filters were actually pushed down + if !has_dynamic_filter_expr_pushdown(&result.explain_plan) { + panic!( + "Dynamic filter was not pushed down in query: {query}\n\n{}", + result.explain_plan + ); + } + + RunQueryResult { + query: query.to_string(), + result: result.results, + expected: expected_result.results, + } +} + +struct TestCase { + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuzz_topk_filter_pushdown() { + let order_columns = ["id", "name", "department"]; + let order_directions = ["ASC", "DESC"]; + let null_orders = ["NULLS FIRST", "NULLS LAST"]; + + let start = datafusion_common::instant::Instant::now(); + let mut orders: HashMap> = HashMap::new(); + for order_column in &order_columns { + for order_direction in &order_directions { + for null_order in &null_orders { + // if there is a vec for this column insert the order, otherwise create a new vec + let ordering = format!("{order_column} {order_direction} {null_order}"); + match orders.get_mut(*order_column) { + Some(order_vec) => { + order_vec.push(ordering); + } + None => { + orders.insert(order_column.to_string(), vec![ordering]); + } + } + } + } + } + + let mut queries = vec![]; + + for limit in [1, 10] { + for num_order_by_columns in [1, 2, 3] { + for order_columns in ["id", "name", "department"] + .iter() + .combinations(num_order_by_columns) + { + for orderings in order_columns + .iter() + .map(|col| orders.get(**col).unwrap()) + .multi_cartesian_product() + { + let query = format!( + "SELECT * FROM test_table ORDER BY {} LIMIT {}", + orderings.into_iter().join(", "), + limit + ); + queries.push(query); + } + } + } + } + + queries.sort_unstable(); + println!( + "Generated {} queries in {:?}", + queries.len(), + start.elapsed() + ); + + let start = datafusion_common::instant::Instant::now(); + let datasets = test_files().await; + println!("Generated test files in {:?}", start.elapsed()); + + let mut test_cases = vec![]; + for enable_filter_pushdown in [true, false] { + for query in &queries { + for dataset in &datasets { + let mut cfg = SessionConfig::new(); + cfg = cfg.set_bool( + "datafusion.optimizer.enable_dynamic_filter_pushdown", + enable_filter_pushdown, + ); + test_cases.push(TestCase { + query: query.to_string(), + cfg, + dataset: dataset.clone(), + }); + } + } + } + + let start = datafusion_common::instant::Instant::now(); + let mut join_set = JoinSet::new(); + for tc in test_cases { + join_set.spawn(run_query(tc.query, tc.cfg, tc.dataset)); + } + let mut results = join_set.join_all().await; + results.sort_unstable_by(|a, b| a.query.cmp(&b.query)); + println!("Ran {} test cases in {:?}", results.len(), start.elapsed()); + + let failures = results + .iter() 
+ .filter(|result| !result.is_ok()) + .collect::>(); + + for failure in &failures { + println!("Failure:"); + println!("Query:\n{}", failure.query); + println!("\nExpected:\n{}", failure.expected_formated()); + println!("\nResult:\n{}", failure.result_formated()); + println!("\n\n"); + } + + if !failures.is_empty() { + panic!("Some test cases failed"); + } else { + println!("All test cases passed"); + } +} diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 16dba7e3f205..f1ef365c9220 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -17,28 +17,41 @@ use std::sync::{Arc, LazyLock}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::{ + array::record_batch, + datatypes::{DataType, Field, Schema, SchemaRef}, + util::pretty::pretty_format_batches, +}; +use arrow_schema::SortOptions; use datafusion::{ logical_expr::Operator, physical_plan::{ expressions::{BinaryExpr, Column, Literal}, PhysicalExpr, }, + prelude::{ParquetReadOptions, SessionConfig, SessionContext}, scalar::ScalarValue, }; use datafusion_common::config::ConfigOptions; +use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning}; -use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; +use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; +use datafusion_physical_optimizer::{ + filter_pushdown::FilterPushdown, PhysicalOptimizerRule, +}; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, filter::FilterExec, repartition::RepartitionExec, + sorts::sort::SortExec, + ExecutionPlan, }; -use util::{OptimizationTest, TestNode, TestScanBuilder}; +use futures::StreamExt; +use object_store::{memory::InMemory, ObjectStore}; +use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder}; mod util; @@ -50,7 +63,7 @@ fn test_pushdown_into_scan() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{}, true), + OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: @@ -74,7 +87,7 @@ fn test_pushdown_into_scan_with_config_options() { insta::assert_snapshot!( OptimizationTest::new( Arc::clone(&plan), - FilterPushdown {}, + FilterPushdown::new(), false ), @r" @@ -93,7 +106,7 @@ fn test_pushdown_into_scan_with_config_options() { insta::assert_snapshot!( OptimizationTest::new( plan, - FilterPushdown {}, + FilterPushdown::new(), true ), @r" @@ -118,7 +131,7 @@ fn test_filter_collapse() { let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{}, true), + OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: @@ -146,7 +159,7 @@ fn test_filter_with_projection() { // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{}, true), + OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: @@ -169,7 +182,7 @@ fn test_filter_with_projection() { 
.unwrap(), ); insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{},true), + OptimizationTest::new(plan, FilterPushdown::new(),true), @r" OptimizationTest: input: @@ -198,7 +211,7 @@ fn test_push_down_through_transparent_nodes() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{},true), + OptimizationTest::new(plan, FilterPushdown::new(),true), @r" OptimizationTest: input: @@ -262,7 +275,7 @@ fn test_no_pushdown_through_aggregates() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{}, true), + OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: @@ -293,7 +306,7 @@ fn test_node_handles_child_pushdown_result() { let predicate = col_lit_predicate("a", "foo", &schema()); let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate)); insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{}, true), + OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: @@ -312,7 +325,7 @@ fn test_node_handles_child_pushdown_result() { let predicate = col_lit_predicate("a", "foo", &schema()); let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate)); insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{}, true), + OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: @@ -332,7 +345,7 @@ fn test_node_handles_child_pushdown_result() { let predicate = col_lit_predicate("a", "foo", &schema()); let plan = Arc::new(TestNode::new(false, Arc::clone(&scan), predicate)); insta::assert_snapshot!( - OptimizationTest::new(plan, FilterPushdown{}, true), + OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: @@ -346,6 +359,136 @@ fn test_node_handles_child_pushdown_result() { ); } +#[tokio::test] +async fn test_topk_dynamic_filter_pushdown() { + let batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["bd", "bc"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap(), + record_batch!( + ("a", Utf8, ["ac", "ad"]), + ("b", Utf8, ["bb", "ba"]), + ("c", Float64, [2.0, 1.0]) + ) + .unwrap(), + ]; + let scan = TestScanBuilder::new(schema()) + .with_support(true) + .with_batches(batches) + .build(); + let plan = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new( + col("b", &schema()).unwrap(), + SortOptions::new(true, false), // descending, nulls_first + )]) + .unwrap(), + Arc::clone(&scan), + ) + .with_fetch(Some(1)), + ) as Arc; + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state + let mut config = ConfigOptions::default(); + 
config.execution.parquet.pushdown_filters = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(2); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + stream.next().await.unwrap().unwrap(); + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false], filter=[b@1 > bd] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 > bd ] + " + ); +} + +/// Integration test for dynamic filter pushdown with TopK. +/// We use an integration test because there are complex interactions in the optimizer rules +/// that the unit tests applying a single optimizer rule do not cover. +#[tokio::test] +async fn test_topk_dynamic_filter_pushdown_integration() { + let store = Arc::new(InMemory::new()) as Arc; + let mut cfg = SessionConfig::new(); + cfg.options_mut().execution.parquet.pushdown_filters = true; + cfg.options_mut().execution.parquet.max_row_group_size = 128; + let ctx = SessionContext::new_with_config(cfg); + ctx.register_object_store( + ObjectStoreUrl::parse("memory://").unwrap().as_ref(), + Arc::clone(&store), + ); + ctx.sql( + r" +COPY ( + SELECT 1372708800 + value AS t + FROM generate_series(0, 99999) + ORDER BY t + ) TO 'memory:///1.parquet' +STORED AS PARQUET; + ", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Register the file with the context + ctx.register_parquet( + "topk_pushdown", + "memory:///1.parquet", + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Create a TopK query that will use dynamic filter pushdown + let df = ctx + .sql(r"EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t LIMIT 10;") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let explain = format!("{}", pretty_format_batches(&batches).unwrap()); + + assert!(explain.contains("output_rows=128")); // Read 1 row group + assert!(explain.contains("t@0 < 1372708809")); // Dynamic filter was applied + assert!( + explain.contains("pushdown_rows_matched=128, pushdown_rows_pruned=99872"), + "{explain}" + ); + // Pushdown pruned most rows +} + /// Schema: /// a: String /// b: String diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index c60d2b4d8187..e793af8ed4b0 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -29,6 +29,7 @@ use datafusion_datasource::{ use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::FilterPushdownPhase; use datafusion_physical_plan::{ displayable, filter::FilterExec, @@ -271,6 +272,11 @@ impl TestScanBuilder { self } + pub fn with_batches(mut self, batches: Vec) -> Self { + self.batches = batches; + self + } + pub fn build(self) -> Arc { let source = 
Arc::new(TestSource::new(self.support, self.batches)); let base_config = FileScanConfigBuilder::new( @@ -426,6 +432,15 @@ fn format_lines(s: &str) -> Vec { s.trim().split('\n').map(|s| s.to_string()).collect() } +pub fn format_plan_for_test(plan: &Arc) -> String { + let mut out = String::new(); + for line in format_execution_plan(plan) { + out.push_str(&format!(" - {line}\n")); + } + out.push('\n'); + out +} + #[derive(Debug)] pub(crate) struct TestNode { inject_filter: bool, @@ -496,6 +511,7 @@ impl ExecutionPlan for TestNode { fn gather_filters_for_pushdown( &self, + _phase: FilterPushdownPhase, parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { @@ -506,6 +522,7 @@ impl ExecutionPlan for TestNode { fn handle_child_pushdown_result( &self, + _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 29c3c9c3d7ff..a0f49ad7b16c 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -36,7 +36,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPropagation, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, }; use datafusion_physical_plan::yield_stream::wrap_yield_stream; @@ -318,6 +318,7 @@ impl ExecutionPlan for DataSourceExec { fn handle_child_pushdown_result( &self, + _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, config: &ConfigOptions, ) -> Result>> { diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index d77207fbbcd7..8f46133ed0bb 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,6 +43,7 @@ pub use case::{case, CaseExpr}; pub use cast::{cast, CastExpr}; pub use column::{col, with_new_schema, Column}; pub use datafusion_expr::utils::format_state_name; +pub use dynamic_filters::DynamicFilterPhysicalExpr; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 5b2d47106b8d..885280576b4b 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -22,7 +22,8 @@ use crate::PhysicalOptimizerRule; use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPropagation, PredicateSupport, PredicateSupports, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, + PredicateSupport, PredicateSupports, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; @@ -362,11 +363,31 @@ use itertools::izip; /// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec #[derive(Debug)] -pub struct FilterPushdown {} +pub struct FilterPushdown { + phase: FilterPushdownPhase, + name: String, +} impl FilterPushdown { + fn new_with_phase(phase: FilterPushdownPhase) -> Self { + 
let name = match phase {
+            FilterPushdownPhase::Pre => "FilterPushdown",
+            FilterPushdownPhase::Post => "FilterPushdown(Post)",
+        }
+        .to_string();
+        Self { phase, name }
+    }
+
+    /// Create a new [`FilterPushdown`] optimizer rule that runs in the pre-optimization phase.
+    /// See [`FilterPushdownPhase`] for more details.
     pub fn new() -> Self {
-        Self {}
+        Self::new_with_phase(FilterPushdownPhase::Pre)
+    }
+
+    /// Create a new [`FilterPushdown`] optimizer rule that runs in the post-optimization phase.
+    /// See [`FilterPushdownPhase`] for more details.
+    pub fn new_post_optimization() -> Self {
+        Self::new_with_phase(FilterPushdownPhase::Post)
     }
 }
@@ -382,13 +403,15 @@ impl PhysicalOptimizerRule for FilterPushdown {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(push_down_filters(Arc::clone(&plan), vec![], config)?
-            .updated_node
-            .unwrap_or(plan))
+        Ok(
+            push_down_filters(Arc::clone(&plan), vec![], config, self.phase)?
+                .updated_node
+                .unwrap_or(plan),
+        )
     }
 
     fn name(&self) -> &str {
-        "FilterPushdown"
+        &self.name
     }
 
     fn schema_check(&self) -> bool {
@@ -409,6 +432,7 @@ fn push_down_filters(
     node: Arc<dyn ExecutionPlan>,
     parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
     config: &ConfigOptions,
+    phase: FilterPushdownPhase,
 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
     // If the node has any child, these will be rewritten as supported or unsupported
     let mut parent_predicates_pushdown_states =
@@ -418,7 +442,7 @@
     let children = node.children();
 
     let filter_description =
-        node.gather_filters_for_pushdown(parent_predicates.clone(), config)?;
+        node.gather_filters_for_pushdown(phase, parent_predicates.clone(), config)?;
 
     for (child, parent_filters, self_filters) in izip!(
         children,
@@ -460,7 +484,7 @@
         }
 
         // Any filters that could not be pushed down to a child are marked as not-supported to our parents
-        let result = push_down_filters(Arc::clone(child), all_predicates, config)?;
+        let result = push_down_filters(Arc::clone(child), all_predicates, config, phase)?;
 
         if let Some(new_child) = result.updated_node {
             // If we have a filter pushdown result, we need to update our children
@@ -524,8 +548,11 @@
             })
             .collect(),
     );
-    // Check what the current node wants to do given the result of pushdown to it's children
+    // TODO: by calling `handle_child_pushdown_result` we are assuming that the
+    // `ExecutionPlan` implementation will not change the plan itself.
+    // Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
     let mut res = updated_node.handle_child_pushdown_result(
+        phase,
         ChildPushdownResult {
             parent_filters: parent_pushdown_result,
             self_filters: self_filters_pushdown_supports,
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index d5129cea9d4e..aed81606919e 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -97,8 +97,10 @@ impl PhysicalOptimizer {
             // Applying the rule early means only directly-connected AggregateExecs must be examined.
             Arc::new(LimitedDistinctAggregation::new()),
             // The FilterPushdown rule tries to push down filters as far as it can.
-            // For example, it will push down filtering from a `FilterExec` to
-            // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
+            // For example, it will push down filtering from a `FilterExec` to `DataSourceExec`.
+ // Note that this does not push down dynamic filters (such as those created by a `SortExec` operator in TopK mode), + // those are handled by the later `FilterPushdown` rule. + // See `FilterPushdownPhase` for more details. Arc::new(FilterPushdown::new()), // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution // requirements. Please make sure that the whole plan tree is determined before this rule. @@ -139,6 +141,10 @@ impl PhysicalOptimizer { // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), Arc::new(InsertYieldExec::new()), + // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan. + // Therefore it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references. + // See `FilterPushdownPhase` for more details. + Arc::new(FilterPushdown::new_post_optimization()), // The SanityCheckPlan rule checks whether the order and // distribution requirements of each node in the plan // is satisfied. It will also reject non-runnable query diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index f35231fb6a99..78bd4b4fc3a0 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -37,7 +37,8 @@ use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{BatchCoalescer, CoalescerState}; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, }; use datafusion_common::config::ConfigOptions; use futures::ready; @@ -229,6 +230,7 @@ impl ExecutionPlan for CoalesceBatchesExec { fn gather_filters_for_pushdown( &self, + _phase: FilterPushdownPhase, parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { @@ -238,6 +240,7 @@ impl ExecutionPlan for CoalesceBatchesExec { fn handle_child_pushdown_result( &self, + _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 5dd090c12c27..75f1c90820e1 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -17,7 +17,8 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -509,8 +510,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// The default implementation bars all parent filters from being pushed down and adds no new filters. /// This is the safest option, making filter pushdown opt-in on a per-node pasis. + /// + /// There are two different phases in filter pushdown, which some operators may handle the same and some differently. + /// Depending on the phase the operator may or may not be allowed to modify the plan. + /// See [`FilterPushdownPhase`] for more details. 
fn gather_filters_for_pushdown( &self, + _phase: FilterPushdownPhase, parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { @@ -548,10 +554,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`] /// to dynamically build a result with a mix of supported and unsupported filters. /// + /// There are two different phases in filter pushdown, which some operators may handle the same and some differently. + /// Depending on the phase the operator may or may not be allowed to modify the plan. + /// See [`FilterPushdownPhase`] for more details. + /// /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported /// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check fn handle_child_pushdown_result( &self, + _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 25eb98a61b2e..252af9ebcd49 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -28,7 +28,8 @@ use super::{ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -449,9 +450,14 @@ impl ExecutionPlan for FilterExec { fn gather_filters_for_pushdown( &self, + phase: FilterPushdownPhase, parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { + if !matches!(phase, FilterPushdownPhase::Pre) { + return Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)); + } let self_filter = split_conjunction(&self.predicate) .into_iter() .cloned() @@ -503,9 +509,15 @@ impl ExecutionPlan for FilterExec { fn handle_child_pushdown_result( &self, + phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { + if !matches!(phase, FilterPushdownPhase::Pre) { + return Ok(FilterPushdownPropagation::transparent( + child_pushdown_result, + )); + } // We absorb any parent filters that were not handled by our children let mut unhandled_filters = child_pushdown_result.parent_filters.collect_unsupported(); diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 5222a98e3dd5..3bbe3997fdfc 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -20,6 +20,45 @@ use std::vec::IntoIter; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +#[derive(Debug, Clone, Copy)] +pub enum FilterPushdownPhase { + /// Pushdown that happens before most other optimizations. + /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down. + /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten + /// by other optimizations. 
+ /// Implementers are however allowed to modify the execution plan themselves during this phase, for example by returning a completely + /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`]. + /// + /// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a pre-pushdown. + /// + /// [`ExecutionPlan`]: crate::ExecutionPlan + /// [`FilterExec`]: crate::filter::FilterExec + /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result + Pre, + /// Pushdown that happens after most other optimizations. + /// This stage of filter pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down. + /// Since subsequent optimizations should not change the structure of the plan tree except for calling [`ExecutionPlan::with_new_children`] + /// (which generally preserves internal references) it is safe for references between [`ExecutionPlan`]s to be established at this stage. + /// + /// This phase is used to link a [`SortExec`] (with a TopK operator) or a [`HashJoinExec`] to a `DataSourceExec`. + /// + /// [`ExecutionPlan`]: crate::ExecutionPlan + /// [`ExecutionPlan::with_new_children`]: crate::ExecutionPlan::with_new_children + /// [`SortExec`]: crate::sorts::sort::SortExec + /// [`HashJoinExec`]: crate::joins::HashJoinExec + /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result + Post, +} + +impl std::fmt::Display for FilterPushdownPhase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FilterPushdownPhase::Pre => write!(f, "Pre"), + FilterPushdownPhase::Post => write!(f, "Post"), + } + } +} + /// The result of a plan for pushing down a filter into a child node. /// This contains references to filters so that nodes can mutate a filter /// before pushing it down to a child node (e.g. 
to adjust a projection) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 8b0f7e9784af..d872a84d7285 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -55,7 +55,8 @@ use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, }; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -807,6 +808,7 @@ impl ExecutionPlan for RepartitionExec { fn gather_filters_for_pushdown( &self, + _phase: FilterPushdownPhase, parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { @@ -816,6 +818,7 @@ impl ExecutionPlan for RepartitionExec { fn handle_child_pushdown_result( &self, + _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c31f17291e19..0aa284297b22 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; +use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, @@ -52,7 +53,9 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; +use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -843,6 +846,8 @@ pub struct SortExec { common_sort_prefix: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, + /// Filter matching the state of the sort for dynamic filter pushdown + filter: Option>, } impl SortExec { @@ -861,6 +866,7 @@ impl SortExec { fetch: None, common_sort_prefix: sort_prefix, cache, + filter: None, } } @@ -906,6 +912,17 @@ impl SortExec { if fetch.is_some() && is_pipeline_friendly { cache = cache.with_boundedness(Boundedness::Bounded); } + let filter = fetch.is_some().then(|| { + // If we already have a filter, keep it. Otherwise, create a new one. + self.filter.clone().unwrap_or_else(|| { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + }) + }); SortExec { input: Arc::clone(&self.input), expr: self.expr.clone(), @@ -914,9 +931,15 @@ impl SortExec { common_sort_prefix: self.common_sort_prefix.clone(), fetch, cache, + filter, } } + pub fn with_filter(mut self, filter: Arc) -> Self { + self.filter = Some(filter); + self + } + /// Input schema pub fn input(&self) -> &Arc { &self.input @@ -932,6 +955,11 @@ impl SortExec { self.fetch } + /// If `Some(filter)`, returns the filter expression that matches the state of the sort. 
+ pub fn filter(&self) -> Option> { + self.filter.clone() + } + fn output_partitioning_helper( input: &Arc, preserve_partitioning: bool, @@ -1009,6 +1037,13 @@ impl DisplayAs for SortExec { match self.fetch { Some(fetch) => { write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?; + if let Some(filter) = &self.filter { + if let Ok(current) = filter.current() { + if !current.eq(&lit(true)) { + write!(f, ", filter=[{current}]")?; + } + } + } if !self.common_sort_prefix.is_empty() { write!(f, ", sort_prefix=[")?; let mut first = true; @@ -1079,9 +1114,10 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) + let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) .with_fetch(self.fetch) .with_preserve_partitioning(self.preserve_partitioning); + new_sort.filter = self.filter.clone(); Ok(Arc::new(new_sort)) } @@ -1122,6 +1158,7 @@ impl ExecutionPlan for SortExec { context.session_config().batch_size(), context.runtime_env(), &self.metrics_set, + self.filter.clone(), )?; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -1228,6 +1265,28 @@ impl ExecutionPlan for SortExec { .with_preserve_partitioning(self.preserve_partitioning()), ))) } + + fn gather_filters_for_pushdown( + &self, + phase: FilterPushdownPhase, + parent_filters: Vec>, + config: &datafusion_common::config::ConfigOptions, + ) -> Result { + if !matches!(phase, FilterPushdownPhase::Post) { + return Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)); + } + if let Some(filter) = &self.filter { + if config.optimizer.enable_dynamic_filter_pushdown { + let filter = Arc::clone(filter) as Arc; + return Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters) + .with_self_filter(filter)); + } + } + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index d8b6f0e400b8..8d06fa73ce8e 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -17,6 +17,12 @@ //! 
TopK: Combination of Sort / LIMIT
 
+use arrow::{
+    array::{Array, AsArray},
+    compute::{interleave_record_batch, prep_null_mask_filter, FilterBuilder},
+    row::{RowConverter, Rows, SortField},
+};
+use datafusion_expr::{ColumnarValue, Operator};
 use std::mem::size_of;
 use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
 
@@ -25,14 +31,18 @@
 use crate::spill::get_record_batch_memory_size;
 use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
 use arrow::array::{ArrayRef, RecordBatch};
-use arrow::compute::interleave_record_batch;
 use arrow::datatypes::SchemaRef;
-use arrow::row::{RowConverter, Rows, SortField};
-use datafusion_common::{internal_datafusion_err, HashMap, Result};
+use datafusion_common::{
+    internal_datafusion_err, internal_err, HashMap, Result, ScalarValue,
+};
 use datafusion_execution::{
     memory_pool::{MemoryConsumer, MemoryReservation},
     runtime_env::RuntimeEnv,
 };
+use datafusion_physical_expr::{
+    expressions::{is_not_null, is_null, lit, BinaryExpr, DynamicFilterPhysicalExpr},
+    PhysicalExpr,
+};
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
 
 /// Global TopK
@@ -110,6 +120,8 @@ pub struct TopK {
     common_sort_prefix_converter: Option<RowConverter>,
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
+    /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
+    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
@@ -148,6 +160,7 @@ impl TopK {
         batch_size: usize,
         runtime: Arc<RuntimeEnv>,
         metrics: &ExecutionPlanMetricsSet,
+        filter: Option<Arc<DynamicFilterPhysicalExpr>>,
     ) -> Result<Self> {
         let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
             .register(&runtime.memory_pool);
@@ -179,6 +192,7 @@ impl TopK {
             common_sort_prefix_converter: prefix_row_converter,
             common_sort_prefix: Arc::from(common_sort_prefix),
             finished: false,
+            filter,
         })
     }
 
@@ -189,7 +203,7 @@ impl TopK {
         let baseline = self.metrics.baseline.clone();
         let _timer = baseline.elapsed_compute().timer();
 
-        let sort_keys: Vec<ArrayRef> = self
+        let mut sort_keys: Vec<ArrayRef> = self
             .expr
             .iter()
             .map(|expr| {
@@ -198,39 +212,202 @@ impl TopK {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        let mut selected_rows = None;
+
+        if let Some(filter) = self.filter.as_ref() {
+            // If a filter is provided, update it with the new rows
+            let filter = filter.current()?;
+            let filtered = filter.evaluate(&batch)?;
+            let num_rows = batch.num_rows();
+            let array = filtered.into_array(num_rows)?;
+            let mut filter = array.as_boolean().clone();
+            let true_count = filter.true_count();
+            if true_count == 0 {
+                // nothing to filter, so no need to update
+                return Ok(());
+            }
+            // only update the keys / rows if the filter does not match all rows
+            if true_count < num_rows {
+                // Indices in `set_indices` should be correct if filter contains nulls
+                // So we prepare the filter here. Note this is also done in the `FilterBuilder`
+                // so there is no overhead to do this here.
+                if filter.nulls().is_some() {
+                    filter = prep_null_mask_filter(&filter);
+                }
+
+                let filter_predicate = FilterBuilder::new(&filter);
+                let filter_predicate = if sort_keys.len() > 1 {
+                    // Optimize filter when it has multiple sort keys
+                    filter_predicate.optimize().build()
+                } else {
+                    filter_predicate.build()
+                };
+                selected_rows = Some(filter);
+                sort_keys = sort_keys
+                    .iter()
+                    .map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
+                    .collect::<Result<Vec<_>>>()?;
+            }
+        };
         // reuse existing `Rows` to avoid reallocations
         let rows = &mut self.scratch_rows;
         rows.clear();
         self.row_converter.append(rows, &sort_keys)?;
 
-        // TODO make this algorithmically better?:
-        // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
-        // this avoids some work and also might be better vectorizable.
         let mut batch_entry = self.heap.register_batch(batch.clone());
-        for (index, row) in rows.iter().enumerate() {
+
+        let replacements = match selected_rows {
+            Some(filter) => {
+                self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry)
+            }
+            None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry),
+        };
+
+        if replacements > 0 {
+            self.metrics.row_replacements.add(replacements);
+
+            self.heap.insert_batch_entry(batch_entry);
+
+            // conserve memory
+            self.heap.maybe_compact()?;
+
+            // update memory reservation
+            self.reservation.try_resize(self.size())?;
+
+            // flag the topK as finished if we know that all
+            // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K,
+            // which means the top K won't change and the computation can be finished early.
+            self.attempt_early_completion(&batch)?;
+
+            // update the filter representation of our TopK heap
+            self.update_filter()?;
+        }
+
+        Ok(())
+    }
+
+    fn find_new_topk_items(
+        &mut self,
+        items: impl Iterator<Item = usize>,
+        batch_entry: &mut RecordBatchEntry,
+    ) -> usize {
+        let mut replacements = 0;
+        let rows = &mut self.scratch_rows;
+        for (index, row) in items.zip(rows.iter()) {
             match self.heap.max() {
                 // heap has k items, and the new row is greater than the
                 // current max in the heap ==> it is not a new topk
                 Some(max_row) if row.as_ref() >= max_row.row() => {}
                 // don't yet have k items or new item is lower than the currently k low values
                 None | Some(_) => {
-                    self.heap.add(&mut batch_entry, row, index);
-                    self.metrics.row_replacements.add(1);
+                    self.heap.add(batch_entry, row, index);
+                    replacements += 1;
                 }
             }
         }
-        self.heap.insert_batch_entry(batch_entry);
+        replacements
+    }
 
-        // conserve memory
-        self.heap.maybe_compact()?;
+    /// Update the filter representation of our TopK heap.
+    /// For example, given the sort expression `ORDER BY a DESC, b ASC LIMIT 3`,
+    /// and the current heap values `[(1, 5), (1, 4), (2, 3)]`,
+    /// the filter will be updated to:
+    ///
+    /// ```sql
+    /// (a > 1 OR (a = 1 AND b < 5)) AND
+    /// (a > 1 OR (a = 1 AND b < 4)) AND
+    /// (a > 2 OR (a = 2 AND b < 3))
+    /// ```
+    fn update_filter(&mut self) -> Result<()> {
+        let Some(filter) = &self.filter else {
+            return Ok(());
+        };
+        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
+            return Ok(());
+        };
+
+        // Create filter expressions for each threshold
+        let mut filters: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(thresholds.len());
+
+        let mut prev_sort_expr: Option<Arc<dyn PhysicalExpr>> = None;
+        for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) {
+            // Create the appropriate operator based on sort order
+            let op = if sort_expr.options.descending {
+                // For descending sort, we want col > threshold (exclude smaller values)
+                Operator::Gt
+            } else {
+                // For ascending sort, we want col < threshold (exclude larger values)
+                Operator::Lt
+            };
+
+            let value_null = value.is_null();
+
+            let comparison = Arc::new(BinaryExpr::new(
+                Arc::clone(&sort_expr.expr),
+                op,
+                lit(value.clone()),
+            ));
+
+            let comparison_with_null = match (sort_expr.options.nulls_first, value_null) {
+                // For nulls first, transform to (threshold.value is not null) and (threshold.expr is null or comparison)
+                (true, true) => lit(false),
+                (true, false) => Arc::new(BinaryExpr::new(
+                    is_null(Arc::clone(&sort_expr.expr))?,
+                    Operator::Or,
+                    comparison,
+                )),
+                // For nulls last, transform to (threshold.value is null and threshold.expr is not null)
+                // or (threshold.value is not null and comparison)
+                (false, true) => is_not_null(Arc::clone(&sort_expr.expr))?,
+                (false, false) => comparison,
+            };
+
+            let mut eq_expr = Arc::new(BinaryExpr::new(
+                Arc::clone(&sort_expr.expr),
+                Operator::Eq,
+                lit(value.clone()),
+            ));
+
+            if value_null {
+                eq_expr = Arc::new(BinaryExpr::new(
+                    is_null(Arc::clone(&sort_expr.expr))?,
+                    Operator::Or,
+                    eq_expr,
+                ));
+            }
 
-        // update memory reservation
-        self.reservation.try_resize(self.size())?;
+            // For a query like order by a, b, the filter for column `b` is only applied if
+            // the condition a = threshold.value (considering null equality) is met.
+            // Therefore, we add equality predicates for all preceding fields to the filter logic of the current field,
+            // and include the current field's equality predicate in `prev_sort_expr` for use with subsequent fields.
+            match prev_sort_expr.take() {
+                None => {
+                    prev_sort_expr = Some(eq_expr);
+                    filters.push(comparison_with_null);
+                }
+                Some(p) => {
+                    filters.push(Arc::new(BinaryExpr::new(
+                        Arc::clone(&p),
+                        Operator::And,
+                        comparison_with_null,
+                    )));
+
+                    prev_sort_expr =
+                        Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr)));
+                }
+            }
+        }
 
-        // flag the topK as finished if we know that all
-        // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K,
-        // which means the top K won't change and the computation can be finished early.
-        self.attempt_early_completion(&batch)?;
+        let dynamic_predicate = filters
+            .into_iter()
+            .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b)));
+
+        if let Some(predicate) = dynamic_predicate {
+            if !predicate.eq(&lit(true)) {
+                filter.update(predicate)?;
+            }
+        }
 
         Ok(())
     }
@@ -324,6 +501,7 @@ impl TopK {
             common_sort_prefix_converter: _,
             common_sort_prefix: _,
             finished: _,
+            filter: _,
         } = self;
         let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
 
@@ -566,6 +744,47 @@ impl TopKHeap {
             + self.store.size()
             + self.owned_bytes
     }
+
+    fn get_threshold_values(
+        &self,
+        sort_exprs: &[PhysicalSortExpr],
+    ) -> Result<Option<Vec<ScalarValue>>> {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let max_row = match self.max() {
+            Some(row) => row,
+            None => return Ok(None),
+        };
+
+        // Get the batch that contains the max row
+        let batch_entry = match self.store.get(max_row.batch_id) {
+            Some(entry) => entry,
+            None => return internal_err!("Invalid batch ID in TopKRow"),
+        };
+
+        // Extract threshold values for each sort expression
+        let mut scalar_values = Vec::with_capacity(sort_exprs.len());
+        for sort_expr in sort_exprs {
+            // Extract the value for this column from the max row
+            let expr = Arc::clone(&sort_expr.expr);
+            let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 1))?;
+
+            // Convert to scalar value - should be a single value since we're evaluating on a single row batch
+            let scalar = match value {
+                ColumnarValue::Scalar(scalar) => scalar,
+                ColumnarValue::Array(array) if array.len() == 1 => {
+                    // Extract the first (and only) value from the array
+                    ScalarValue::try_from_array(&array, 0)?
+                }
+                array => {
+                    return internal_err!("Expected a scalar value, got {:?}", array)
+                }
+            };
+
+            scalar_values.push(scalar);
+        }
+
+        Ok(Some(scalar_values))
+    }
 }
 
 /// Represents one of the top K rows held in this heap.
Orders @@ -834,6 +1053,7 @@ mod tests { 2, runtime, &metrics, + None, )?; // Create the first batch with two columns: diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 560648271070..e65225084548 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -243,6 +243,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after insert_yield_exec SAME TEXT AS ABOVE +physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] @@ -321,6 +322,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after insert_yield_exec SAME TEXT AS ABOVE +physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] @@ -363,6 +365,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after insert_yield_exec SAME TEXT AS ABOVE +physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE 
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index dc8b7680d83e..42d64c5ee155 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -285,6 +285,7 @@ datafusion.format.types_info false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true +datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.expand_views_at_output false @@ -396,6 +397,7 @@ datafusion.format.types_info false Show types in visual representation batches datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. +datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. 
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 9d5106bf2caf..3398fa29018b 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -854,7 +854,7 @@ physical_plan 01)ProjectionExec: expr=[1 as foo] 02)--SortPreservingMergeExec: [part_key@0 ASC NULLS LAST], fetch=1 03)----SortExec: TopK(fetch=1), expr=[part_key@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], file_type=parquet +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query I with selection as ( diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index e5b5f5ac878a..f7082c1daaf5 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -280,4 +280,4 @@ physical_plan query TT select val, part from t_pushdown where part = val AND part = 'a'; ---- -a a \ No newline at end of file +a a diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 26fef6d666b1..9ff382d32af9 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -316,7 +316,7 @@ explain select number, letter, age from partial_sorted order by number desc, let ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, 
predicate=DynamicFilterPhysicalExpr [ true ] # Explain variations of the above query with different orderings, and different sort prefixes. @@ -326,28 +326,28 @@ explain select number, letter, age from partial_sorted order by age desc limit 3 ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[age@2 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number desc, letter desc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number asc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by letter asc, number desc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[letter@1 ASC NULLS LAST, number@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Explicit NULLS ordering cases (reversing the order of the NULLS on the number and letter orderings) query TT @@ -355,14 +355,14 @@ explain select number, letter, age from partial_sorted order by number desc, let ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC], preserve_partitioning=[false], sort_prefix=[number@0 DESC] 
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number desc NULLS LAST, letter asc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC NULLS LAST, letter@1 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Verify that the sort prefix is correctly computed on the normalized ordering (removing redundant aliased columns) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 42282e39e41f..fcb2360b87e3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -101,6 +101,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | +| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. 
| | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |
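Because `datafusion.optimizer.enable_dynamic_filter_pushdown` defaults to `true`, the `DynamicFilterPhysicalExpr [ true ]` predicates in the plans above appear without any extra configuration. A small sketch of disabling the option programmatically, for example to compare plans with and without the dynamic filter, using the standard `SessionConfig` API (nothing here is specific to this PR):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_without_dynamic_filters() -> SessionContext {
    // Disable the new optimizer option; the equivalent SQL is
    // `SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;`
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false);
    SessionContext::new_with_config(config)
}
```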