-
Notifications
You must be signed in to change notification settings - Fork 1.5k
fix: make with_new_state
a trait method for ExecutionPlan
#16469
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: make with_new_state
a trait method for ExecutionPlan
#16469
Conversation
eab7bab
to
adeb957
Compare
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
This change introduces partial support for tracing and metrics instrumentation of recursive queries in DataFusion. Recursive queries now spawn a new span group for each recursive call, as the recursive implementation recreates the recursive nodes for each loop iteration. This is enabled by supporting the `with_new_inner` method on `InstrumentedExec`, which allows the optimizer to rebuild instrumented plans as needed. However, due to current limitations in DataFusion, nodes of type `WorkTableExec` are not instrumented, as wrapping them would break recursive query execution. This limitation will be revisited once upstream changes allow `WorkTableExec` to be safely instrumented. See also: apache/datafusion#16469 and related discussions.
with_work_table
a trait method for ExecutionPlan
with_work_table
a trait method for ExecutionPlan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @geoffreyclaude -- the problem and solution make sense to me. However, I do wonder if we should try and make the API a little more general.
What do you think?
@@ -580,6 +581,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |||
// cooperate with yielding. | |||
None | |||
} | |||
|
|||
/// Returns a new execution plan that uses the provided work table, if supported. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this comment notes, this method is quite specific to WorkTableExec
I wonder if it might be a good idea to make a slightly more general API for updating state
Maybe something like
fn with_updated_state(&self, state: Arc<dyn Any>) -> Option<Arc<dyn ExecutionPlan>>
This would allow any arbitrary state to be updated and would make it eaiser for wrapper nodes like that in Datafusion tracing to pass through state without having to know anything about what is in it 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @alamb,
I'm glad you mention the Any
solution! I was initially going with something similar, but it seemed to me only the WorkTableExec
needed this, so I didn't want to over-generalize.
However, moving to the Any
solution anytime in the future (maybe because this late update pattern becomes more frequent for other concrete nodes) will mean breaking the specific WorkTableExec
implementation.
So for the sake of forward compatibility, I very much agree with your suggested signature! I'll update the PR and mention you once done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb PR's ready for a second review 👀
Expose `with_work_table` as a public trait method on `ExecutionPlan`, allowing custom and external execution nodes to support recursive query work table injection. Motivation: The `RecursiveQueryExec` dynamically rebuilds its child execution plan during execution, and assumes it will find nodes that support work table injection. Previously, only the concrete `WorkTableExec` type supported this, making it difficult for custom or wrapper nodes to participate in recursive query execution. Changes: - Add `with_work_table` as a public method to the `ExecutionPlan` trait, with a default implementation returning `None`. - Update `WorkTableExec` to implement this method, returning a new instance with the provided work table. - Update documentation: clarify the purpose of `with_work_table` in both the trait and the `WorkTableExec` implementation, and add direct doc links for discoverability. - Update `assign_work_table` logic in `RecursiveQueryExec` to use the trait method, enabling support for custom nodes that wrap or contain a `WorkTableExec`. - Re-export `WorkTable` publicly so external implementors can use it in their own `ExecutionPlan` implementations. This change enables extensibility for recursive query support, allowing external crates and custom execution plans to participate in work table injection without relying on internal module structure or concrete type checks.
25c65a6
to
e1ab372
Compare
with_work_table
a trait method for ExecutionPlan
with_new_state
a trait method for ExecutionPlan
…cutionPlan` Introduce a generic `with_new_state` method to the `ExecutionPlan` trait, superseding the specialized `with_work_table`. This provides a uniform mechanism for execution nodes to accept late-bound, run-time state of any type, improving extensibility for custom operators. Motivation: `RecursiveQueryExec` and other potential operators need to retrofit execution plans with run-time state (e.g. the working table for recursive CTEs). The previous `with_work_table` API was hard-coded to one concrete type, preventing wrapper or third-party nodes from re-using the same mechanism for different kinds of state. Changes: - Replace `with_work_table` with `with_new_state(&self, state: Arc<dyn Any + Send + Sync>)` on `ExecutionPlan`; default implementation returns `None`. - Update rustdoc on the trait to describe the generic contract and thread- safety (`Send + Sync`) requirements. - Implement `with_new_state` in `WorkTableExec`, down-casting the supplied state to `Arc<WorkTable>` and returning an updated plan when applicable. - Refactor `RecursiveQueryExec::assign_work_table` to use the new trait method, passing the work table as generic state. - Refresh documentation throughout to emphasise that the mechanism is generic, using `WorkTableExec` solely as an illustrative example. This refactor future-proofs DataFusion’s execution API, enabling any custom node to inject and propagate arbitrary shared state without introducing new trait methods each time.
e1ab372
to
70e59e7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me @geoffreyclaude -- and thank you for the very clear and thorough comments
Which issue does this PR close?
This PR addresses the extensibility limitation that causes recursive queries to fail when used with the
datafusion-tracing
crate, as reported in datafusion-contrib/datafusion-tracing#5.While there isn't a specific DataFusion issue filed, this change enables external crates like
datafusion-tracing
to properly support recursive queries by implementing work table injection in their wrapper nodes.Rationale for this change
Currently, recursive query execution in
RecursiveQueryExec
uses downcasting to findWorkTableExec
nodes and inject work tables. This approach fails when execution plans are wrapped by custom execution nodes, as is done by thedatafusion-tracing
crate, causing recursive queries to fail with "Unexpected empty work table" errors.The fundamental issue is that
RecursiveQueryExec
assumes direct access to concreteWorkTableExec
types, but wrapper nodes (like those used for tracing) break this assumption. This limits DataFusion's extensibility for recursive queries and prevents external crates from properly supporting this feature.What changes are included in this PR?
Commit 1
with_work_table
as a public trait method onExecutionPlan
with a default implementation returningNone
.WorkTableExec
to implement the trait method, creating a new instance with the provided work table.assign_work_table
logic inRecursiveQueryExec
to use the trait method instead of down-casting, enabling support for wrapper nodes.WorkTable
so external implementers can access it without knowing internal module structure.with_work_table
.Commit 2 (follow-up/generalisation)
with_work_table
API with a genericwith_new_state(&self, state: Arc<dyn Any + Send + Sync>)
onExecutionPlan
.The default implementation still returns
None
.with_new_state
inWorkTableExec
by down-casting the supplied state to anArc<WorkTable>
.RecursiveQueryExec::assign_work_table
to use the new generic API.WorkTableExec
as an illustrative example.WorkTable
from the root ofphysical-plan
so custom external nodes can downcastArc<dyn Any + Send + Sync>
intoArc<WorkTable>
Are these changes tested?
The changes are covered by existing tests:
WorkTableExec
implementation is tested through existing recursive query execution pathsThis change enables functionality that was previously broken (recursive queries with instrumentation), so it fixes existing test scenarios in external crates rather than requiring new DataFusion-specific tests.
Compiling the
datafusion-tracing
crate against this branch locally enables full instrumentation of recursive queries, with one "span group" per recursion, as demonstrated by the Jaeger screenshot below for the following basic recursive SQL:Are there any user-facing changes?
API Addition (Non-breaking):
with_new_state
method to theExecutionPlan
trait with a defaultNone
-returning implementationWorkTable
) inside their ownExecutionPlan
implementations.Benefits for Users:
with_new_state
in their custom execution plans to support recursive queriesNo Breaking Changes:
ExecutionPlan
implementations don't need updatesWorkTableExec
remains identicalThis change primarily enables extensibility that was previously impossible, resolving compatibility issues between recursive queries and external crates.