
Automatically split large single RecordBatches in MemorySource into smaller batches #16734


Merged
merged 38 commits into apache:main on Jul 17, 2025

Conversation

kosiew
Contributor

@kosiew kosiew commented Jul 10, 2025

Which issue does this PR close?

Rationale for this change

Large record batches produced by data sources can overwhelm downstream operators, leading to uneven work distribution, reduced parallelism, or excessive memory usage.
This change introduces a configurable mechanism to automatically split large record batches, ensuring better resource utilization and operator performance.

Additionally, the PR introduces metrics to monitor split activity, aiding observability and performance tuning.


What changes are included in this PR?

  • Config Options

    • Added batch_split execution option (enabled by default).
    • Made batch size configurable with default value 8192.
  • Execution Plan Enhancements

    • Introduced BatchSplitStream to split large record batches before passing them downstream (see the sketch after this list).
    • Integrated BatchSplitStream within DataSourceExec.
  • Metrics

    • Added SplitMetrics with batches_splitted counter.
    • Exposed split metrics via the execution plan's metrics system.
  • Testing

    • Added new integration tests for batch splitting scenarios in datasource_split.rs:
      • Large batches split into multiple parts.
      • Exact batch size yields no split.
      • Small batches pass through unchanged.
      • Empty batches handled correctly.
      • Multiple empty batches processed safely.
    • Added unit tests for BatchSplitStream behavior.
    • Updated SQL logic tests (.slt files) to reflect changed execution plans and batch sizes.
  • SQL Logic Test Adjustments

    • Updated expected physical plans and outputs across various .slt files.
    • Changed default execution batch size in tests for consistency.
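
As an illustration of the splitting described above (a minimal sketch, not the PR's BatchSplitStream, which slices lazily inside poll_next and records SplitMetrics), the core operation is arrow's zero-copy RecordBatch::slice:

use arrow::record_batch::RecordBatch;

/// Split `batch` into chunks of at most `batch_size` rows.
fn split_batch(batch: RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    if batch.num_rows() <= batch_size {
        // Small, exact-size, and empty batches pass through unchanged.
        return vec![batch];
    }
    let mut out = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = batch_size.min(batch.num_rows() - offset);
        // `slice` is zero-copy: it only adjusts offsets into the underlying
        // Arrow buffers, so no row data is duplicated.
        out.push(batch.slice(offset, len));
        offset += len;
    }
    out
}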

Are these changes tested?

✅ Yes.

  • Comprehensive integration and unit tests cover the core functionality of batch splitting.
  • SQL logic test suite updated and re-validated.
  • Metrics collection verified via tests.
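
As a hedged sketch of how the new counter could be read back after execution (assuming the metric keeps the name batches_splitted from the description above; report_splits is a hypothetical helper, not code from this PR):

use datafusion::physical_plan::ExecutionPlan;

/// Print the number of batches split by a plan, if the metric is present.
fn report_splits(plan: &dyn ExecutionPlan) {
    if let Some(metrics) = plan.metrics() {
        if let Some(splits) = metrics.sum_by_name("batches_splitted") {
            println!("batches split: {}", splits.as_usize());
        }
    }
}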

Are there any user-facing changes?

  • Default Behavior Change:
    Large record batches will now be automatically split according to batch_size (default 8192), potentially impacting performance characteristics but preserving data semantics.
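
For example, since splitting follows batch_size, the effective split size can be tuned when building a context (a sketch using the standard SessionConfig API, not code from this PR):

use datafusion::prelude::{SessionConfig, SessionContext};

/// Build a context whose sources may emit batches of up to 65536 rows
/// before splitting kicks in.
fn context_with_larger_batches() -> SessionContext {
    let config = SessionConfig::new().with_batch_size(64 * 1024);
    SessionContext::new_with_config(config)
}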

kosiew added 11 commits July 10, 2025 15:05
- Introduce MIN_BATCH_SIZE constant (1024) in BatchSplitStream
- Only wrap the input stream with BatchSplitStream if batch_size >= MIN_BATCH_SIZE
- Otherwise, return the original stream directly
- This avoids unnecessary stream splitting for very small batch sizes
- Introduce `batch_split_threshold` configuration in session config and execution options
  to control minimum batch size for splitting oversized RecordBatches read from sources.
- If `batch_split_threshold` is 0, batch splitting is disabled.
- Modify DataSourceExec to use new threshold to decide whether to wrap source stream in BatchSplitStream.
- Update BatchSplitStream to report metrics on batches split via new SplitMetrics.
- Add SplitMetrics for recording number of times input batches are split.
- Update tests and add new tests for batch splitting behavior according to threshold.
- Adds logging when batch splitting is enabled for a partition.

This change improves control over batch sizes to better handle large input batches, enabling memory and performance optimizations.
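
Putting the gating from these commit messages together, a minimal self-contained sketch (MIN_BATCH_SIZE matches the commits; should_split is a hypothetical helper, not code from the PR):

/// Splitting is skipped when the threshold is 0 (disabled) or smaller than
/// MIN_BATCH_SIZE, and it only applies to batches larger than the threshold.
const MIN_BATCH_SIZE: usize = 1024;

fn should_split(batch_split_threshold: usize, num_rows: usize) -> bool {
    batch_split_threshold >= MIN_BATCH_SIZE && num_rows > batch_split_threshold
}

fn main() {
    assert!(!should_split(0, 100_000)); // 0 disables splitting
    assert!(!should_split(512, 100_000)); // below MIN_BATCH_SIZE, no wrapping
    assert!(should_split(8192, 100_000)); // oversized batch gets split
    assert!(!should_split(8192, 4_000)); // small batches pass through
}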
@github-actions bot added the core, common, execution, datasource, and physical-plan labels Jul 10, 2025
@github-actions bot added the sqllogictest label Jul 10, 2025
@github-actions bot added the documentation label Jul 10, 2025
@2010YOUY01
Contributor

I got an alternative idea that might be simpler to implement:

We have already implemented MemTable repartition in #15409. It does not solve the issue because it currently only handles the case where we have 1 partition with many small batches -- it will redistribute them evenly. However, it won't slice a huge batch into smaller ones.
See

if flatten_batches.len() < target_partitions {
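    // Not enough pre-existing batches to spread across partitions, so the
    // repartition gives up here; a single huge batch is never sliced.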
    return Ok(None);
}

Maybe we can extend the existing solution to achieve this goal 🤔

@kosiew
Contributor Author

kosiew commented Jul 13, 2025

hi @2010YOUY01

Do you mean this PR should be narrowed to only splitting MemTable data sources by extending #15409?

... some data sources may produce much larger batches. For example, MemTable provides the RecordBatches it was constructed with -- no additional splitting is done

Per #16717, this could apply to other sources too.

@github-actions bot removed the documentation and execution labels Jul 15, 2025
@github-actions bot removed the common label Jul 15, 2025
@alamb alamb changed the title Add Configurable RecordBatch Splitting for Large Input Batches Automatically split large single RecordBatchesin MemorySource into smaller batches Jul 15, 2025
Contributor

@alamb alamb left a comment


Thank you @kosiew

I think this PR looks good to me, though I am a little worried about the changes in window.slt

One way we could avoid changes to the plans would be to add a new config setting as an "escape valve" to disable this behavior - enable_datasource_repartition or something

But overall I think it is an improvement that the tests do not rely on some specific output order anymore unless they actually need to

    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    match self.input.as_mut().poll_next(cx) {
Contributor


You can use the ready! macro here to avoid some of the handling
https://docs.rs/futures/latest/futures/macro.ready.html

Suggested change
match self.input.as_mut().poll_next(cx) {
match ready!(self.input.as_mut().poll_next(cx)) {
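
As an aside on ready!: the macro returns Poll::Pending early when the inner stream is not ready, so the remaining match arms only need to cover ready values. A tiny self-contained illustration (not the PR's code):

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{ready, stream, Stream, StreamExt};

// Toy adapter that forwards items unchanged, using `ready!` so no explicit
// `Poll::Pending => return Poll::Pending` arm is needed.
struct Forward<S>(S);

impl<S: Stream + Unpin> Stream for Forward<S> {
    type Item = S::Item;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let item = ready!(self.0.poll_next_unpin(cx));
        Poll::Ready(item)
    }
}

fn main() {
    let items: Vec<i32> =
        futures::executor::block_on(Forward(stream::iter(1..=3)).collect());
    assert_eq!(items, vec![1, 2, 3]);
}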

Contributor Author


TIL!

@@ -3425,7 +3425,7 @@ physical_plan
06)----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8
07)------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
09)----------------DataSourceExec: partitions=1, partition_sizes=[2]
Contributor


I believe the reason the partition size increases is that sales_global_with_pk is created from the output of a query, and since the batch size is 4, this results in more batches from the input.

I think this is a better plan overall

@@ -4535,19 +4535,20 @@ LIMIT 5
query ITIPTR rowsort
SELECT r.*
FROM sales_global_with_pk as l, sales_global_with_pk as r
ORDER BY 1, 2, 3, 4, 5, 6
Contributor


Now that the input is in multiple batches, I agree the ORDER BY is required.

@@ -4341,6 +4341,9 @@ LIMIT 5;
24 31
14 94

statement ok
Contributor


I think this one might be better off as batch_size = 1 to avoid the plan changes later in this file

I see the challenge however.

Contributor Author

@kosiew kosiew Jul 16, 2025


@alamb,

I added another batch_size=1 just before the plans section and preserved the plans.

@alamb alamb changed the title Automatically split large single RecordBatchesin MemorySource into smaller batches Automatically split large single RecordBatches in MemorySource into smaller batches Jul 15, 2025
Comment on lines +4345 to +4346
set datafusion.execution.batch_size = 100;

Contributor Author

@kosiew kosiew Jul 16, 2025


Added to preserve the results of queries.

In addition, I added ORDER BY to make the row order deterministic.

Comment on lines +5189 to +5190
set datafusion.execution.batch_size = 1;

Contributor Author


Added this to preserve the plans that followed after this line.

@kosiew
Contributor Author

kosiew commented Jul 16, 2025

Thanks @2010YOUY01, @alamb for your review.

@kosiew kosiew merged commit 63dd4e2 into apache:main Jul 17, 2025
27 checks passed
@zhuqi-lucas
Contributor

I am wondering if we need to do a performance benchmark for this PR.

@alamb
Contributor

alamb commented Jul 17, 2025

I am wondering if we need to do a performance benchmark for this PR.

It is a good idea -- I kicked off some benchmarks. I am not sure how many actually use an in-memory batch.

@alamb
Contributor

alamb commented Jul 17, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing perf-16717 (2e8b9c9) to d1e6eb4 diff using: tpch_mem10
Results will be posted here when complete

@alamb
Contributor

alamb commented Jul 17, 2025

🤖: Benchmark completed

Details

Comparing HEAD and perf-16717
--------------------
Benchmark tpch_mem_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃ HEAD ┃ perf-16717 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ FAIL │       FAIL │ incomparable │
│ QQuery 2     │ FAIL │       FAIL │ incomparable │
│ QQuery 3     │ FAIL │       FAIL │ incomparable │
│ QQuery 4     │ FAIL │       FAIL │ incomparable │
│ QQuery 5     │ FAIL │       FAIL │ incomparable │
│ QQuery 6     │ FAIL │       FAIL │ incomparable │
│ QQuery 7     │ FAIL │       FAIL │ incomparable │
│ QQuery 8     │ FAIL │       FAIL │ incomparable │
│ QQuery 9     │ FAIL │       FAIL │ incomparable │
│ QQuery 10    │ FAIL │       FAIL │ incomparable │
│ QQuery 11    │ FAIL │       FAIL │ incomparable │
│ QQuery 12    │ FAIL │       FAIL │ incomparable │
│ QQuery 13    │ FAIL │       FAIL │ incomparable │
│ QQuery 14    │ FAIL │       FAIL │ incomparable │
│ QQuery 15    │ FAIL │       FAIL │ incomparable │
│ QQuery 16    │ FAIL │       FAIL │ incomparable │
│ QQuery 17    │ FAIL │       FAIL │ incomparable │
│ QQuery 18    │ FAIL │       FAIL │ incomparable │
│ QQuery 19    │ FAIL │       FAIL │ incomparable │
│ QQuery 20    │ FAIL │       FAIL │ incomparable │
│ QQuery 21    │ FAIL │       FAIL │ incomparable │
│ QQuery 22    │ FAIL │       FAIL │ incomparable │
└──────────────┴──────┴────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ Benchmark Summary         ┃        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━┩
│ Total Time (HEAD)         │ 0.00ms │
│ Total Time (perf-16717)   │ 0.00ms │
│ Average Time (HEAD)       │ 0.00ms │
│ Average Time (perf-16717) │ 0.00ms │
│ Queries Faster            │      0 │
│ Queries Slower            │      0 │
│ Queries with No Change    │      0 │
│ Queries with Failure      │     22 │
└───────────────────────────┴────────┘

@alamb
Contributor

alamb commented Jul 17, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing perf-16717 (2e8b9c9) to d1e6eb4 diff using: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete

@alamb
Contributor

alamb commented Jul 17, 2025

🤖: Benchmark completed

Details

Comparing HEAD and perf-16717
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃  perf-16717 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 0     │  1979.70 ms │  1929.21 ms │ no change │
│ QQuery 1     │   734.88 ms │   735.47 ms │ no change │
│ QQuery 2     │  1427.66 ms │  1445.27 ms │ no change │
│ QQuery 3     │   671.59 ms │   677.41 ms │ no change │
│ QQuery 4     │  1365.49 ms │  1365.76 ms │ no change │
│ QQuery 5     │ 14863.04 ms │ 14636.21 ms │ no change │
│ QQuery 6     │  2068.31 ms │  2066.03 ms │ no change │
│ QQuery 7     │  1933.13 ms │  1894.49 ms │ no change │
└──────────────┴─────────────┴─────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary         ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)         │ 25043.79ms │
│ Total Time (perf-16717)   │ 24749.83ms │
│ Average Time (HEAD)       │  3130.47ms │
│ Average Time (perf-16717) │  3093.73ms │
│ Queries Faster            │          0 │
│ Queries Slower            │          0 │
│ Queries with No Change    │          8 │
│ Queries with Failure      │          0 │
└───────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃  perf-16717 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     2.71 ms │     2.39 ms │ +1.13x faster │
│ QQuery 1     │    34.38 ms │    34.56 ms │     no change │
│ QQuery 2     │    80.57 ms │    83.66 ms │     no change │
│ QQuery 3     │    99.78 ms │   100.65 ms │     no change │
│ QQuery 4     │   588.61 ms │   614.75 ms │     no change │
│ QQuery 5     │   880.28 ms │   856.56 ms │     no change │
│ QQuery 6     │     2.29 ms │     2.29 ms │     no change │
│ QQuery 7     │    40.76 ms │    37.49 ms │ +1.09x faster │
│ QQuery 8     │   876.80 ms │   848.46 ms │     no change │
│ QQuery 9     │  1176.18 ms │  1174.09 ms │     no change │
│ QQuery 10    │   261.11 ms │   267.77 ms │     no change │
│ QQuery 11    │   292.68 ms │   297.71 ms │     no change │
│ QQuery 12    │   881.33 ms │   872.35 ms │     no change │
│ QQuery 13    │  1279.88 ms │  1256.01 ms │     no change │
│ QQuery 14    │   819.25 ms │   817.66 ms │     no change │
│ QQuery 15    │   775.88 ms │   799.87 ms │     no change │
│ QQuery 16    │  1607.42 ms │  1636.56 ms │     no change │
│ QQuery 17    │  1620.25 ms │  1615.32 ms │     no change │
│ QQuery 18    │  2864.00 ms │  2910.79 ms │     no change │
│ QQuery 19    │    87.02 ms │    87.99 ms │     no change │
│ QQuery 20    │  1168.15 ms │  1135.32 ms │     no change │
│ QQuery 21    │  1307.24 ms │  1293.93 ms │     no change │
│ QQuery 22    │  2160.86 ms │  2165.97 ms │     no change │
│ QQuery 23    │  7590.74 ms │  7605.45 ms │     no change │
│ QQuery 24    │   440.62 ms │   449.98 ms │     no change │
│ QQuery 25    │   299.40 ms │   304.73 ms │     no change │
│ QQuery 26    │   437.38 ms │   453.59 ms │     no change │
│ QQuery 27    │  1559.55 ms │  1545.45 ms │     no change │
│ QQuery 28    │ 12844.37 ms │ 12773.32 ms │     no change │
│ QQuery 29    │   521.46 ms │   523.70 ms │     no change │
│ QQuery 30    │   788.01 ms │   792.27 ms │     no change │
│ QQuery 31    │   782.53 ms │   809.92 ms │     no change │
│ QQuery 32    │  2398.23 ms │  2434.78 ms │     no change │
│ QQuery 33    │  3248.58 ms │  3220.43 ms │     no change │
│ QQuery 34    │  3276.66 ms │  3236.63 ms │     no change │
│ QQuery 35    │  1284.66 ms │  1300.04 ms │     no change │
│ QQuery 36    │   119.97 ms │   122.91 ms │     no change │
│ QQuery 37    │    52.78 ms │    51.83 ms │     no change │
│ QQuery 38    │   126.29 ms │   120.23 ms │     no change │
│ QQuery 39    │   198.72 ms │   196.76 ms │     no change │
│ QQuery 40    │    41.05 ms │    41.62 ms │     no change │
│ QQuery 41    │    38.26 ms │    39.00 ms │     no change │
│ QQuery 42    │    33.57 ms │    32.88 ms │     no change │
└──────────────┴─────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary         ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)         │ 54990.25ms │
│ Total Time (perf-16717)   │ 54967.66ms │
│ Average Time (HEAD)       │  1278.84ms │
│ Average Time (perf-16717) │  1278.32ms │
│ Queries Faster            │          2 │
│ Queries Slower            │          0 │
│ Queries with No Change    │         41 │
│ Queries with Failure      │          0 │
└───────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃      HEAD ┃ perf-16717 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  98.04 ms │   97.12 ms │    no change │
│ QQuery 2     │  20.72 ms │   20.25 ms │    no change │
│ QQuery 3     │  32.68 ms │   32.36 ms │    no change │
│ QQuery 4     │  18.30 ms │   18.48 ms │    no change │
│ QQuery 5     │  49.12 ms │   49.42 ms │    no change │
│ QQuery 6     │  11.95 ms │   11.85 ms │    no change │
│ QQuery 7     │  83.23 ms │   88.88 ms │ 1.07x slower │
│ QQuery 8     │  24.62 ms │   23.68 ms │    no change │
│ QQuery 9     │  53.04 ms │   51.93 ms │    no change │
│ QQuery 10    │  43.45 ms │   43.20 ms │    no change │
│ QQuery 11    │  11.36 ms │   11.16 ms │    no change │
│ QQuery 12    │  34.88 ms │   35.35 ms │    no change │
│ QQuery 13    │  26.44 ms │   26.33 ms │    no change │
│ QQuery 14    │   9.81 ms │    9.66 ms │    no change │
│ QQuery 15    │  19.18 ms │   18.98 ms │    no change │
│ QQuery 16    │  17.64 ms │   18.20 ms │    no change │
│ QQuery 17    │  95.45 ms │   96.67 ms │    no change │
│ QQuery 18    │ 184.57 ms │  188.09 ms │    no change │
│ QQuery 19    │  25.16 ms │   25.03 ms │    no change │
│ QQuery 20    │  32.18 ms │   32.05 ms │    no change │
│ QQuery 21    │ 144.18 ms │  145.49 ms │    no change │
│ QQuery 22    │  14.38 ms │   14.61 ms │    no change │
└──────────────┴───────────┴────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary         ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)         │ 1050.39ms │
│ Total Time (perf-16717)   │ 1058.79ms │
│ Average Time (HEAD)       │   47.75ms │
│ Average Time (perf-16717) │   48.13ms │
│ Queries Faster            │         0 │
│ Queries Slower            │         1 │
│ Queries with No Change    │        21 │
│ Queries with Failure      │         0 │
└───────────────────────────┴───────────┘

@zhuqi-lucas
Contributor

Thank you @alamb, no regression in the benchmarks above.

Labels
core (Core DataFusion crate), datasource (Changes to the datasource crate), physical-plan (Changes to the physical-plan crate), sqllogictest (SQL Logic Tests (.slt))
Development

Successfully merging this pull request may close these issues.

Better parallelize large input batches (speed up dataframe access)
4 participants