Automatically split large single RecordBatches in MemorySource into smaller batches #16734
Conversation
- Introduce MIN_BATCH_SIZE constant (1024) in BatchSplitStream
- Only wrap the input stream with BatchSplitStream if batch_size >= MIN_BATCH_SIZE
- Otherwise, return the original stream directly
- This avoids unnecessary stream splitting for very small batch sizes
…ge batches in poll_upstream
- Introduce `batch_split_threshold` configuration in session config and execution options to control the minimum batch size for splitting oversized RecordBatches read from sources.
- If `batch_split_threshold` is 0, batch splitting is disabled.
- Modify DataSourceExec to use the new threshold to decide whether to wrap the source stream in BatchSplitStream.
- Update BatchSplitStream to report metrics on batches split via the new SplitMetrics.
- Add SplitMetrics for recording the number of times input batches are split.
- Update tests and add new tests for batch splitting behavior according to the threshold.
- Add logging when batch splitting is enabled for a partition.

This change improves control over batch sizes to better handle large input batches, enabling memory and performance optimizations.
…rsized RecordBatches
…split threshold comment
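Putting the commit messages above together, the gating might look roughly like this (a minimal sketch: BatchSplitStream is the PR's stream type, but its constructor and the exact placement of the constant are assumptions, not the PR's verbatim code):

use datafusion::execution::SendableRecordBatchStream;

// From the first commit message: below this, splitting is not worthwhile.
const MIN_BATCH_SIZE: usize = 1024;

// Wrap the input only when the configured batch_size is large enough to
// justify splitting; otherwise return the original stream untouched.
fn maybe_wrap(
    input: SendableRecordBatchStream,
    batch_size: usize,
) -> SendableRecordBatchStream {
    if batch_size >= MIN_BATCH_SIZE {
        Box::pin(BatchSplitStream::new(input, batch_size)) // assumed constructor
    } else {
        input
    }
}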
I got an alternative idea that might be simpler to implement: we have already implemented MemTable repartitioning in #15409. That does not fix this issue, because it only handles the case where we have 1 partition with many small batches -- it will redistribute them evenly. However, it won't slice a huge batch into smaller ones. (See datafusion/datasource/src/memory.rs, lines 556 to 558 in 4dd7825.)
Maybe we can extend the existing solution to achieve this goal 🤔
Hi @2010YOUY01, do you mean this PR should be narrowed to only splitting MemTable data sources, by extending #15409?
Per #16717, this could apply to other sources too.
Thank you @kosiew
I think this PR looks good to me, though I am a little worried about the changes in window.slt.
One way we could avoid changes to the plans would be to add a new config setting as an "escape valve" to disable this behavior -- enable_datasource_repartition or something.
But overall I think it is an improvement that the tests no longer rely on some specific output order unless they actually need to.
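If such an escape valve were added, turning it off in the .slt files might look like this (the option name is hypothetical, echoing the comment above; an actual setting, if any, may be named differently):

statement ok
set datafusion.execution.enable_datasource_repartition = false;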
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    match self.input.as_mut().poll_next(cx) {
You can use the ready! macro here to avoid some of the handling: https://docs.rs/futures/latest/futures/macro.ready.html
- match self.input.as_mut().poll_next(cx) {
+ match ready!(self.input.as_mut().poll_next(cx)) {
TIL!
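For context, here is a minimal sketch of the suggestion applied in a poll function (the splitting state and slicing logic are elided and the struct is simplified; only the ready! usage is the point):

use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use futures::{ready, Stream};

struct BatchSplitStream {
    input: SendableRecordBatchStream,
    // splitting state elided
}

impl BatchSplitStream {
    // `ready!` returns Poll::Pending from this function whenever the inner
    // poll is pending, so the match below only ever sees Ready values.
    fn poll_upstream(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        match ready!(self.input.as_mut().poll_next(cx)) {
            Some(Ok(batch)) => Poll::Ready(Some(Ok(batch))), // slicing would happen here
            other => Poll::Ready(other), // errors and end-of-stream pass through
        }
    }
}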
@@ -3425,7 +3425,7 @@ physical_plan
06)----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8
07)------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
+09)----------------DataSourceExec: partitions=1, partition_sizes=[2]
I believe the reason the partition size increases is that sales_global_with_pk is created from the output of a query, and since the batch size there is 4, a result with more than 4 rows arrives from the input as multiple batches.
I think this is a better plan overall.
@@ -4535,19 +4535,20 @@ LIMIT 5
query ITIPTR rowsort
SELECT r.*
FROM sales_global_with_pk as l, sales_global_with_pk as r
ORDER BY 1, 2, 3, 4, 5, 6
Now that the input is in multiple batches, I agree the ORDER BY is required.
@@ -4341,6 +4341,9 @@ LIMIT 5;
24 31
14 94

statement ok
I think this one might be better off as batch_size = 1 to avoid the plan changes later in this file. I see the challenge, however.
I added another batch_size=1 just before the plans section and preserved the plans.
set datafusion.execution.batch_size = 100;
Added to preserve the results of queries.
In addition, I added ORDER BY to make the row order deterministic.
set datafusion.execution.batch_size = 1;
Added this to preserve the plans that follow this line.
Thanks @2010YOUY01, @alamb for your review.
I am wondering if we need to do a performance benchmark for this PR.
It is a good idea -- I kicked off some benchmarks. I am not sure how many of them actually use an in-memory batch, though.
🤖: Benchmark completed
🤖: Benchmark completed
Thank you @alamb -- no regression in the benchmarks above.
Which issue does this PR close?
Rationale for this change
Large record batches produced by data sources can overwhelm downstream operators, leading to uneven work distribution, reduced parallelism, or excessive memory usage.
This change introduces a configurable mechanism to automatically split large record batches, ensuring better resource utilization and operator performance.
Additionally, the PR introduces metrics to monitor split activity, aiding observability and performance tuning.
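The core operation is cheap: here is a minimal sketch of the splitting step using arrow's zero-copy RecordBatch::slice (illustrative only, not the PR's exact code):

use arrow::record_batch::RecordBatch;

// Split `batch` into chunks of at most `batch_size` rows. `slice` is
// zero-copy: each chunk shares the underlying buffers of the original
// batch, so splitting adds only per-chunk metadata overhead.
fn split_batch(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    let num_rows = batch.num_rows();
    if batch_size == 0 || num_rows <= batch_size {
        return vec![batch.clone()];
    }
    (0..num_rows)
        .step_by(batch_size)
        .map(|offset| batch.slice(offset, batch_size.min(num_rows - offset)))
        .collect()
}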
What changes are included in this PR?
Config Options
- New batch_split execution option (enabled by default).
- The split threshold follows the batch_size setting (default 8192).
Execution Plan Enhancements
- Introduced BatchSplitStream to split large record batches before passing them downstream.
- Wrapped source streams with BatchSplitStream within DataSourceExec.
Metrics
- Added SplitMetrics with a batches_splitted counter (see the sketch after this list).
Testing
- New tests in datasource_split.rs covering BatchSplitStream behavior.
- Updated SQL logic tests (.slt files) to reflect changed execution plans and batch sizes.
SQL Logic Test Adjustments
- Adjusted .slt files with batch_size settings and ORDER BY clauses to keep plans and row order deterministic.
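A rough sketch of what SplitMetrics might look like using DataFusion's metrics builder (the struct and counter names follow the PR description above; the field layout is an assumption):

use datafusion::physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder,
};

struct SplitMetrics {
    // Number of input batches that were split into smaller ones.
    batches_splitted: Count,
}

impl SplitMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            // Registers a per-partition counter named "batches_splitted".
            batches_splitted: MetricBuilder::new(metrics)
                .counter("batches_splitted", partition),
        }
    }
}

// The split stream would then bump it once per split, e.g.:
//     self.metrics.batches_splitted.add(1);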
files.Are these changes tested?
✅ Yes.
Are there any user-facing changes?
Large record batches will now be automatically split according to batch_size (default 8192), potentially impacting performance characteristics but preserving data semantics.