[SPARK-52673][CONNECT][CLIENT] Add grpc RetryInfo handling to Spark Connect retry policies #51363
Closed
HyukjinKwon
reviewed
Jul 14, 2025
import unittest

import google.protobuf.any_pb2 as any_pb2
Can we import this under `if should_test_connect`?
sure!
HyukjinKwon
approved these changes
Jul 14, 2025
Merged to master.
haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request on Jul 22, 2025
…onnect retry policies

### What changes were proposed in this pull request?

Spark Connect Client has a set of retry policies that specify which errors coming from the Server can be retried.

This change adds the capability for the Spark Connect Client to use server-provided retry information according to the gRPC standards: https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto#L91

The server can include a `RetryInfo` gRPC message containing a `retry_delay` field in its error response. The Client will now use the `RetryInfo` message to classify the error as retryable and will use `retry_delay` to calculate the next time to wait. This behavior is in line with the gRPC standard for client-server communication.

The change is needed for two reasons:

1) If the Server is under heavy load or a task takes more time, it can tell the client to wait longer using the `retry_delay` field.
2) If the Server needs to introduce a new retryable error, it can simply include `RetryInfo` in the error message. The error will be retried automatically by the client. No changes to the client-side retry policies are needed to retry the new error.

#### Changes in detail

- Adding new `recognize_server_retry_delay` and `max_server_retry_delay` options for `RetryPolicy` classes in Python and Scala clients.
- All policies with `recognize_server_retry_delay=True` will take `RetryInfo.retry_delay` into account when calculating the next backoff.
- `retry_delay` can override the client's `max_backoff`.
- `retry_delay` is limited by `max_server_retry_delay` (10 minutes by default).
- When the server stops sending high `retry_delay` values, the client goes back to using its own backoff policy limited by `max_backoff`.
- `DefaultPolicy` has `recognize_server_retry_delay=True` and will use `retry_delay` in the backoff calculation.
- Additionally, `DefaultPolicy` will classify all errors with `RetryInfo` as retryable.
- If an error can be retried by several policies, only retry it with the first one (highest priority) and then stop. This change is needed because `DefaultPolicy` now retries all errors with `RetryInfo`. If we kept the existing behaviour, an error that both carries `RetryInfo` and is matched by a different `CustomPolicy` would be retried by both the `DefaultPolicy` and the `CustomPolicy`. This can lead to excessively long retry periods and complicates the planning of total retry times.
- Moving retry-policy-related tests from `test_client.py` to a new `test_client_retries.py` file. The same is done for Scala.
- Extending docstrings.

### Why are the changes needed?

See above

### Does this PR introduce _any_ user-facing change?

1. The clients retry all errors with a `RetryInfo` gRPC message using the `DefaultPolicy`.
2. The error is only retried by the first policy that matches it.

### How was this patch tested?

Old and new unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51363 from khakhlyuk/retryinfo.

Authored-by: Alex Khakhlyuk <alex.khakhlyuk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
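The "first matching policy only" rule described in the commit message can be sketched roughly as follows. This is a simplified illustration, not the client's actual code: `SketchPolicy`, `pick_policy`, `should_retry`, and `ServerError` are hypothetical names, and real policies also track backoff state that is omitted here.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


class ServerError(Exception):
    """Hypothetical error type; `has_retry_info` mimics an attached RetryInfo."""

    def __init__(self, message: str, has_retry_info: bool = False):
        super().__init__(message)
        self.has_retry_info = has_retry_info


@dataclass
class SketchPolicy:
    """Hypothetical, simplified stand-in for a retry policy."""

    name: str
    can_retry: Callable[[Exception], bool]
    max_retries: int
    attempts: int = 0

    def next_attempt(self) -> bool:
        """Consume one retry; return False once the budget is used up."""
        if self.attempts >= self.max_retries:
            return False
        self.attempts += 1
        return True


def pick_policy(policies: List[SketchPolicy], error: Exception) -> Optional[SketchPolicy]:
    """Return the first (highest-priority) policy that matches, or None."""
    for policy in policies:
        if policy.can_retry(error):
            return policy
    return None


def should_retry(policies: List[SketchPolicy], error: Exception) -> bool:
    """Only the first matching policy decides; later matches are not consulted."""
    policy = pick_policy(policies, error)
    return policy is not None and policy.next_attempt()


# The default policy matches anything with RetryInfo; the custom one matches
# every ServerError, so both would match the error below.
default = SketchPolicy("default", lambda e: getattr(e, "has_retry_info", False), max_retries=2)
custom = SketchPolicy("custom", lambda e: isinstance(e, ServerError), max_retries=3)
policies = [default, custom]

error = ServerError("server busy", has_retry_info=True)
retries = 0
while should_retry(policies, error):
    retries += 1
```

In this sketch the error is retried twice (the default policy's budget) and the custom policy is never consulted, so the total retry time is bounded by a single policy instead of the sum of all matching ones.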
What changes were proposed in this pull request?

Spark Connect Client has a set of retry policies that specify which errors coming from the Server can be retried.

This change adds the capability for the Spark Connect Client to use server-provided retry information according to the gRPC standards: https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto#L91

The server can include a `RetryInfo` gRPC message containing a `retry_delay` field in its error response. The Client will now use the `RetryInfo` message to classify the error as retryable and will use `retry_delay` to calculate the next time to wait. This behavior is in line with the gRPC standard for client-server communication.

The change is needed for two reasons:

1. If the Server is under heavy load or a task takes more time, it can tell the client to wait longer using the `retry_delay` field.
2. If the Server needs to introduce a new retryable error, it can simply include `RetryInfo` in the error message. The error will be retried automatically by the client. No changes to the client-side retry policies are needed to retry the new error.

Changes in detail

- Adding new `recognize_server_retry_delay` and `max_server_retry_delay` options for `RetryPolicy` classes in Python and Scala clients.
- All policies with `recognize_server_retry_delay=True` will take `RetryInfo.retry_delay` into account when calculating the next backoff.
- `retry_delay` can override the client's `max_backoff`.
- `retry_delay` is limited by `max_server_retry_delay` (10 minutes by default).
- When the server stops sending high `retry_delay` values, the client goes back to using its own backoff policy limited by `max_backoff`.
- `DefaultPolicy` has `recognize_server_retry_delay=True` and will use `retry_delay` in the backoff calculation.
- Additionally, `DefaultPolicy` will classify all errors with `RetryInfo` as retryable.
- If an error can be retried by several policies, only retry it with the first one (highest priority) and then stop. This change is needed because `DefaultPolicy` now retries all errors with `RetryInfo`. If we kept the existing behaviour, an error that both carries `RetryInfo` and is matched by a different `CustomPolicy` would be retried by both the `DefaultPolicy` and the `CustomPolicy`. This can lead to excessively long retry periods and complicates the planning of total retry times.
- Moving retry-policy-related tests from `test_client.py` to a new `test_client_retries.py` file. The same is done for Scala.
- Extending docstrings.

Why are the changes needed?

See above

Does this PR introduce any user-facing change?

1. The clients retry all errors with a `RetryInfo` gRPC message using the `DefaultPolicy`.
2. The error is only retried by the first policy that matches it.

How was this patch tested?

Old and new unit tests.

Was this patch authored or co-authored using generative AI tooling?

No
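The backoff rules listed under "Changes in detail" can be sketched as a small function. This is an illustration under stated assumptions, not the client's implementation: the function name `next_wait_ms` and its millisecond parameters are hypothetical, and taking the larger of the client backoff and the server delay is an assumption, since the description only says that `retry_delay` is taken into account and may exceed `max_backoff`.

```python
from typing import Optional

# 10 minutes, the default limit mentioned in the description.
DEFAULT_MAX_SERVER_RETRY_DELAY_MS = 10 * 60 * 1000


def next_wait_ms(
    client_backoff_ms: int,
    max_backoff_ms: int,
    server_retry_delay_ms: Optional[int] = None,
    recognize_server_retry_delay: bool = True,
    max_server_retry_delay_ms: int = DEFAULT_MAX_SERVER_RETRY_DELAY_MS,
) -> int:
    """Hypothetical helper: combine client backoff with a server retry_delay."""
    # The client's own backoff is always capped by max_backoff.
    wait = min(client_backoff_ms, max_backoff_ms)
    if recognize_server_retry_delay and server_retry_delay_ms is not None:
        # The server delay is capped separately, so it may exceed max_backoff.
        server_wait = min(server_retry_delay_ms, max_server_retry_delay_ms)
        # Assumption: wait at least as long as the server asked for.
        wait = max(wait, server_wait)
    return wait


# No server hint: plain client backoff, capped at max_backoff.
no_hint = next_wait_ms(120_000, max_backoff_ms=60_000)
# Server asks for 5 minutes: overrides the 1-minute max_backoff.
with_hint = next_wait_ms(1_000, max_backoff_ms=60_000, server_retry_delay_ms=300_000)
# Server asks for 1 hour: limited by max_server_retry_delay.
capped_hint = next_wait_ms(1_000, max_backoff_ms=60_000, server_retry_delay_ms=3_600_000)
```

Once the server stops sending a delay, `server_retry_delay_ms` is `None` again and the result falls back to the client backoff limited by `max_backoff_ms`, which mirrors the fallback behavior described in the list.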