From 39f73383f8205e1143f8581be9cf4f428e00d039 Mon Sep 17 00:00:00 2001
From: xudong963
Date: Mon, 23 Jun 2025 14:19:27 +0800
Subject: [PATCH] Add more doc for physical filter pushdown

---
 .../physical-plan/src/execution_plan.rs       | 106 +++++++++++++-----
 .../physical-plan/src/filter_pushdown.rs      |   3 +
 2 files changed, 79 insertions(+), 30 deletions(-)

diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 2605e26c3c7f..1b8972ece251 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -527,36 +527,82 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 
     /// Handle the result of a child pushdown.
-    /// This is called as we recurse back up the plan tree after recursing down and calling [`ExecutionPlan::gather_filters_for_pushdown`].
-    /// Once we know what the result of pushing down filters into children is we ask the current node what it wants to do with that result.
-    /// For a `DataSourceExec` that may be absorbing the filters to apply them during the scan phase
-    /// (also known as late materialization).
-    /// A `FilterExec` may absorb any filters its children could not absorb, or if there are no filters left it
-    /// may remove itself from the plan altogether.
-    /// It combines both [`ChildPushdownResult::parent_filters`] and [`ChildPushdownResult::self_filters`] into a single
-    /// predicate and replaces it's own predicate.
-    /// Then it passes [`PredicateSupport::Supported`] for each parent predicate to the parent.
-    /// A `HashJoinExec` may ignore the pushdown result since it needs to apply the filters as part of the join anyhow.
-    /// It passes [`ChildPushdownResult::parent_filters`] back up to it's parents wrapped in [`FilterPushdownPropagation::transparent`]
-    /// and [`ChildPushdownResult::self_filters`] is discarded.
-    ///
-    /// The default implementation is a no-op that passes the result of pushdown from the children to its parent.
-    ///
-    /// When returning filters via [`FilterPushdownPropagation`] the order of the filters need not match
-    /// the order they were passed in via `child_pushdown_result`, but preserving the order may be beneficial
-    /// for debugging and reasoning about the resulting plans so it is recommended to preserve the order.
-    ///
-    /// There are various helper methods to make implementing this method easier, see:
-    /// - [`FilterPushdownPropagation::unsupported`]: to indicate that the node does not support filter pushdown at all.
-    /// - [`FilterPushdownPropagation::transparent`]: to indicate that the node supports filter pushdown but does not involve itself in it,
-    ///   instead if simply transmits the result of pushdown into its children back up to its parent.
-    /// - [`PredicateSupports::new_with_supported_check`]: takes a callback that returns true / false for each filter to indicate pushdown support.
-    ///   This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
-    ///   to dynamically build a result with a mix of supported and unsupported filters.
-    ///
-    /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
-    /// Depending on the phase the operator may or may not be allowed to modify the plan.
-    /// See [`FilterPushdownPhase`] for more details.
+    /// This method is called as we recurse back up the plan tree after pushing
+    /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
+    /// It allows the current node to process the results of filter pushdown from
+    /// its children, deciding whether to absorb filters, modify the plan, or pass
+    /// filters back up to its parent.
+    ///
+    /// **Purpose and Context:**
+    /// Filter pushdown is a critical optimization in DataFusion that aims to
+    /// reduce the amount of data processed by applying filters as early as
+    /// possible in the query plan. This method is part of the second phase of
+    /// filter pushdown, where results are propagated back up the tree after
+    /// being pushed down. Each node can inspect the pushdown results from its
+    /// children and decide how to handle any unapplied filters, potentially
+    /// optimizing the plan structure or filter application.
+    ///
+    /// **Behavior in Different Nodes:**
+    /// - For a `DataSourceExec`, this often means absorbing the filters to apply
+    ///   them during the scan phase (late materialization), reducing the data
+    ///   read from the source.
+    /// - A `FilterExec` may absorb any filters its children could not handle,
+    ///   combining them with its own predicate. If no filters remain (i.e., the
+    ///   predicate becomes trivially true), it may remove itself from the plan
+    ///   altogether. It typically marks parent filters as supported, indicating
+    ///   they have been handled.
+    /// - A `HashJoinExec` might ignore the pushdown result if filters need to
+    ///   be applied during the join operation. It passes the parent filters back
+    ///   up wrapped in [`FilterPushdownPropagation::transparent`], discarding
+    ///   any self-filters from children.
+    ///
+    /// **Example Walkthrough:**
+    /// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
+    /// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
+    ///    `FilterExec`, the filter `f1` is gathered and pushed down to
+    ///    `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
+    ///    the join or add its own filters (e.g., a min-max filter from the build side),
+    ///    then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
+    ///    has no children to push to, so it prepares to handle filters in the
+    ///    upward phase.
+    /// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
+    ///    `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
+    ///    for late materialization during scanning, marking them as supported.
+    ///    `HashJoinExec` receives the result, decides whether to apply any
+    ///    remaining filters during the join, and passes unhandled filters back
+    ///    up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
+    ///    updates its predicate if necessary, or removes itself if the predicate
+    ///    becomes trivial (e.g., `lit(true)`), and marks filters as supported
+    ///    for its parent.
+    ///
+    /// The default implementation is a no-op that passes the result of pushdown
+    /// from the children to its parent transparently, ensuring no filters are
+    /// lost if a node does not override this behavior.
+    ///
+    /// **Notes for Implementation:**
+    /// When returning filters via [`FilterPushdownPropagation`], the order of
+    /// filters need not match the order they were passed in via
+    /// `child_pushdown_result`. However, preserving the order is recommended for
+    /// debugging and ease of reasoning about the resulting plans.
+    ///
+    /// **Helper Methods for Customization:**
+    /// There are various helper methods to simplify implementing this method:
+    /// - [`FilterPushdownPropagation::unsupported`]: Indicates that the node
+    ///   does not support filter pushdown at all, rejecting all filters.
+    /// - [`FilterPushdownPropagation::transparent`]: Indicates that the node
+    ///   supports filter pushdown but does not modify it, simply transmitting
+    ///   the children's pushdown results back up to its parent.
+    /// - [`PredicateSupports::new_with_supported_check`]: Takes a callback to
+    ///   dynamically determine support for each filter, useful with
+    ///   [`FilterPushdownPropagation::with_filters`] and
+    ///   [`FilterPushdownPropagation::with_updated_node`] to build mixed results
+    ///   of supported and unsupported filters.
+    ///
+    /// **Filter Pushdown Phases:**
+    /// There are two different phases in filter pushdown (`Pre` and `Post`),
+    /// which some operators may handle differently. Depending on the phase, the
+    /// operator may or may not be allowed to modify the plan. See
+    /// [`FilterPushdownPhase`] for more details on phase-specific behavior.
     ///
     /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported
     /// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs
index 3bbe3997fdfc..725abd7fc8b5 100644
--- a/datafusion/physical-plan/src/filter_pushdown.rs
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -30,6 +30,9 @@ pub enum FilterPushdownPhase {
     /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
     ///
     /// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a pre-pushdown.
+    /// Unlike filter pushdown in the logical phase, which operates on the logical plan to push filters into the logical table scan,
+    /// the `Pre` phase in the physical plan targets the actual physical scan, pushing filters down to specific data source implementations.
+    /// For example, Parquet supports filter pushdown to reduce data read during scanning, while CSV typically does not.
     ///
     /// [`ExecutionPlan`]: crate::ExecutionPlan
     /// [`FilterExec`]: crate::filter::FilterExec
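
Note for readers of this patch: the downward and upward passes described in the new doc comment can be sketched with a toy model. This is only an illustration under stated assumptions and is not DataFusion code. `Node`, `push_down`, and the string-valued filters are hypothetical names, and the `Transparent` variant merely stands in for an operator that forwards results unchanged (a real `HashJoinExec` is more involved).

```rust
// Toy model of the two passes. Nothing here is a DataFusion API:
// `Node` and `push_down` are hypothetical, and filters are plain strings.

/// A miniature plan node.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    /// Leaf scan that can absorb only the filters listed in `supported`.
    Scan { supported: Vec<String>, absorbed: Vec<String> },
    /// Operator that forwards filters down and results up unchanged.
    Transparent(Box<Node>),
    /// Filter operator holding its own predicates.
    Filter { predicates: Vec<String>, input: Box<Node> },
}

/// Pushes `parent_filters` into `node`. Returns the rewritten node and one
/// flag per parent filter; `true` means the filter was handled at or below
/// this node, so the parent no longer needs to apply it.
fn push_down(node: Node, parent_filters: &[String]) -> (Node, Vec<bool>) {
    match node {
        Node::Scan { supported, mut absorbed } => {
            // Leaf: absorb what the source supports, report the rest as unsupported.
            let flags: Vec<bool> = parent_filters
                .iter()
                .map(|f| supported.contains(f))
                .collect();
            for (filter, ok) in parent_filters.iter().zip(&flags) {
                if *ok {
                    absorbed.push(filter.clone());
                }
            }
            (Node::Scan { supported, absorbed }, flags)
        }
        Node::Transparent(input) => {
            // Pass filters down and hand the child's answer back up as-is.
            let (child, flags) = push_down(*input, parent_filters);
            (Node::Transparent(Box::new(child)), flags)
        }
        Node::Filter { predicates, input } => {
            // Downward pass: offer the parent's filters plus our own predicates.
            let mut offered = parent_filters.to_vec();
            offered.extend(predicates);
            let (child, flags) = push_down(*input, &offered);
            // Upward pass: keep only what the child could not absorb.
            let remaining: Vec<String> = offered
                .iter()
                .zip(&flags)
                .filter(|(_, ok)| !**ok)
                .map(|(f, _)| f.clone())
                .collect();
            // Every parent filter is now handled below or by this node.
            let parent_flags = vec![true; parent_filters.len()];
            if remaining.is_empty() {
                // Predicate became trivially true: drop this node from the plan.
                (child, parent_flags)
            } else {
                (Node::Filter { predicates: remaining, input: Box::new(child) }, parent_flags)
            }
        }
    }
}
```

In this sketch, a `Filter` holding `f1` over a `Scan` that supports `f1` disappears from the plan, while a filter the scan does not support stays in the `Filter` node. Returning the flags in the same order as `parent_filters` follows the doc comment's recommendation to preserve filter order.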