Skip to content

fix: make with_new_state a trait method for ExecutionPlan #16469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,29 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
child_pushdown_result,
))
}

/// Injects arbitrary run-time state into this execution plan, returning a new plan
/// instance that incorporates that state *if* it is relevant to the concrete
/// node implementation.
///
/// This is a generic entry point: the `state` can be any type wrapped in
/// `Arc<dyn Any + Send + Sync>`. A node that cares about the state should
/// down-cast it to the concrete type it expects and, if successful, return a
/// modified copy of itself that captures the provided value. If the state is
/// not applicable, the default behaviour is to return `None` so that parent
/// nodes can continue propagating the attempt further down the plan tree.
///
/// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
/// down-casts the supplied state to an `Arc<WorkTable>`
/// in order to wire up the working table used during recursive-CTE execution.
/// Similar patterns can be followed by custom nodes that need late-bound
/// dependencies or shared state.
fn with_new_state(
&self,
_state: Arc<dyn Any + Send + Sync>,
) -> Option<Arc<dyn ExecutionPlan>> {
None
}
}

/// [`ExecutionPlan`] Invariant Level
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub use crate::ordering::InputOrderMode;
pub use crate::stream::EmptyRecordBatchStream;
pub use crate::topk::TopK;
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
pub use crate::work_table::WorkTable;
pub use spill::spill_manager::SpillManager;

mod ordering;
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,16 +351,16 @@ fn assign_work_table(
) -> Result<Arc<dyn ExecutionPlan>> {
let mut work_table_refs = 0;
plan.transform_down(|plan| {
if let Some(exec) = plan.as_any().downcast_ref::<WorkTableExec>() {
if let Some(new_plan) =
plan.with_new_state(Arc::clone(&work_table) as Arc<dyn Any + Send + Sync>)
{
if work_table_refs > 0 {
not_impl_err!(
"Multiple recursive references to the same CTE are not supported"
)
} else {
work_table_refs += 1;
Ok(Transformed::yes(Arc::new(
exec.with_work_table(Arc::clone(&work_table)),
)))
Ok(Transformed::yes(new_plan))
}
} else if plan.as_any().is::<RecursiveQueryExec>() {
not_impl_err!("Recursive queries cannot be nested")
Expand Down
43 changes: 28 additions & 15 deletions datafusion/physical-plan/src/work_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl ReservedBatches {
/// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
/// This table serves as a mirror or buffer between each iteration of a recursive query.
#[derive(Debug)]
pub(super) struct WorkTable {
pub struct WorkTable {
batches: Mutex<Option<ReservedBatches>>,
}

Expand Down Expand Up @@ -132,16 +132,6 @@ impl WorkTableExec {
Arc::clone(&self.schema)
}

pub(super) fn with_work_table(&self, work_table: Arc<WorkTable>) -> Self {
Self {
name: self.name.clone(),
schema: Arc::clone(&self.schema),
metrics: ExecutionPlanMetricsSet::new(),
work_table,
cache: self.cache.clone(),
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
PlanProperties::new(
Expand Down Expand Up @@ -184,10 +174,6 @@ impl ExecutionPlan for WorkTableExec {
&self.cache
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![false]
}
Expand All @@ -196,6 +182,10 @@ impl ExecutionPlan for WorkTableExec {
vec![false]
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
Expand Down Expand Up @@ -234,6 +224,29 @@ impl ExecutionPlan for WorkTableExec {
fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}

/// Injects run-time state into this `WorkTableExec`.
///
/// The only state this node currently understands is an [`Arc<WorkTable>`].
/// If `state` can be down-cast to that type, a new `WorkTableExec` backed
/// by the provided work table is returned. Otherwise `None` is returned
/// so that callers can attempt to propagate the state further down the
/// execution plan tree.
fn with_new_state(
&self,
state: Arc<dyn Any + Send + Sync>,
) -> Option<Arc<dyn ExecutionPlan>> {
// Down-cast to the expected state type; propagate `None` on failure
let work_table = state.downcast::<WorkTable>().ok()?;

Some(Arc::new(Self {
name: self.name.clone(),
schema: Arc::clone(&self.schema),
metrics: ExecutionPlanMetricsSet::new(),
work_table,
cache: self.cache.clone(),
}))
}
}

#[cfg(test)]
Expand Down