Skip to content

Commit 73c1696

Browse files
alambadriangb
authored and committed
Improve documentation for filter pushdown (#23)
1 parent eacf985 commit 73c1696

File tree

2 files changed

+140
-81
lines changed

2 files changed

+140
-81
lines changed

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 139 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::stream::RecordBatchStreamAdapter;
4646
use arrow::array::{Array, RecordBatch};
4747
use arrow::datatypes::SchemaRef;
4848
use datafusion_common::config::ConfigOptions;
49-
use datafusion_common::{exec_err, Constraints, DataFusionError, Result};
49+
use datafusion_common::{exec_err, Constraints, Result};
5050
use datafusion_common_runtime::JoinSet;
5151
use datafusion_execution::TaskContext;
5252
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExprRef};
@@ -469,7 +469,35 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
469469
Ok(None)
470470
}
471471

472-
/// A physical optimizer rule that pushes down filters in the execution plan.
472+
/// Attempts to recursively push given filters into this `ExecutionPlan` or
473+
/// its children and push any filters from this node into its children.
474+
///
475+
/// This is used to implement filter pushdown in the physical plan. Note
476+
/// that DataFusion also implements filter pushdown in the logical plan,
477+
/// which is a different code path. This method is here to support
478+
/// additional optimizations that may only be possible in the physical
479+
/// plan such as dynamic filtering (see below).
480+
///
481+
/// See [`try_pushdown_filters_to_input`] for a simple implementation
482+
///
483+
/// # Arguments
484+
/// * `plan`: `Arc`d instance of self
485+
/// * `parent_filters`: A vector of [`PhysicalExpr`]s from the parent of this node
486+
/// to try and push down
487+
/// * `config`: The configuration options for execution
488+
///
489+
/// # Default Implementation
490+
///
491+
/// The default implementation assumes:
492+
/// * Parent filters can't be passed onto children.
493+
/// * This node has no filters to contribute.
494+
///
495+
/// Note the default implementation still recurses into children to
496+
/// recursively call `try_pushdown_filters` on subtrees that may have
497+
/// filters to pushdown.
498+
///
499+
/// # Example: Push filter into a `DataSourceExec`
500+
///
473501
/// For example, consider the following plan:
474502
///
475503
/// ```text
@@ -491,7 +519,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
491519
/// ```
492520
///
493521
/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
494-
/// If this filter is selective it can avoid massive amounts of data being read from the source (the projection is `*` so all matching columns are read).
522+
///
523+
/// If this filter is selective pushing it into the scan can avoid massive
524+
/// amounts of data being read from the source (the projection is `*` so all
525+
/// matching columns are read).
526+
///
495527
/// In this simple case we:
496528
/// 1. Enter the recursion with no filters.
497529
/// 2. We find the [`FilterExec`] node and call [`ExecutionPlan::try_pushdown_filters`] on it.
@@ -513,68 +545,75 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
513545
/// ▼
514546
/// ┌──────────────────────┐
515547
/// │ DataSourceExec │
516-
// │ projection = * │
517-
// │ filters = [ id=1] │
548+
/// │ projection = * │
549+
/// │ filters = [ id=1] │
518550
/// └──────────────────────┘
519551
/// ```
520552
///
521-
/// Let's consider a more complex example involving a [`ProjectionExec`] node in betweeen the [`FilterExec`] and `DataSourceExec` nodes that creates a new column that the filter depends on.
553+
/// # Example: Push filters with `ProjectionExec`
554+
///
555+
/// Let's consider a more complex example involving a [`ProjectionExec`]
556+
/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
557+
/// creates a new column that the filter depends on.
522558
///
523559
/// ```text
524-
// ┌──────────────────────┐
525-
// │ CoalesceBatchesExec │
526-
// └──────────────────────┘
527-
// │
528-
// ▼
529-
// ┌──────────────────────┐
530-
// │ FilterExec │
531-
// │ filters = │
532-
// │ [cost>50,id=1] │
533-
// └──────────────────────┘
534-
// │
535-
// ▼
536-
// ┌──────────────────────┐
537-
// │ ProjectionExec │
538-
// │ cost = price * 1.2 │
539-
// └──────────────────────┘
540-
// │
541-
// ▼
542-
// ┌──────────────────────┐
543-
// │ DataSourceExec │
544-
// │ projection = * │
545-
// └──────────────────────┘
560+
/// ┌──────────────────────┐
561+
/// │ CoalesceBatchesExec │
562+
/// └──────────────────────┘
563+
///
564+
///
565+
/// ┌──────────────────────┐
566+
/// │ FilterExec │
567+
/// │ filters = │
568+
/// │ [cost>50,id=1] │
569+
/// └──────────────────────┘
570+
///
571+
///
572+
/// ┌──────────────────────┐
573+
/// │ ProjectionExec │
574+
/// │ cost = price * 1.2 │
575+
/// └──────────────────────┘
576+
///
577+
///
578+
/// ┌──────────────────────┐
579+
/// │ DataSourceExec │
580+
/// │ projection = * │
581+
/// └──────────────────────┘
546582
/// ```
547583
///
548-
/// We want to push down the filters [id=1] to the `DataSourceExec` node, but can't push down `cost>50` because it requires the [`ProjectionExec`] node to be executed first.
549-
/// A simple thing to do would be to split up the filter into two separate filters and push down the first one:
584+
/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
585+
/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
586+
/// node to be executed first. A simple thing to do would be to split up the
587+
/// filter into two separate filters and push down the first one:
550588
///
551589
/// ```text
552-
// ┌──────────────────────┐
553-
// │ CoalesceBatchesExec │
554-
// └──────────────────────┘
555-
// │
556-
// ▼
557-
// ┌──────────────────────┐
558-
// │ FilterExec │
559-
// │ filters = │
560-
// │ [cost>50] │
561-
// └──────────────────────┘
562-
// │
563-
// ▼
564-
// ┌──────────────────────┐
565-
// │ ProjectionExec │
566-
// │ cost = price * 1.2 │
567-
// └──────────────────────┘
568-
// │
569-
// ▼
570-
// ┌──────────────────────┐
571-
// │ DataSourceExec │
572-
// │ projection = * │
573-
// │ filters = [ id=1] │
574-
// └──────────────────────┘
590+
/// ┌──────────────────────┐
591+
/// │ CoalesceBatchesExec │
592+
/// └──────────────────────┘
593+
///
594+
///
595+
/// ┌──────────────────────┐
596+
/// │ FilterExec │
597+
/// │ filters = │
598+
/// │ [cost>50] │
599+
/// └──────────────────────┘
600+
///
601+
///
602+
/// ┌──────────────────────┐
603+
/// │ ProjectionExec │
604+
/// │ cost = price * 1.2 │
605+
/// └──────────────────────┘
606+
///
607+
///
608+
/// ┌──────────────────────┐
609+
/// │ DataSourceExec │
610+
/// │ projection = * │
611+
/// │ filters = [ id=1] │
612+
/// └──────────────────────┘
575613
/// ```
576614
///
577-
/// We can actually however do better by pushing down `price * 1.2 > 50` instead of `cost > 50`:
615+
/// We can actually however do better by pushing down `price * 1.2 > 50`
616+
/// instead of `cost > 50`:
578617
///
579618
/// ```text
580619
/// ┌──────────────────────┐
@@ -596,8 +635,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
596635
/// └──────────────────────┘
597636
/// ```
598637
///
599-
/// There are also cases where we may be able to push down filters within a subtree but not the entire tree.
600-
/// A good exmaple of this is aggreagation nodes:
638+
/// # Example: Push filters within a subtree
639+
///
640+
/// There are also cases where we may be able to push down filters within a
641+
/// subtree but not the entire tree. A good example of this is aggregation
642+
/// nodes:
601643
///
602644
/// ```text
603645
/// ┌──────────────────────┐
@@ -632,7 +674,8 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
632674
/// └──────────────────────┘
633675
/// ```
634676
///
635-
/// The transformation here is to push down the `id=1` filter to the `DataSourceExec` node:
677+
/// The transformation here is to push down the `id=1` filter to the
678+
/// `DataSourceExec` node:
636679
///
637680
/// ```text
638681
/// ┌──────────────────────┐
@@ -666,11 +709,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
666709
/// 1. We cannot push down `sum > 10` through the `AggregateExec` node into the `DataSourceExec` node.
667710
/// Any filters above the `AggregateExec` node are not pushed down.
668711
/// This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node.
669-
/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push down the `id=1` filter.
712+
/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push
713+
/// down the `id=1` filter.
670714
///
671-
/// It is also possible to push down filters through joins and from joins.
672-
/// For example, a hash join where we build a hash table of the left side and probe the right side
673-
/// (ignoring why we would choose this order, typically it depends on the size of each table, etc.).
715+
/// # Example: Push filters through Joins
716+
///
717+
/// It is also possible to push down filters through joins and filters that
718+
/// originate from joins. For example, a hash join where we build a hash
719+
/// table of the left side and probe the right side (ignoring why we would
720+
/// choose this order, typically it depends on the size of each table,
721+
/// etc.).
674722
///
675723
/// ```text
676724
/// ┌─────────────────────┐
@@ -696,10 +744,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
696744
/// ```
697745
///
698746
/// There are two pushdowns we can do here:
699-
/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` node for the `departments` table.
747+
/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec`
748+
/// node for the `departments` table.
700749
/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
701-
/// rows from teh `users` table that will be eliminated by the join.
702-
/// This can be done via a bloom filter or similar.
750+
/// rows from the `users` table that will be eliminated by the join.
751+
/// This can be done via a bloom filter or similar and is not (yet) supported
752+
/// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
703753
///
704754
/// ```text
705755
/// ┌─────────────────────┐
@@ -718,20 +768,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
718768
/// └─────────────────────┘ └─────────────────────┘
719769
/// ```
720770
///
721-
/// You may notice in this case that the filter is *dynamic*: the hash table is built
722-
/// _after_ the `departments` table is read and at runtime.
723-
/// We don't have a concrete `InList` filter or similar to push down at optimization time.
724-
/// These sorts of dynamic filters are handled by building a specialized
725-
/// [`PhysicalExpr`] that can be evaluated at runtime
771+
/// You may notice in this case that the filter is *dynamic*: the hash table
772+
/// is built _after_ the `departments` table is read and at runtime. We
773+
/// don't have a concrete `InList` filter or similar to push down at
774+
/// optimization time. These sorts of dynamic filters are handled by
775+
/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
726776
/// and internally maintains a reference to the hash table or other state.
777+
///
727778
/// To make working with these sorts of dynamic filters more tractable we have the method `PhysicalExpr::snapshot`
728779
/// (TODO: add reference after <https://github.com/apache/datafusion/pull/15568> is merged)
729780
/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
730781
/// For a join this could mean converting it to an `InList` filter or a min/max filter for example.
731782
/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
732783
///
733-
/// Another form of dyanmic filter is pushing down the state of a `TopK` operator for queries like
734-
/// `SELECT * FROM t ORDER BY id LIMIT 10`:
784+
/// # Example: Push TopK filters into Scans
785+
///
786+
/// Another form of dynamic filter is pushing down the state of a `TopK`
787+
/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
735788
///
736789
/// ```text
737790
/// ┌──────────────────────┐
@@ -765,9 +818,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
765818
/// └──────────────────────┘
766819
/// ```
767820
///
768-
/// Now as we fill our `TopK` heap we can push down the state of the heap to the `DataSourceExec` node
769-
/// to avoid reading files / row groups / pages / rows that could not possibly be in the top 10.
770-
/// This is implemented in datafusion/physical-plan/src/sorts/sort_filters.rs.
821+
/// Now as we fill our `TopK` heap we can push down the state of the heap to
822+
/// the `DataSourceExec` node to avoid reading files / row groups / pages /
823+
/// rows that could not possibly be in the top 10.
824+
///
825+
/// This is not yet implemented in DataFusion. See
826+
/// <https://github.com/apache/datafusion/issues/15037>
771827
///
772828
/// [`FilterExec`]: crate::filter::FilterExec
773829
/// [`ProjectionExec`]: crate::projection::ProjectionExec
@@ -797,9 +853,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
797853
pushed = true;
798854
// Support should be empty, we didn't pass any filters
799855
if !support.is_empty() {
800-
return Err(DataFusionError::Internal(
801-
"Child plan did not have any filters pushed down".to_string(),
802-
));
856+
return internal_err!(
857+
"No filters passed, but child plan reported pushed filters"
858+
);
803859
}
804860
}
805861
}
@@ -820,10 +876,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
820876
pub type ExecutionPlanFilterPushdownResult = FilterPushdownResult<Arc<dyn ExecutionPlan>>;
821877

822878
/// A default implementation of [`ExecutionPlan::try_pushdown_filters`] that
823-
/// pushes down filters transparently to the input of the plan for plans that:
824-
/// * Have a single input / child node.
825-
/// * Support transparent filter pushdown (do not modify the cardinality or schema of the data).
826-
/// * Do not have any filters of their own.
879+
/// pushes down filters transparently to an input.
880+
///
881+
/// Requires that the input:
882+
/// * Has a single input / child node.
883+
/// * Supports transparent filter pushdown (does not modify the cardinality or schema of the data).
884+
/// * Does not have any filters of its own.
827885
pub fn try_pushdown_filters_to_input(
828886
plan: &Arc<dyn ExecutionPlan>,
829887
input: &Arc<dyn ExecutionPlan>,

datafusion/physical-plan/src/filter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ impl ExecutionPlan for FilterExec {
443443
parent_filters: &[PhysicalExprRef],
444444
config: &ConfigOptions,
445445
) -> Result<ExecutionPlanFilterPushdownResult> {
446+
// filters are in terms of the output columns of this plan
446447
let mut all_filters = parent_filters.to_vec();
447448
all_filters.push(Arc::clone(&self.predicate));
448449
let all_filters = if self.projection.is_some() {

0 commit comments

Comments
 (0)