-
Thanks for the proposal! @zanmato1984 What do you think about this proposal?
-
I think this is a thoughtful and reasonable proposal. Thanks for putting it all together! It’s also delightful to see people actively using Acero “the raw way,” and I really appreciate the contributions you and your colleagues have already made. While Acero isn’t intended to be a cutting-edge query engine competing with systems like DuckDB or Velox, what we value most is enablement: giving users the ability to compose, orchestrate, and integrate Arrow-based data processing in customizable ways. From that perspective, I see your proposals as strengthening this core purpose, and none of them seem to conflict with it. I’d be glad to help continue the discussion or review individual follow-up issues/PRs. I’ve noticed some of the ongoing work already (the pipe node family idea in particular is really cool!). That said, I do need to prioritize my engagement based on factors like criticality, author enthusiasm, PR size, and my familiarity with the relevant code path. So please don’t hesitate to ping me directly if there’s something you’d like me to prioritize. Overall, I really appreciate you bringing these ideas forward. It’s great to see thoughtful proposals like this shaping Acero’s future.
-
I have created umbrella issue #47383 and multiple sub-issues. Thank you @kou, @pitrou and @zanmato1984 for the help. Could you expedite the review of PRs #47386 and #47392? I really want to keep the momentum going, and those two are blockers.
-
Cool, I can help review them shortly.
-
I am exploring the best way to unify the access patterns of different ExecNodes. One important question has come up that I am not sure how to answer: is it legal to submit a blocking task to the io_executor? By blocking I mean waiting on a condition to become true (not blocking as in waiting for a read to complete).
-
I would like to start a broad conversation on the future of Acero development.
I have been using Acero for quite some time now, and I think I have a general understanding of its current state. I use neither Substrait nor Python; I use Acero strictly for streaming execution, and I have found it to be well designed and thought through. At first the concepts it uses were overwhelming, but later on I found them all useful and powerful. However, along the way I spotted some hiccups, and my colleagues and I fixed a few of them.
From all my experience with Acero, I have come to the conclusion that it needs some core changes in order to remain a versatile and extensible library for streaming execution. That is the general idea of this discussion: whether and how such core changes can be introduced. I would like to split the discussion into a few distinct topics.
In my understanding, Acero was initially designed to execute queries on Arrow datasets. Usually queries are not executed in an ordered fashion; ordering is optionally added as a final step (hence OrderBySinkNode). As such, Acero did not tackle source ordering. Later on, the concept of batch.index was introduced, which paved the way for maintaining and leveraging source ordering (Topic 1).
All queries that I know of have only one output, and, as expected, all ExecNodes have only one output. I am aware that the Acero foundation does not prohibit multiple outputs in general, but as of now there are no multi-output nodes (Topic 2).
In the current state of Acero, a few coding patterns recur. I think they should be considered for factoring out, to remove code duplication and simplify maintenance (Topic 3).
Since the introduction of batch.index, not all exec nodes comply with the new semantics, even though several exec nodes do rely on data order. Most notably:
- asof_join: ordering done (GH-41706); backpressure pending merge (GH-46421)
- sorted_merge: ordering PR ready (GH-47269); backpressure not ready
- aggregate: ordering PR ready (GH-47269), ordering is needed only conditionally; backpressure not ready

In addition to those nodes, all source and sink nodes now need to account for the ordering concept and for the user's intention to maintain the ordering of the source.
The solution for all those nodes that require ordering was to introduce SerialSequencingQueue. Although it fixes ordering, it unfortunately breaks backpressure (SerialSequencingQueue does not limit how many items are queued). To fix backpressure, SerialSequencingQueue has to produce its own pause signal and also propagate pause signals from downstream. I think this logic becomes too convoluted to replicate in every ExecNode. So, since ordering is now a global concept, I think we should move the ordering validation, sequencing and backpressure logic out of specific ExecNodes and into the ExecNode base. This would let the implementer of a new exec node focus on the actual data processing and reuse the input and output access patterns that have already emerged.
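To make the backpressure problem concrete, here is a minimal, self-contained C++ sketch of what combined sequencing and backpressure could look like. `BackpressuredSequencer`, `Batch` and the high/low water marks are hypothetical names for illustration only, not Acero's SerialSequencingQueue API. Batches arriving out of order are held until the next expected index shows up; once too many batches are held, upstream is asked to pause, and it is resumed when the count drops to a lower mark (hysteresis). A downstream pause is folded in by holding back in-order batches, which can then escalate into an upstream pause.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for an exec batch; only the sequence index matters here.
struct Batch {
  int64_t index;        // position in source order (cf. batch.index in Acero)
  std::string payload;  // placeholder for the actual column data
};

// Hypothetical helper, NOT an existing Acero class: reorders batches by index
// and bounds how many it holds by sending pause/resume signals upstream.
class BackpressuredSequencer {
 public:
  using Deliver = std::function<void(const Batch&)>;
  using PauseSignal = std::function<void(bool pause)>;

  BackpressuredSequencer(std::size_t high_water, std::size_t low_water,
                         Deliver deliver, PauseSignal signal_upstream)
      : high_water_(high_water),
        low_water_(low_water),
        deliver_(std::move(deliver)),
        signal_upstream_(std::move(signal_upstream)) {
    assert(low_water_ < high_water_);  // hysteresis needs a gap between marks
  }

  // Called by upstream, possibly out of order and from several threads.
  // Callbacks run under the lock, so they must not call back into this object.
  void Push(Batch batch) {
    std::lock_guard<std::mutex> lock(mutex_);
    const int64_t idx = batch.index;
    buffered_.emplace(idx, std::move(batch));
    DrainAndUpdate();
  }

  // Downstream pause is propagated: while downstream is paused, in-order
  // batches stay buffered, which can in turn trigger an upstream pause.
  void SetDownstreamPaused(bool paused) {
    std::lock_guard<std::mutex> lock(mutex_);
    downstream_paused_ = paused;
    DrainAndUpdate();
  }

  std::size_t buffered() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return buffered_.size();
  }

 private:
  // Caller must hold mutex_.
  void DrainAndUpdate() {
    // Deliver the contiguous run starting at next_index_ unless downstream is paused.
    while (!downstream_paused_) {
      auto it = buffered_.find(next_index_);
      if (it == buffered_.end()) break;
      deliver_(it->second);
      buffered_.erase(it);
      ++next_index_;
    }
    // Hysteresis: pause at the high-water mark, resume at the low-water mark.
    if (!upstream_paused_ && buffered_.size() >= high_water_) {
      upstream_paused_ = true;
      signal_upstream_(true);
    } else if (upstream_paused_ && buffered_.size() <= low_water_) {
      upstream_paused_ = false;
      signal_upstream_(false);
    }
  }

  const std::size_t high_water_;
  const std::size_t low_water_;
  Deliver deliver_;
  PauseSignal signal_upstream_;
  mutable std::mutex mutex_;
  std::map<int64_t, Batch> buffered_;  // out-of-order or held-back batches
  int64_t next_index_ = 0;
  bool downstream_paused_ = false;
  bool upstream_paused_ = false;
};
```

One caveat a real implementation would have to address: pausing upstream must not prevent the batch that fills the gap from arriving, otherwise the queue can deadlock. This sketch simply assumes that batches already in flight are still delivered after a pause.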
As an extra feature, ordering could offer additional “stream” guarantees. A stream guarantee would carry a condition that is guaranteed to hold for the rest of the data stream (like “timestamp > x”). This could be used to advance timelines/segments in exec nodes that leverage ordering.
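One way to picture a stream guarantee is as a watermark. The sketch below assumes a hypothetical `TimedBatch` carrying an optional `guarantee_after` value meaning “every later row has timestamp > guarantee_after”, and a hypothetical order-leveraging node, `SegmentCounter`, that counts rows per fixed-width time bucket. Neither exists in Acero; the point is only that a batch carrying a guarantee, even an empty one, lets the node close and emit buckets without waiting for the next data row.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical batch carrying an optional stream guarantee: every row that
// arrives later in this stream has a timestamp strictly greater than
// guarantee_after.
struct TimedBatch {
  std::vector<int64_t> timestamps;
  std::optional<int64_t> guarantee_after;
};

// Hypothetical order-leveraging node: counts rows per fixed-width time bucket
// [start, start + width) and emits a bucket once no future row can fall into it.
class SegmentCounter {
 public:
  explicit SegmentCounter(int64_t width) : width_(width) { assert(width_ > 0); }

  // Returns the (bucket_start, row_count) pairs that became final with this batch.
  std::vector<std::pair<int64_t, int64_t>> Consume(const TimedBatch& batch) {
    for (int64_t ts : batch.timestamps) ++counts_[BucketStart(ts)];
    std::vector<std::pair<int64_t, int64_t>> finished;
    if (!batch.guarantee_after) return finished;
    const int64_t g = *batch.guarantee_after;
    // A bucket is closed once its last timestamp (start + width - 1) is <= g.
    // std::map iterates buckets in ascending order, so stop at the first open one.
    for (auto it = counts_.begin();
         it != counts_.end() && it->first + width_ - 1 <= g;) {
      finished.emplace_back(it->first, it->second);
      it = counts_.erase(it);
    }
    return finished;
  }

 private:
  // Floor division, so negative timestamps land in the correct bucket.
  int64_t BucketStart(int64_t ts) const {
    int64_t q = ts / width_;
    if (ts % width_ != 0 && ts < 0) --q;
    return q * width_;
  }

  int64_t width_;
  std::map<int64_t, int64_t> counts_;  // bucket start -> row count
};
```

For example, with a width of 10, an empty batch carrying `guarantee_after = 9` is enough to finalize the bucket starting at 0.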
Multiple-output ExecNodes are mentioned in several places across the documentation and issues, but no such node has ever been implemented. In my application I found it necessary to produce multiple outputs from a single processing pipeline. In the beginning the dataset `tee` node was enough, but along the way it turned out not to be flexible enough. Somewhat inspired by the implementation of the tee node, I created a new pipe concept that fits Acero quite well and provides a fairly elegant alternative to multiple-output nodes. In summary, there are three new nodes:
- `pipe_sink`: consumes all exec batches and replicates them to all `pipe_source` nodes
- `pipe_source`: a source node that receives batches
- `pipe_tee`: replicates batches to `pipe_source` nodes and to its own output

All pipe nodes have names, and sinks are connected by name to sources at the init stage. Additionally, a `pipe` can be instantiated as an element inside an exec node to provide additional outputs (for example, the “filter” node can provide an additional output with the filtered-out data). I have been using this concept extensively in my processing pipeline, and I have now completely stopped using the `tee` node.

The reason I think pipes are a strong alternative to multiple-output nodes is that they fit elegantly into the entire `Declaration` infrastructure. With a single output, a declaration is always a tree. Multiple outputs would create a directed graph, which would require changing literally the entire ExecPlan building infrastructure. With pipes we get effectively the same functionality in a way that fits the current ExecPlan building process.

I find several changes that would benefit Acero as a whole:
I am not a maintainer of Arrow, nor am I affiliated with Apache in any way, but I hope this discussion produces some kind of general roadmap for the future development of Acero. All this might seem like an extreme makeover, but I honestly believe it is the shortest path to fixing the current backpressure issues and, as a bonus, to genuinely cleaning up and simplifying the Acero codebase. I am ready to invest time in it, but first I want to know whether this fits the maintainers' plans and vision.
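For illustration, the pipe concept from Topic 2 can be reduced to a minimal, self-contained C++ sketch. `PipeRegistry`, `PipeSource`, `PipeSink`, `PipeTee` and `Batch` are hypothetical stand-ins, not Acero classes, and threading, backpressure and end-of-stream handling are deliberately left out. The key property is the two-phase wiring: each declaration tree is built independently, and sinks find their sources by name only at init time, so no declaration ever needs more than one output.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an exec batch.
struct Batch {
  int64_t index;
  std::string payload;
};

using Emit = std::function<void(const Batch&)>;

// Hypothetical pipe_source: a source node whose batches are pushed in by a pipe.
class PipeSource {
 public:
  explicit PipeSource(Emit emit) : emit_(std::move(emit)) {}
  // In a real node this would forward the batch to the source's own output.
  void Receive(const Batch& batch) { emit_(batch); }

 private:
  Emit emit_;
};

// Hypothetical plan-wide registry: pipe name -> all pipe_source nodes with that name.
class PipeRegistry {
 public:
  void RegisterSource(const std::string& name, PipeSource* source) {
    assert(source != nullptr);
    sources_[name].push_back(source);
  }
  std::vector<PipeSource*> Connect(const std::string& name) const {
    auto it = sources_.find(name);
    if (it == sources_.end()) throw std::runtime_error("no pipe_source named " + name);
    return it->second;
  }

 private:
  std::map<std::string, std::vector<PipeSource*>> sources_;
};

// Hypothetical pipe_sink: connects by name at init, then replicates every batch
// to all connected pipe_source nodes.
class PipeSink {
 public:
  explicit PipeSink(std::string name) : name_(std::move(name)) {}
  virtual ~PipeSink() = default;
  void Init(const PipeRegistry& registry) { targets_ = registry.Connect(name_); }
  virtual void InputReceived(const Batch& batch) {
    for (PipeSource* target : targets_) target->Receive(batch);
  }

 private:
  std::string name_;
  std::vector<PipeSource*> targets_;
};

// Hypothetical pipe_tee: like pipe_sink, but also passes batches to its own output.
class PipeTee : public PipeSink {
 public:
  PipeTee(std::string name, Emit output)
      : PipeSink(std::move(name)), output_(std::move(output)) {}
  void InputReceived(const Batch& batch) override {
    PipeSink::InputReceived(batch);  // replicate to the named pipe_source nodes
    output_(batch);                  // and continue down the regular output
  }

 private:
  Emit output_;
};
```

Because the connection is made by name, a filter-like node could own a `PipeSink` internally and push its rejected rows into a named pipe without changing the shape of its own declaration.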