diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 2605e26c3c7f..b39349433e36 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -570,6 +570,29 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
             child_pushdown_result,
         ))
     }
+
+    /// Injects arbitrary run-time state into this execution plan, returning a new plan
+    /// instance that incorporates that state *if* it is relevant to the concrete
+    /// node implementation.
+    ///
+    /// This is a generic entry point: the `state` can be any type wrapped in
+    /// `Arc<dyn Any + Send + Sync>`. A node that cares about the state should
+    /// down-cast it to the concrete type it expects and, if successful, return a
+    /// modified copy of itself that captures the provided value. If the state is
+    /// not applicable, the default behaviour is to return `None` so that parent
+    /// nodes can continue propagating the attempt further down the plan tree.
+    ///
+    /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
+    /// down-casts the supplied state to an `Arc<WorkTable>`
+    /// in order to wire up the working table used during recursive-CTE execution.
+    /// Similar patterns can be followed by custom nodes that need late-bound
+    /// dependencies or shared state.
+    fn with_new_state(
+        &self,
+        _state: Arc<dyn Any + Send + Sync>,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        None
+    }
 }
 
 /// [`ExecutionPlan`] Invariant Level
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 22ae859e8c5b..a4798536388f 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -50,6 +50,7 @@ pub use crate::ordering::InputOrderMode;
 pub use crate::stream::EmptyRecordBatchStream;
 pub use crate::topk::TopK;
 pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
+pub use crate::work_table::WorkTable;
 pub use spill::spill_manager::SpillManager;
 
 mod ordering;
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 210db90c3c7f..99b460dfcfdc 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -351,16 +351,16 @@ fn assign_work_table(
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let mut work_table_refs = 0;
     plan.transform_down(|plan| {
-        if let Some(exec) = plan.as_any().downcast_ref::<WorkTableExec>() {
+        if let Some(new_plan) =
+            plan.with_new_state(Arc::clone(&work_table) as Arc<dyn Any + Send + Sync>)
+        {
             if work_table_refs > 0 {
                 not_impl_err!(
                     "Multiple recursive references to the same CTE are not supported"
                 )
             } else {
                 work_table_refs += 1;
-                Ok(Transformed::yes(Arc::new(
-                    exec.with_work_table(Arc::clone(&work_table)),
-                )))
+                Ok(Transformed::yes(new_plan))
             }
         } else if plan.as_any().is::<RecursiveQueryExec>() {
             not_impl_err!("Recursive queries cannot be nested")
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index e6179cd75ffb..076e30ab902d 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -57,7 +57,7 @@ impl ReservedBatches {
 /// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
 /// This table serves as a mirror or buffer between each iteration of a recursive query.
 #[derive(Debug)]
-pub(super) struct WorkTable {
+pub struct WorkTable {
     batches: Mutex<Option<ReservedBatches>>,
 }
 
@@ -132,16 +132,6 @@ impl WorkTableExec {
         Arc::clone(&self.schema)
     }
 
-    pub(super) fn with_work_table(&self, work_table: Arc<WorkTable>) -> Self {
-        Self {
-            name: self.name.clone(),
-            schema: Arc::clone(&self.schema),
-            metrics: ExecutionPlanMetricsSet::new(),
-            work_table,
-            cache: self.cache.clone(),
-        }
-    }
-
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(schema: SchemaRef) -> PlanProperties {
         PlanProperties::new(
@@ -184,10 +174,6 @@ impl ExecutionPlan for WorkTableExec {
         &self.cache
     }
 
-    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        vec![]
-    }
-
     fn maintains_input_order(&self) -> Vec<bool> {
         vec![false]
     }
@@ -196,6 +182,10 @@ impl ExecutionPlan for WorkTableExec {
         vec![false]
     }
 
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
@@ -234,6 +224,29 @@ impl ExecutionPlan for WorkTableExec {
     fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
         Ok(Statistics::new_unknown(&self.schema()))
     }
+
+    /// Injects run-time state into this `WorkTableExec`.
+    ///
+    /// The only state this node currently understands is an [`Arc<WorkTable>`].
+    /// If `state` can be down-cast to that type, a new `WorkTableExec` backed
+    /// by the provided work table is returned. Otherwise `None` is returned
+    /// so that callers can attempt to propagate the state further down the
+    /// execution plan tree.
+    fn with_new_state(
+        &self,
+        state: Arc<dyn Any + Send + Sync>,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        // Down-cast to the expected state type; propagate `None` on failure
+        let work_table = state.downcast::<WorkTable>().ok()?;
+
+        Some(Arc::new(Self {
+            name: self.name.clone(),
+            schema: Arc::clone(&self.schema),
+            metrics: ExecutionPlanMetricsSet::new(),
+            work_table,
+            cache: self.cache.clone(),
+        }))
+    }
 }
 
 #[cfg(test)]