From aac67ebddca0a48974dee76acd868f6dda2be261 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Fri, 20 Jun 2025 09:48:17 +0200 Subject: [PATCH 1/2] feat: make `with_work_table` a trait method for `ExecutionPlan` Expose `with_work_table` as a public trait method on `ExecutionPlan`, allowing custom and external execution nodes to support recursive query work table injection. Motivation: The `RecursiveQueryExec` dynamically rebuilds its child execution plan during execution, and assumes it will find nodes that support work table injection. Previously, only the concrete `WorkTableExec` type supported this, making it difficult for custom or wrapper nodes to participate in recursive query execution. Changes: - Add `with_work_table` as a public method to the `ExecutionPlan` trait, with a default implementation returning `None`. - Update `WorkTableExec` to implement this method, returning a new instance with the provided work table. - Update documentation: clarify the purpose of `with_work_table` in both the trait and the `WorkTableExec` implementation, and add direct doc links for discoverability. - Update `assign_work_table` logic in `RecursiveQueryExec` to use the trait method, enabling support for custom nodes that wrap or contain a `WorkTableExec`. - Re-export `WorkTable` publicly so external implementors can use it in their own `ExecutionPlan` implementations. This change enables extensibility for recursive query support, allowing external crates and custom execution plans to participate in work table injection without relying on internal module structure or concrete type checks. --- .../physical-plan/src/execution_plan.rs | 19 ++++++++++ datafusion/physical-plan/src/lib.rs | 1 + .../physical-plan/src/recursive_query.rs | 6 ++-- datafusion/physical-plan/src/work_table.rs | 36 +++++++++++-------- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 2605e26c3c7f..d67c94ef54b0 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -23,6 +23,7 @@ use crate::filter_pushdown::{ pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; pub use crate::stream::EmptyRecordBatchStream; +pub use crate::work_table::WorkTable; pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; @@ -570,6 +571,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { child_pushdown_result, )) } + + /// Returns a new execution plan that uses the provided work table, if supported. + /// This enables recursive query execution by allowing work table injection. + /// + /// Primarily implemented by [`WorkTableExec`], but custom execution nodes that wrap + /// or contain `WorkTableExec` instances should also implement this to propagate + /// work table injection to their inner components. + /// + /// See [`WorkTableExec::with_work_table`] for the reference implementation. + /// + /// [`WorkTableExec`]: crate::work_table::WorkTableExec + /// [`WorkTableExec::with_work_table`]: crate::work_table::WorkTableExec::with_work_table + fn with_work_table( + &self, + _work_table: Arc, + ) -> Option> { + None + } } /// [`ExecutionPlan`] Invariant Level diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 22ae859e8c5b..a4798536388f 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -50,6 +50,7 @@ pub use crate::ordering::InputOrderMode; pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; +pub use crate::work_table::WorkTable; pub use spill::spill_manager::SpillManager; mod ordering; diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 210db90c3c7f..c61ec060c748 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -351,16 +351,14 @@ fn assign_work_table( ) -> Result> { let mut work_table_refs = 0; plan.transform_down(|plan| { - if let Some(exec) = plan.as_any().downcast_ref::() { + if let Some(new_plan) = plan.with_work_table(Arc::clone(&work_table)) { if work_table_refs > 0 { not_impl_err!( "Multiple recursive references to the same CTE are not supported" ) } else { work_table_refs += 1; - Ok(Transformed::yes(Arc::new( - exec.with_work_table(Arc::clone(&work_table)), - ))) + Ok(Transformed::yes(new_plan)) } } else if plan.as_any().is::() { not_impl_err!("Recursive queries cannot be nested") diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index e6179cd75ffb..79cac96cd71b 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -57,7 +57,7 @@ impl ReservedBatches { /// See /// This table serves as a mirror or buffer between each iteration of a recursive query. #[derive(Debug)] -pub(super) struct WorkTable { +pub struct WorkTable { batches: Mutex>, } @@ -132,16 +132,6 @@ impl WorkTableExec { Arc::clone(&self.schema) } - pub(super) fn with_work_table(&self, work_table: Arc) -> Self { - Self { - name: self.name.clone(), - schema: Arc::clone(&self.schema), - metrics: ExecutionPlanMetricsSet::new(), - work_table, - cache: self.cache.clone(), - } - } - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { PlanProperties::new( @@ -184,10 +174,6 @@ impl ExecutionPlan for WorkTableExec { &self.cache } - fn children(&self) -> Vec<&Arc> { - vec![] - } - fn maintains_input_order(&self) -> Vec { vec![false] } @@ -196,6 +182,10 @@ impl ExecutionPlan for WorkTableExec { vec![false] } + fn children(&self) -> Vec<&Arc> { + vec![] + } + fn with_new_children( self: Arc, _: Vec>, @@ -234,6 +224,22 @@ impl ExecutionPlan for WorkTableExec { fn partition_statistics(&self, _partition: Option) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + /// Creates a new `WorkTableExec` with the provided work table for recursive query execution. + /// During query planning, `WorkTableExec` nodes are created as placeholders; this method + /// "wires up" the actual work table that coordinates data between recursive iterations. + fn with_work_table( + &self, + work_table: Arc, + ) -> Option> { + Some(Arc::new(Self { + name: self.name.clone(), + schema: Arc::clone(&self.schema), + metrics: ExecutionPlanMetricsSet::new(), + work_table, + cache: self.cache.clone(), + })) + } } #[cfg(test)] From 70e59e70d716ae2498f1cd9e6717382377a99c57 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Mon, 23 Jun 2025 12:01:20 +0200 Subject: [PATCH 2/2] feat: generalize dynamic state injection via `with_new_state` on `ExecutionPlan` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a generic `with_new_state` method to the `ExecutionPlan` trait, superseding the specialized `with_work_table`. This provides a uniform mechanism for execution nodes to accept late-bound, run-time state of any type, improving extensibility for custom operators. Motivation: `RecursiveQueryExec` and other potential operators need to retrofit execution plans with run-time state (e.g. the working table for recursive CTEs). The previous `with_work_table` API was hard-coded to one concrete type, preventing wrapper or third-party nodes from re-using the same mechanism for different kinds of state. Changes: - Replace `with_work_table` with `with_new_state(&self, state: Arc)` on `ExecutionPlan`; default implementation returns `None`. - Update rustdoc on the trait to describe the generic contract and thread- safety (`Send + Sync`) requirements. - Implement `with_new_state` in `WorkTableExec`, down-casting the supplied state to `Arc` and returning an updated plan when applicable. - Refactor `RecursiveQueryExec::assign_work_table` to use the new trait method, passing the work table as generic state. - Refresh documentation throughout to emphasise that the mechanism is generic, using `WorkTableExec` solely as an illustrative example. This refactor future-proofs DataFusion’s execution API, enabling any custom node to inject and propagate arbitrary shared state without introducing new trait methods each time. --- .../physical-plan/src/execution_plan.rs | 28 +++++++++++-------- .../physical-plan/src/recursive_query.rs | 4 ++- datafusion/physical-plan/src/work_table.rs | 17 +++++++---- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index d67c94ef54b0..b39349433e36 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -23,7 +23,6 @@ use crate::filter_pushdown::{ pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; pub use crate::stream::EmptyRecordBatchStream; -pub use crate::work_table::WorkTable; pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; @@ -572,20 +571,25 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { )) } - /// Returns a new execution plan that uses the provided work table, if supported. - /// This enables recursive query execution by allowing work table injection. + /// Injects arbitrary run-time state into this execution plan, returning a new plan + /// instance that incorporates that state *if* it is relevant to the concrete + /// node implementation. /// - /// Primarily implemented by [`WorkTableExec`], but custom execution nodes that wrap - /// or contain `WorkTableExec` instances should also implement this to propagate - /// work table injection to their inner components. + /// This is a generic entry point: the `state` can be any type wrapped in + /// `Arc`. A node that cares about the state should + /// down-cast it to the concrete type it expects and, if successful, return a + /// modified copy of itself that captures the provided value. If the state is + /// not applicable, the default behaviour is to return `None` so that parent + /// nodes can continue propagating the attempt further down the plan tree. /// - /// See [`WorkTableExec::with_work_table`] for the reference implementation. - /// - /// [`WorkTableExec`]: crate::work_table::WorkTableExec - /// [`WorkTableExec::with_work_table`]: crate::work_table::WorkTableExec::with_work_table - fn with_work_table( + /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec) + /// down-casts the supplied state to an `Arc` + /// in order to wire up the working table used during recursive-CTE execution. + /// Similar patterns can be followed by custom nodes that need late-bound + /// dependencies or shared state. + fn with_new_state( &self, - _work_table: Arc, + _state: Arc, ) -> Option> { None } diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index c61ec060c748..99b460dfcfdc 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -351,7 +351,9 @@ fn assign_work_table( ) -> Result> { let mut work_table_refs = 0; plan.transform_down(|plan| { - if let Some(new_plan) = plan.with_work_table(Arc::clone(&work_table)) { + if let Some(new_plan) = + plan.with_new_state(Arc::clone(&work_table) as Arc) + { if work_table_refs > 0 { not_impl_err!( "Multiple recursive references to the same CTE are not supported" diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 79cac96cd71b..076e30ab902d 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -225,13 +225,20 @@ impl ExecutionPlan for WorkTableExec { Ok(Statistics::new_unknown(&self.schema())) } - /// Creates a new `WorkTableExec` with the provided work table for recursive query execution. - /// During query planning, `WorkTableExec` nodes are created as placeholders; this method - /// "wires up" the actual work table that coordinates data between recursive iterations. - fn with_work_table( + /// Injects run-time state into this `WorkTableExec`. + /// + /// The only state this node currently understands is an [`Arc`]. + /// If `state` can be down-cast to that type, a new `WorkTableExec` backed + /// by the provided work table is returned. Otherwise `None` is returned + /// so that callers can attempt to propagate the state further down the + /// execution plan tree. + fn with_new_state( &self, - work_table: Arc, + state: Arc, ) -> Option> { + // Down-cast to the expected state type; propagate `None` on failure + let work_table = state.downcast::().ok()?; + Some(Arc::new(Self { name: self.name.clone(), schema: Arc::clone(&self.schema),