Skip to content

fix: make with_new_state a trait method for ExecutionPlan #16469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

geoffreyclaude
Copy link
Contributor

@geoffreyclaude geoffreyclaude commented Jun 20, 2025

Which issue does this PR close?

This PR addresses the extensibility limitation that causes recursive queries to fail when used with the datafusion-tracing crate, as reported in datafusion-contrib/datafusion-tracing#5.

While there isn't a specific DataFusion issue filed, this change enables external crates like datafusion-tracing to properly support recursive queries by implementing work table injection in their wrapper nodes.

Rationale for this change

Currently, recursive query execution in RecursiveQueryExec uses downcasting to find WorkTableExec nodes and inject work tables. This approach fails when execution plans are wrapped by custom execution nodes, as is done by the datafusion-tracing crate, causing recursive queries to fail with "Unexpected empty work table" errors.

The fundamental issue is that RecursiveQueryExec assumes direct access to concrete WorkTableExec types, but wrapper nodes (like those used for tracing) break this assumption. This limits DataFusion's extensibility for recursive queries and prevents external crates from properly supporting this feature.

What changes are included in this PR?

Commit 1

  • Add with_work_table as a public trait method on ExecutionPlan with a default implementation returning None.
  • Update WorkTableExec to implement the trait method, creating a new instance with the provided work table.
  • Modify assign_work_table logic in RecursiveQueryExec to use the trait method instead of down-casting, enabling support for wrapper nodes.
  • Publicly re-export WorkTable so external implementers can access it without knowing internal module structure.
  • Add comprehensive documentation explaining the purpose and usage of with_work_table.

Commit 2 (follow-up/generalisation)

  • Replace the specialised with_work_table API with a generic with_new_state(&self, state: Arc<dyn Any + Send + Sync>) on ExecutionPlan.
    The default implementation still returns None.
  • Implement with_new_state in WorkTableExec by down-casting the supplied state to an Arc<WorkTable>.
  • Refactor RecursiveQueryExec::assign_work_table to use the new generic API.
  • Update documentation throughout to reflect the generic nature of state injection and keep WorkTableExec as an illustrative example.
  • Maintain the public re-export of WorkTable from the root of physical-plan so custom external nodes can downcast Arc<dyn Any + Send + Sync> into Arc<WorkTable>

Are these changes tested?

The changes are covered by existing tests:

  • All existing recursive query tests continue to pass, ensuring backward compatibility
  • The WorkTableExec implementation is tested through existing recursive query execution paths
  • The trait method follows the same logic as the previous direct implementation

This change enables functionality that was previously broken (recursive queries with instrumentation), so it fixes existing test scenarios in external crates rather than requiring new DataFusion-specific tests.

Compiling the datafusion-tracing crate against this branch locally enables full instrumentation of recursive queries, with one "span group" per recursion, as demonstrated by the Jaeger screenshot below for the following basic recursive SQL:

WITH RECURSIVE numbers(n) AS (
    SELECT 1 AS n
    UNION ALL
    SELECT n + 1 FROM numbers WHERE n < 3
) SELECT n FROM numbers

image

Are there any user-facing changes?

API Addition (Non-breaking):

  • Adds with_new_state method to the ExecutionPlan trait with a default None-returning implementation
  • External crates can down-cast any late-bound state they need (e.g. WorkTable) inside their own ExecutionPlan implementations.

Benefits for Users:

  • External crates can now implement with_new_state in their custom execution plans to support recursive queries
  • Instrumentation and wrapper nodes can properly participate in recursive query execution
  • Enables recursive queries to work with tracing, monitoring, and other execution plan decorators

No Breaking Changes:

  • All existing code continues to work unchanged
  • The new trait method has a default implementation, so existing ExecutionPlan implementations don't need updates
  • Internal behavior for WorkTableExec remains identical

This change primarily enables extensibility that was previously impossible, resolving compatibility issues between recursive queries and external crates.

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Jun 20, 2025
@geoffreyclaude geoffreyclaude force-pushed the feat/with_work_table_trait branch from eab7bab to adeb957 Compare June 20, 2025 08:26
geoffreyclaude added a commit to datafusion-contrib/datafusion-tracing that referenced this pull request Jun 20, 2025
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed.

However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented.

See also: apache/datafusion#16469 and related discussions.
geoffreyclaude added a commit to datafusion-contrib/datafusion-tracing that referenced this pull request Jun 20, 2025
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed.

However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented.

See also: apache/datafusion#16469 and related discussions.
geoffreyclaude added a commit to datafusion-contrib/datafusion-tracing that referenced this pull request Jun 20, 2025
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed.

However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented.

See also: apache/datafusion#16469 and related discussions.
geoffreyclaude added a commit to datafusion-contrib/datafusion-tracing that referenced this pull request Jun 20, 2025
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed.

However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented.

See also: apache/datafusion#16469 and related discussions.
geoffreyclaude added a commit to datafusion-contrib/datafusion-tracing that referenced this pull request Jun 20, 2025
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed.

However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented.

See also: apache/datafusion#16469 and related discussions.
geoffreyclaude added a commit to datafusion-contrib/datafusion-tracing that referenced this pull request Jun 20, 2025
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed.

However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented.

See also: apache/datafusion#16469 and related discussions.
geoffreyclaude added a commit to datafusion-contrib/datafusion-tracing that referenced this pull request Jun 20, 2025
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed.

However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented.

See also: apache/datafusion#16469 and related discussions.
@geoffreyclaude geoffreyclaude changed the title feat: make with_work_table a trait method for ExecutionPlan fix: make with_work_table a trait method for ExecutionPlan Jun 20, 2025
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @geoffreyclaude -- the problem and solution make sense to me. However, I do wonder if we should try and make the API a little more general.

What do you think?

@@ -580,6 +581,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
// cooperate with yielding.
None
}

/// Returns a new execution plan that uses the provided work table, if supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this comment notes, this method is quite specific to WorkTableExec

I wonder if it might be a good idea to make a slightly more general API for updating state

Maybe something like

fn with_updated_state(&self, state: Arc<dyn Any>) ->  Option<Arc<dyn ExecutionPlan>> 

This would allow any arbitrary state to be updated and would make it eaiser for wrapper nodes like that in Datafusion tracing to pass through state without having to know anything about what is in it 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @alamb,
I'm glad you mention the Any solution! I was initially going with something similar, but it seemed to me only the WorkTableExec needed this, so I didn't want to over-generalize.
However, moving to the Any solution anytime in the future (maybe because this late update pattern becomes more frequent for other concrete nodes) will mean breaking the specific WorkTableExec implementation.
So for the sake of forward compatibility, I very much agree with your suggested signature! I'll update the PR and mention you once done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb PR's ready for a second review 👀

Expose `with_work_table` as a public trait method on `ExecutionPlan`, allowing custom and external execution nodes to support recursive query work table injection.

Motivation:
The `RecursiveQueryExec` dynamically rebuilds its child execution plan during execution, and assumes it will find nodes that support work table injection. Previously, only the concrete `WorkTableExec` type supported this, making it difficult for custom or wrapper nodes to participate in recursive query execution.

Changes:
- Add `with_work_table` as a public method to the `ExecutionPlan` trait, with a default implementation returning `None`.
- Update `WorkTableExec` to implement this method, returning a new instance with the provided work table.
- Update documentation: clarify the purpose of `with_work_table` in both the trait and the `WorkTableExec` implementation, and add direct doc links for discoverability.
- Update `assign_work_table` logic in `RecursiveQueryExec` to use the trait method, enabling support for custom nodes that wrap or contain a `WorkTableExec`.
- Re-export `WorkTable` publicly so external implementors can use it in their own `ExecutionPlan` implementations.

This change enables extensibility for recursive query support, allowing external crates and custom execution plans to participate in work table injection without relying on internal module structure or concrete type checks.
@geoffreyclaude geoffreyclaude force-pushed the feat/with_work_table_trait branch 4 times, most recently from 25c65a6 to e1ab372 Compare June 23, 2025 10:16
@geoffreyclaude geoffreyclaude changed the title fix: make with_work_table a trait method for ExecutionPlan fix: make with_new_state a trait method for ExecutionPlan Jun 23, 2025
…cutionPlan`

Introduce a generic `with_new_state` method to the `ExecutionPlan` trait,
superseding the specialized `with_work_table`.  This provides a uniform
mechanism for execution nodes to accept late-bound, run-time state of any
type, improving extensibility for custom operators.

Motivation:
`RecursiveQueryExec` and other potential operators need to retrofit
execution plans with run-time state (e.g. the working table for recursive
CTEs).  The previous `with_work_table` API was hard-coded to one concrete
type, preventing wrapper or third-party nodes from re-using the same
mechanism for different kinds of state.

Changes:
- Replace `with_work_table` with `with_new_state(&self, state: Arc<dyn Any + Send + Sync>)`
  on `ExecutionPlan`; default implementation returns `None`.
- Update rustdoc on the trait to describe the generic contract and thread-
  safety (`Send + Sync`) requirements.
- Implement `with_new_state` in `WorkTableExec`, down-casting the supplied
  state to `Arc<WorkTable>` and returning an updated plan when applicable.
- Refactor `RecursiveQueryExec::assign_work_table` to use the new trait
  method, passing the work table as generic state.
- Refresh documentation throughout to emphasise that the mechanism is
  generic, using `WorkTableExec` solely as an illustrative example.

This refactor future-proofs DataFusion’s execution API, enabling any
custom node to inject and propagate arbitrary shared state without
introducing new trait methods each time.
@geoffreyclaude geoffreyclaude force-pushed the feat/with_work_table_trait branch from e1ab372 to 70e59e7 Compare June 23, 2025 10:25
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me @geoffreyclaude -- and thank you for the very clear and thorough comments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-plan Changes to the physical-plan crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants