[SPARK-52228][SS][PYSPARK] Construct the benchmark purposed TWS state server with in-memory state impls and the benchmark code in python #50952
Conversation
I'm open to suggestions w.r.t. the package/module path, better instructions, ideas on standardization, etc. Thanks!
LGTM, thanks for adding this benchmark tool!
Impressive and good work on the benchmarks! I am assuming we could derive some raw networking latency numbers from this benchmark as well.
@HeartSaVioR - can we add the raw file output in the PR as well, similar to the state store microbenchmark result txt files?
Actually, this benchmark does not follow the existing benchmark infra - this is probably the only one which makes the Python process lead the benchmark, which makes the automation non-trivial. I'd love to defer this to sometime later.
cc. @HyukjinKwon Please take a look, thanks!
Thanks everyone for reviewing! Merging to master.
…or transformWithState in PySpark

### What changes were proposed in this pull request?

This PR proposes to squeeze the protocol of retrieving timers for transformWithState in PySpark, which will help a lot when dealing with a not-too-huge number of timers. Here are the changes:

* StatefulProcessorHandleImpl.listTimers() and StatefulProcessorHandleImpl.getExpiredTimers() no longer require an additional request to notice there is no further data to read.
* We inline the data into the proto message, to make it easy to determine whether the iterator has been fully consumed. This is the same mechanism we applied for ListState & MapState. We got a performance improvement in the prior case, and we also see this change to be helpful in our internal benchmark.

### Why are the changes needed?

To optimize further on some timer operations. We benchmarked the change with listing 100 timers (PR for benchmarking: #50952), and we saw overall performance improvements.

> Before the fix

```
==================== SET IMPLICIT KEY latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
78.250       141.583      184.375      635.792      962743.500
==================== REGISTER latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
65.375       126.125      162.792      565.833      60809.333
==================== DELETE latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
68.500       130.000      170.292      610.083      156733.125
==================== LIST latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
486.833      714.961      998.625      2695.417     167039.959
==================== SET IMPLICIT KEY latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
77.916       139.000      182.375      671.792      521809.958
==================== REGISTER latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
65.000       124.333      160.875      596.667      30860.208
==================== DELETE latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
67.125       127.916      170.250      740.051      64404.416
==================== LIST latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
482.041      710.333      1050.333     2685.500     76762.583
==================== SET IMPLICIT KEY latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
78.208       139.959      181.459      722.459      713788.250
==================== REGISTER latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
65.209       125.125      159.625      636.666      27963.167
==================== DELETE latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
67.417       129.000      168.875      764.602      12991.667
==================== LIST latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
479.000      709.584      1045.543     2776.541     92247.542
```

> After the fix

```
==================== SET IMPLICIT KEY latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
31.250       47.250       75.875       150.000      551557.750
==================== REGISTER latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
26.958       39.208       65.208       122.667      78609.292
==================== DELETE latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
23.500       41.125       64.542       125.958      52641.042
==================== LIST latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
93.125       118.542      156.500      284.625      19910.000
==================== SET IMPLICIT KEY latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
30.875       44.083       70.417       128.875      628912.209
==================== REGISTER latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
26.917       36.416       61.292       109.917      164584.666
==================== DELETE latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
23.333       38.375       59.542       113.839      114350.250
==================== LIST latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
94.125       115.208      148.917      246.292      36924.292
==================== SET IMPLICIT KEY latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
31.375       58.375       93.041       243.750      719545.583
==================== REGISTER latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
26.959       50.167       81.833       194.375      67609.583
==================== DELETE latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
24.208       50.834       83.000       211.018      20611.959
==================== LIST latency (micros) ======================
perc:50      perc:95      perc:99      perc:99.9    perc:100
95.291       132.375      183.875      427.584      36971.792
```

Worth noting that it is not only impacting the LIST operation - it also impacts other operations as well. It's not clear why this happens, but the direction of reducing round-trips is proven to be the right direction.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51036 from HeartSaVioR/SPARK-52333.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
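The core idea of the protocol change above - inlining data into the response so the client never needs a trailing "any more data?" request - can be illustrated with a small sketch. This is a hypothetical model, not the actual Spark proto messages or class names: with a batch of timers plus an `is_last` flag inlined in each response, consuming N timers costs about N/batch_size round-trips instead of one extra round-trip just to observe exhaustion.

```python
def serve_timers(timers, batch_size=100):
    """Server side (hypothetical): yield responses that inline a batch of
    timers together with an end-of-stream flag."""
    for start in range(0, len(timers), batch_size):
        batch = timers[start:start + batch_size]
        yield {"timers": batch, "is_last": start + batch_size >= len(timers)}
    if not timers:
        # Even the empty case is answered in a single response.
        yield {"timers": [], "is_last": True}

def consume_timers(responses):
    """Client side (hypothetical): each iteration is one round-trip; the
    inlined flag tells us when to stop, with no extra request."""
    out = []
    for resp in responses:
        out.extend(resp["timers"])
        if resp["is_last"]:
            break
    return out

timers = list(range(250))
# 250 timers with batch_size=100 -> 3 round-trips, not 251.
assert consume_timers(serve_timers(timers)) == timers
```

The same shape was applied earlier for ListState and MapState, per the commit message.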
What changes were proposed in this pull request?
This PR proposes to introduce a benchmark tool which can run performance tests of the state interactions between the TWS state server and the Python worker.
Since it requires two processes (JVM and Python) with a socket connection between them, we are not going to follow the benchmark suites we have in the SQL module as of now. We leave the tool to be run manually. It'd be ideal if we could standardize this with the existing benchmark suites and run it automatically, but that is not an immediate goal.
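The two-process, socket-connected shape described above can be sketched in miniature. This is not the Spark benchmark code - it is a self-contained toy with an echo server standing in for the state server - but it shows the measurement pattern: a client thread timing per-request round-trips over a local socket in microseconds.

```python
import socket
import threading
import time

def echo_server(server_sock):
    # Stand-in for the state server: echo each request back to the client.
    conn, _ = server_sock.accept()
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(data)

server = socket.socket()
server.bind(("127.0.0.1", 0))      # port 0: let the OS pick a free port
server.listen(1)
port = server.getsockname()[1]
threading.Thread(target=echo_server, args=(server,), daemon=True).start()

# "Python worker" side: measure round-trip latency per request.
client = socket.create_connection(("127.0.0.1", port))
latencies = []
for _ in range(1000):
    start = time.perf_counter()
    client.sendall(b"ping")
    client.recv(1024)
    latencies.append((time.perf_counter() - start) * 1e6)  # micros
client.close()

print(f"median round-trip: {sorted(latencies)[len(latencies) // 2]:.3f} micros")
```

In the real tool the JVM process plays the server role, which is why the two sides are started as separate commands rather than threads.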
Why are the changes needed?
It has been very painful to look into the performance of state interactions. It required adding debug logs and running E2E queries, which is a lot of work just to see the numbers.
For example, after this benchmark tool has been introduced, we can verify upcoming improvements w.r.t. state interactions - for example, we still have spots to use Arrow in state interactions, and I think this tool can show the perf benefit of the upcoming fix.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually tested.
```
# Build a distribution and start the state server (JVM side)
./dev/make-distribution.sh
cd dist
java -classpath "./jars/*" --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED org.apache.spark.sql.execution.python.streaming.benchmark.BenchmarkTransformWithStateInPySparkStateServer

# Run the Python benchmark client (in a separate terminal)
cd python
python3 pyspark/sql/streaming/benchmark/benchmark_tws_state_server.py <port that state server uses> <state type> <params if required>
```
For the Python process, you must first install the libraries PySpark requires (including numpy, since it's used in the benchmark).
Results will be printed out like the following (NOTE: I ran the same benchmark 3 times):
https://gist.github.com/HeartSaVioR/fa4805af4d7a4dc9789c8e3437506be1
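Since numpy is listed as a benchmark dependency and the linked output reports per-operation percentiles, here is a sketch of how such summary lines could be produced. The function name, column widths, and the sample distribution are assumptions for illustration, not the actual benchmark code.

```python
import numpy as np

def print_latency_summary(name, latencies_micros):
    # Hypothetical helper: print a percentile summary in the style of the
    # "perc:50 ... perc:100" blocks seen in the benchmark output.
    percs = [50, 95, 99, 99.9, 100]
    values = np.percentile(latencies_micros, percs)
    print(f"==================== {name} latency (micros) ====================")
    print("".join(f"perc:{p:<10}" for p in percs))
    print("".join(f"{v:<15.3f}" for v in values))

# Example with synthetic, heavy-tailed latencies (assumed distribution).
samples = np.random.lognormal(mean=4.5, sigma=0.5, size=10_000)
print_latency_summary("LIST", samples)
```

Note that perc:100 is simply the maximum observed latency, which explains why that column is so much noisier across runs than the other percentiles.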
Was this patch authored or co-authored using generative AI tooling?
No.