[SPARK-52228][SS][PYSPARK] Construct the benchmark purposed TWS state server with in-memory state impls and the benchmark code in python #50952


Closed
wants to merge 3 commits into from

Conversation

HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented May 20, 2025

What changes were proposed in this pull request?

This PR proposes to introduce a benchmark tool that can measure the performance of state interactions between the TWS state server and the Python worker.

Since it requires two processes (JVM and Python) with a socket connection between them, we are not following the benchmark suites we have in the SQL module as of now. We leave the tool to be run manually. It would be ideal to standardize this with the existing benchmark suites and run it automatically, but that is not an immediate goal.

Why are the changes needed?

It has been very painful to benchmark and look into the performance of state interactions. It required adding debug logs and running E2E queries, which is a lot of work just to see the numbers.

With this benchmark tool in place, we can verify upcoming improvements w.r.t. state interactions - for example, we still have spots where Arrow can be used in state interactions, and this tool can show the performance benefit of the upcoming fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually tested.

TWS Python state server

  • build Spark repo via ./dev/make-distribution.sh
  • cd dist
  • java -classpath "./jars/*" --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED org.apache.spark.sql.execution.python.streaming.benchmark.BenchmarkTransformWithStateInPySparkStateServer

Python process (benchmark code)

  • cd python
  • python3 pyspark/sql/streaming/benchmark/benchmark_tws_state_server.py <port that the state server uses> <state type> <params if required>

For the Python process, you first need to install the libraries PySpark requires (including numpy, since it is used in the benchmark).

The result will be printed out like the following (NOTE: I ran the same benchmark 3 times):
https://gist.github.com/HeartSaVioR/fa4805af4d7a4dc9789c8e3437506be1
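For illustration, percentile summaries like those shown in the gist can be computed from raw latency samples. This is a hypothetical pure-Python sketch (the function name and the nearest-rank method are assumptions; the actual benchmark code uses numpy):

```python
# Hypothetical sketch of summarizing per-operation latency samples into
# the percentile rows shown in the gist (nearest-rank percentiles).
def percentile(samples, p):
    """Nearest-rank percentile of a list of latency samples (micros)."""
    ordered = sorted(samples)
    k = max(0, min(len(ordered) - 1, round(p / 100 * len(ordered)) - 1))
    return ordered[k]

latencies = [78.2, 141.6, 65.4, 126.1, 184.4, 70.0, 90.5, 300.0, 62.1, 85.0]
for p in (50, 95, 99, 99.9, 100):
    print(f"perc:{p}\t{percentile(latencies, p):.3f}")
```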

Was this patch authored or co-authored using generative AI tooling?

No.

… server with in-memory state impls and the benchmark code in python
@HeartSaVioR
Contributor Author

I'm open to suggestions w.r.t. the package/module path, better instructions, ideas on standardization, etc. Thanks!

@HeartSaVioR
Contributor Author

@bogao007 @jingz-db Please take a look. Thanks!

Contributor

@bogao007 bogao007 left a comment

LGTM, thanks for adding this benchmark tool!

Contributor

@jingz-db jingz-db left a comment

Impressive and good work on the benchmarks! I am assuming we could derive some raw networking latency numbers from this benchmark as well.

@anishshri-db
Contributor

@HeartSaVioR - can we add the raw file output in the PR as well, similar to the state store microbenchmark result txt files?

@HeartSaVioR
Contributor Author

@anishshri-db

@HeartSaVioR - can we add the raw file output in the PR as well, similar to the state store microbenchmark result txt files?

Actually, this benchmark does not follow the existing benchmark infra - it is probably the only one where the Python process leads the benchmark, which makes automation non-trivial. I'd love to defer this until later.

@HeartSaVioR
Contributor Author

cc. @HyukjinKwon Please take a look, thanks!

@HeartSaVioR
Contributor Author

Thanks everyone for reviewing! Merging to master.

HeartSaVioR added a commit that referenced this pull request May 29, 2025
…or transformWithState in PySpark

### What changes were proposed in this pull request?

This PR proposes to squeeze the protocol for retrieving timers for transformWithState in PySpark, which helps a lot when dealing with a not-so-huge number of timers.

Here are the changes:

* StatefulProcessorHandleImpl.listTimers() and StatefulProcessorHandleImpl.getExpiredTimers() no longer require an additional request to notice that there is no further data to read.
  * We inline the data into the proto message, making it easy to determine whether the iterator has been fully consumed.

This change applies the same mechanism we used for ListState & MapState. We got a performance improvement in that case, and we also see this change being helpful in our internal benchmark.
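The protocol change can be illustrated with a toy model (hypothetical function names; the real implementation exchanges proto messages over a socket between the JVM state server and the Python worker): instead of needing one extra request to observe that the iterator is exhausted, each response inlines the data plus a has-more flag.

```python
# Toy model of the iterator protocol change (hypothetical names; not the
# actual proto messages used between the state server and Python worker).

def fetch_old(batches):
    """Old protocol: the client keeps requesting until an empty response
    tells it the iterator is exhausted (costing one extra round-trip)."""
    results, trips, i = [], 0, 0
    while True:
        trips += 1                       # one request/response per loop
        if i >= len(batches):            # empty response signals the end
            return results, trips
        results.extend(batches[i])
        i += 1

def fetch_new(batches):
    """New protocol: each response inlines the data and a has-more flag,
    so no extra round-trip is needed to detect exhaustion."""
    results, trips = [], 0
    for i, batch in enumerate(batches):
        trips += 1
        results.extend(batch)
        if i == len(batches) - 1:        # has-more flag inlined in response
            break
    return results, trips

timers = [[1, 2], [3, 4], [5]]           # 3 batches of timer data
_, old_trips = fetch_old(timers)          # 4 round-trips
_, new_trips = fetch_new(timers)          # 3 round-trips
```

The saving is one round-trip per iterator, which matters most when the number of timers (and hence batches) per key is small.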

### Why are the changes needed?

To further optimize some timer operations.

We benchmarked the change with listing 100 timers (PR for benchmarking: #50952), and we saw overall performance improvements.

> Before the fix

```

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
78.250		141.583		184.375		635.792		962743.500
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
65.375		126.125		162.792		565.833		60809.333
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
68.500		130.000		170.292		610.083		156733.125
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
486.833		714.961		998.625		2695.417		167039.959

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
77.916		139.000		182.375		671.792		521809.958
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
65.000		124.333		160.875		596.667		30860.208
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
67.125		127.916		170.250		740.051		64404.416
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
482.041		710.333		1050.333		2685.500		76762.583

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
78.208		139.959		181.459		722.459		713788.250
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
65.209		125.125		159.625		636.666		27963.167
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
67.417		129.000		168.875		764.602		12991.667
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
479.000		709.584		1045.543		2776.541		92247.542
```

> After the fix

```
 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
31.250		47.250		75.875		150.000		551557.750
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
26.958		39.208		65.208		122.667		78609.292
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
23.500		41.125		64.542		125.958		52641.042
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
93.125		118.542		156.500		284.625		19910.000

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
30.875		44.083		70.417		128.875		628912.209
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
26.917		36.416		61.292		109.917		164584.666
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
23.333		38.375		59.542		113.839		114350.250
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
94.125		115.208		148.917		246.292		36924.292

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
31.375		58.375		93.041		243.750		719545.583
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
26.959		50.167		81.833		194.375		67609.583
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
24.208		50.834		83.000		211.018		20611.959
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
95.291		132.375		183.875		427.584		36971.792
```

It is worth noting that this does not only impact the LIST operation - it impacts other operations as well. It's not clear why that happens, but reducing round-trips has proven to be the right direction.
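As a rough sanity check on the tables above, the median (perc:50) LIST latency across the three runs drops from roughly 483 to 94 micros:

```python
# perc:50 LIST latencies (micros) copied from the three runs above.
list_p50_before = [486.833, 482.041, 479.000]
list_p50_after = [93.125, 94.125, 95.291]

avg = lambda xs: sum(xs) / len(xs)
speedup = avg(list_p50_before) / avg(list_p50_after)
print(f"LIST perc:50 speedup: {speedup:.1f}x")  # ~5.1x
```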

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51036 from HeartSaVioR/SPARK-52333.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
5 participants