Add more doc for physical filter pushdown #16504

Merged
merged 1 commit into from
Jun 24, 2025
106 changes: 76 additions & 30 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -527,36 +527,82 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}

/// Handle the result of a child pushdown.
/// This is called as we recurse back up the plan tree after recursing down and calling [`ExecutionPlan::gather_filters_for_pushdown`].
/// Once we know what the result of pushing down filters into children is we ask the current node what it wants to do with that result.
/// For a `DataSourceExec` that may be absorbing the filters to apply them during the scan phase
/// (also known as late materialization).
/// A `FilterExec` may absorb any filters its children could not absorb, or if there are no filters left it
/// may remove itself from the plan altogether.
/// It combines both [`ChildPushdownResult::parent_filters`] and [`ChildPushdownResult::self_filters`] into a single
/// predicate and replaces its own predicate.
/// Then it passes [`PredicateSupport::Supported`] for each parent predicate to the parent.
/// A `HashJoinExec` may ignore the pushdown result since it needs to apply the filters as part of the join anyhow.
/// It passes [`ChildPushdownResult::parent_filters`] back up to its parent wrapped in [`FilterPushdownPropagation::transparent`]
/// and [`ChildPushdownResult::self_filters`] is discarded.
///
/// The default implementation is a no-op that passes the result of pushdown from the children to its parent.
///
/// When returning filters via [`FilterPushdownPropagation`] the order of the filters need not match
/// the order they were passed in via `child_pushdown_result`, but preserving the order may be beneficial
/// for debugging and reasoning about the resulting plans so it is recommended to preserve the order.
///
/// There are various helper methods to make implementing this method easier, see:
/// - [`FilterPushdownPropagation::unsupported`]: to indicate that the node does not support filter pushdown at all.
/// - [`FilterPushdownPropagation::transparent`]: to indicate that the node supports filter pushdown but does not involve itself in it,
/// instead it simply transmits the result of pushdown into its children back up to its parent.
/// - [`PredicateSupports::new_with_supported_check`]: takes a callback that returns true / false for each filter to indicate pushdown support.
/// This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
/// to dynamically build a result with a mix of supported and unsupported filters.
///
/// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
/// Depending on the phase the operator may or may not be allowed to modify the plan.
/// See [`FilterPushdownPhase`] for more details.
/// This method is called as we recurse back up the plan tree after pushing
/// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
/// It allows the current node to process the results of filter pushdown from
/// its children, deciding whether to absorb filters, modify the plan, or pass
/// filters back up to its parent.
///
/// **Purpose and Context:**
/// Filter pushdown is a critical optimization in DataFusion that aims to
/// reduce the amount of data processed by applying filters as early as
/// possible in the query plan. This method is part of the second (upward)
/// pass of filter pushdown, where results are propagated back up the tree after
/// being pushed down. Each node can inspect the pushdown results from its
/// children and decide how to handle any unapplied filters, potentially
/// optimizing the plan structure or filter application.
///
/// **Behavior in Different Nodes:**
/// - For a `DataSourceExec`, this often means absorbing the filters to apply
/// them during the scan phase (late materialization), reducing the data
/// read from the source.
/// - A `FilterExec` may absorb any filters its children could not handle,
/// combining them with its own predicate. If no filters remain (i.e., the
/// predicate becomes trivially true), it may remove itself from the plan
/// altogether. It typically marks parent filters as supported, indicating
/// they have been handled.
/// - A `HashJoinExec` might ignore the pushdown result if filters need to
/// be applied during the join operation. It passes the parent filters back
/// up wrapped in [`FilterPushdownPropagation::transparent`], discarding
/// any self-filters from children.
///
/// **Example Walkthrough:**
/// Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
/// 1. **Downward Phase (`gather_filters_for_pushdown`):** Starting at
/// `FilterExec`, the filter `f1` is gathered and pushed down to
/// `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of
/// the join or add its own filters (e.g., a min-max filter from the build side),
/// then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node,
/// has no children to push to, so it prepares to handle filters in the
/// upward phase.
/// 2. **Upward Phase (`handle_child_pushdown_result`):** Starting at
/// `DataSourceExec`, it absorbs applicable filters from `HashJoinExec`
/// for late materialization during scanning, marking them as supported.
/// `HashJoinExec` receives the result, decides whether to apply any
/// remaining filters during the join, and passes unhandled filters back
/// up to `FilterExec`. `FilterExec` absorbs any unhandled filters,
/// updates its predicate if necessary, or removes itself if the predicate
/// becomes trivial (e.g., `lit(true)`), and marks filters as supported
/// for its parent.
///
/// The default implementation is a no-op that passes the result of pushdown
/// from the children to its parent transparently, ensuring no filters are
/// lost if a node does not override this behavior.
///
/// **Notes for Implementation:**
/// When returning filters via [`FilterPushdownPropagation`], the order of
/// filters need not match the order they were passed in via
/// `child_pushdown_result`. However, preserving the order is recommended for
/// debugging and ease of reasoning about the resulting plans.
///
/// **Helper Methods for Customization:**
/// There are various helper methods to simplify implementing this method:
/// - [`FilterPushdownPropagation::unsupported`]: Indicates that the node
/// does not support filter pushdown at all, rejecting all filters.
/// - [`FilterPushdownPropagation::transparent`]: Indicates that the node
/// supports filter pushdown but does not modify it, simply transmitting
/// the children's pushdown results back up to its parent.
/// - [`PredicateSupports::new_with_supported_check`]: Takes a callback to
/// dynamically determine support for each filter, useful with
/// [`FilterPushdownPropagation::with_filters`] and
/// [`FilterPushdownPropagation::with_updated_node`] to build mixed results
/// of supported and unsupported filters.
///
/// **Filter Pushdown Phases:**
/// There are two different phases in filter pushdown (`Pre` and `Post`),
/// which some operators may handle differently. Depending on the phase, the
/// operator may or may not be allowed to modify the plan. See
/// [`FilterPushdownPhase`] for more details on phase-specific behavior.
///
/// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported
/// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check
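
The walkthrough in the new doc comment can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Node`, `Support`, and `push_down` are hypothetical names invented here and are not DataFusion types or APIs. The single-child `Transparent` node stands in for an operator that forwards results unchanged (the role `HashJoinExec` plays in the walkthrough), and the scan either absorbs every filter or none.

```rust
// Toy model of the downward and upward passes. None of these types are
// DataFusion's; every name here is hypothetical and for illustration only.

/// Result reported for one filter (a stand-in for the idea of `PredicateSupport`).
#[derive(Debug, Clone, PartialEq)]
enum Support {
    Supported(String),
    Unsupported(String),
}

/// A toy plan node with at most one child.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    /// Leaf scan that either absorbs all filters or none.
    Scan { supports_pushdown: bool, absorbed: Vec<String> },
    /// Operator that forwards filters down and results up unchanged.
    Transparent(Box<Node>),
    /// Filter holding a conjunction of predicates.
    Filter { predicates: Vec<String>, input: Box<Node> },
}

/// Pushes `parent_filters` into `node` and returns the rewritten node plus
/// one `Support` entry per parent filter, in the order they were passed in.
fn push_down(node: Node, parent_filters: Vec<String>) -> (Node, Vec<Support>) {
    match node {
        Node::Scan { supports_pushdown, mut absorbed } => {
            let mut results = Vec::new();
            for f in parent_filters {
                if supports_pushdown {
                    // The scan will evaluate this filter itself.
                    absorbed.push(f.clone());
                    results.push(Support::Supported(f));
                } else {
                    results.push(Support::Unsupported(f));
                }
            }
            (Node::Scan { supports_pushdown, absorbed }, results)
        }
        Node::Transparent(child) => {
            // Downward: forward the filters. Upward: forward the result.
            let (child, results) = push_down(*child, parent_filters);
            (Node::Transparent(Box::new(child)), results)
        }
        Node::Filter { predicates, input } => {
            // Downward: offer the parent's filters and our own to the child.
            let mut all = parent_filters.clone();
            all.extend(predicates);
            let (child, results) = push_down(*input, all);
            // Upward: keep whatever the child could not absorb.
            let remaining: Vec<String> = results
                .into_iter()
                .filter_map(|r| match r {
                    Support::Unsupported(f) => Some(f),
                    Support::Supported(_) => None,
                })
                .collect();
            // Every parent filter is now handled either by the child or here.
            let reported = parent_filters.into_iter().map(Support::Supported).collect();
            let node = if remaining.is_empty() {
                child // nothing left to evaluate, so the filter removes itself
            } else {
                Node::Filter { predicates: remaining, input: Box::new(child) }
            };
            (node, reported)
        }
    }
}
```

Calling `push_down` on `Filter(f1) -> Transparent -> Scan` with a scan that supports pushdown leaves `Transparent -> Scan` with `f1` absorbed by the scan; with a scan that does not support pushdown, the plan comes back unchanged.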
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/filter_pushdown.rs
@@ -30,6 +30,9 @@ pub enum FilterPushdownPhase {
/// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
///
/// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a pre-pushdown.
/// Unlike filter pushdown in the logical phase, which operates on the logical plan to push filters into the logical table scan,
/// the `Pre` phase in the physical plan targets the actual physical scan, pushing filters down to specific data source implementations.
/// For example, Parquet supports filter pushdown to reduce data read during scanning, while CSV typically does not.
///
/// [`ExecutionPlan`]: crate::ExecutionPlan
/// [`FilterExec`]: crate::filter::FilterExec
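
The point about data sources differing in pushdown support can be sketched with a per-filter callback, loosely in the spirit of the callback-based helper mentioned in the doc comment. Everything below is a hypothetical toy: `ToySupport`, `ToyFormat`, `classify_with_check`, and `source_pushdown` are names made up for this illustration, and the rule that the Parquet-like source accepts every filter while the CSV-like source accepts none is a deliberate simplification, not a description of the real readers.

```rust
// Toy sketch: how a physical scan might report per-filter pushdown support.
// All names are hypothetical; this does not use or mirror DataFusion's API.

/// Outcome for a single filter.
#[derive(Debug, Clone, PartialEq)]
enum ToySupport {
    Supported(String),
    Unsupported(String),
}

/// Classifies each filter with a caller-supplied check, preserving input order.
fn classify_with_check(
    filters: Vec<String>,
    check: impl Fn(&str) -> bool,
) -> Vec<ToySupport> {
    filters
        .into_iter()
        .map(|f| {
            if check(&f) {
                ToySupport::Supported(f)
            } else {
                ToySupport::Unsupported(f)
            }
        })
        .collect()
}

/// Hypothetical file formats with different pushdown capabilities.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyFormat {
    Parquet,
    Csv,
}

/// Asks a toy source which filters it will evaluate during the scan.
fn source_pushdown(format: ToyFormat, filters: Vec<String>) -> Vec<ToySupport> {
    match format {
        // Simplification: the Parquet-like source accepts every filter.
        ToyFormat::Parquet => classify_with_check(filters, |_| true),
        // Simplification: the CSV-like source accepts none.
        ToyFormat::Csv => classify_with_check(filters, |_| false),
    }
}
```

Filters reported as `Unsupported` would have to stay in a filter operator above the scan, which is why the result of pushdown depends on the concrete source rather than on the logical plan alone.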