[server][dvc] change the client side transfer timeout configurable and close channel once timeout. #1805
Conversation
 * @param topicName the topic name
 * @param partitionId the partition id
 * @throws VeniceException if the concurrent user count exceeds the limit
 */
private void checkIfConcurrentUserExceedsLimit(String topicName, int partitionId) throws VeniceException {
  boolean exceededMaxConcurrentUsers = getConcurrentSnapshotUsers(topicName, partitionId) >= maxConcurrentUsers;
  // get the current host-level served request count across all topics and partitions
  int totalTopicsPartitionsRequestCount = concurrentSnapshotUsers.values()
Can the global counter check be done in P2PFileTransferServerHandler? Every request will go through this handler, right? We could check the global counter there, which means:
- Check the counter to see whether it hits the limit or not.
- If not, increase the global counter.
- Do the data transfer.
- In the finally block, decrease the global counter.
That way the counter maintenance happens within a single function, which makes it easy to keep atomic and easier to reason about.
We can continue to use the topic/partition tracking to check whether the snapshot is active or not.
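The check/increment/transfer/finally-decrement flow the comment describes can be sketched as follows. This is a hypothetical illustration, not the actual Venice code; the class and method names (GlobalTransferCounter, tryAcquire, transferWithLimit) are invented for the example, and an AtomicInteger with a compare-and-set loop stands in for whatever counter the handler would use.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the suggested pattern: keep the host-level counter
// maintenance inside one function so it is atomic and easy to reason about.
public class GlobalTransferCounter {
    private final AtomicInteger globalCount = new AtomicInteger(0);
    private final int maxConcurrentUsers;

    public GlobalTransferCounter(int maxConcurrentUsers) {
        this.maxConcurrentUsers = maxConcurrentUsers;
    }

    /** Atomically reserve a slot; returns false when the host-level limit is hit. */
    public boolean tryAcquire() {
        while (true) {
            int current = globalCount.get();
            if (current >= maxConcurrentUsers) {
                return false; // limit reached, reject this request
            }
            if (globalCount.compareAndSet(current, current + 1)) {
                return true; // slot reserved
            }
            // lost a race with another thread; retry
        }
    }

    public void release() {
        globalCount.decrementAndGet();
    }

    public int current() {
        return globalCount.get();
    }

    /** The suggested flow: check + increment, transfer, then decrement in finally. */
    public void transferWithLimit(Runnable transfer) {
        if (!tryAcquire()) {
            throw new IllegalStateException("Max concurrent snapshot users exceeded");
        }
        try {
            transfer.run();
        } finally {
            release(); // counter is always restored, even if the transfer throws
        }
    }
}
```

Because acquisition and release live in one method, the counter cannot leak even when the data transfer fails, which is the main point of the reviewer's suggestion.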
  }
- }, REQUEST_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
+ }, blobReceiveTimeoutInMin, TimeUnit.MINUTES);
For connection establishment we should certainly have a timeout, but for the blob transfer itself, shall we just use the server timeout instead of both server and client timeouts, since a misconfiguration of the two can cause weird issues?
It is fine to enforce only the client-side timeout: when the client timeout fires, the client closes the channel, the server receives an exception during its next write, and it can then close its channel as well.
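The timeout behavior discussed above can be sketched as follows. This is a minimal illustration, not the Netty/Venice implementation: the class and method names (BlobReceiveTimeout, armTimeout, onTimeout) are invented, and an AtomicBoolean stands in for the real channel state. The diff's configurable blobReceiveTimeoutInMin parameter is the piece being modeled.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: arm a configurable client-side timeout that closes the
// channel when it fires, instead of a hardcoded REQUEST_TIMEOUT_IN_MINUTES.
public class BlobReceiveTimeout {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    /** Schedules onTimeout after blobReceiveTimeoutInMin; cancel it on successful completion. */
    public ScheduledFuture<?> armTimeout(
            AtomicBoolean channelOpen, long blobReceiveTimeoutInMin, TimeUnit unit) {
        return scheduler.schedule(() -> onTimeout(channelOpen), blobReceiveTimeoutInMin, unit);
    }

    /** Closing the channel here makes the server's in-flight write fail, so it closes too. */
    void onTimeout(AtomicBoolean channelOpen) {
        channelOpen.set(false);
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The key design point from the discussion: only the client enforces the transfer timeout, and the channel close is what propagates the failure to the server, so no separate server-side transfer timeout needs to be kept in sync.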
Problem Statement
When onboarding the blob transfer bootstrap feature to a large store (e.g., 10GB per partition, 120GB per host), the transfer time is so long that it triggers a client-side timeout exception. Upon reaching the timeout, a partition cleanup is performed before moving to the next host.
However, during the cleanup process the channels are not closed, so Netty continues receiving transferred files. If files are being cleaned up while validation is happening, checksum failures occur. These failures trigger the exceptionCaught method, which eventually leads to the channel being closed.
As a result, incomplete cleanups occur—some files are deleted, but others that are still being transferred or created after the cleanup begins remain. This race condition arises because file transfers and cleanups are happening concurrently.
Ultimately, even if the blob transfer fails and the bootstrap falls back to Kafka ingestion, the incomplete cleanup leads to database corruption due to residual files.
Solution
Make the client-side blob transfer timeout configurable, and close the channel as soon as the timeout fires so that cleanup cannot race with in-flight transfers.
Code changes
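The ordering fix described in the problem statement can be sketched as follows. This is a hypothetical illustration, not the actual code change: the class and method names (TimeoutCleanup, receiveFile, onClientTimeout) are invented, and an in-memory file list stands in for the partition directory. The point being modeled is that the channel is closed before cleanup starts, so no files can be written mid-cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: close the channel FIRST on timeout, then clean up the
// partition, so incoming transfers cannot race with the cleanup and leave
// residual files behind.
public class TimeoutCleanup {
    private final AtomicBoolean channelOpen = new AtomicBoolean(true);
    private final List<String> partitionFiles = new ArrayList<>();

    /** Simulates Netty delivering a transferred file. */
    public void receiveFile(String name) {
        // Once the channel is closed, no new files land in the partition,
        // which is what prevents the incomplete-cleanup corruption.
        if (channelOpen.get()) {
            partitionFiles.add(name);
        }
    }

    /** The fixed timeout path: 1) close channel, 2) clean up partition. */
    public void onClientTimeout() {
        channelOpen.set(false);   // stop further transfers before touching files
        partitionFiles.clear();   // cleanup can no longer race with writes
    }

    /** True when no residual files remain and the channel is closed. */
    public boolean isClean() {
        return partitionFiles.isEmpty() && !channelOpen.get();
    }
}
```

With the old ordering (cleanup while the channel stays open), a file arriving after the clear would survive the cleanup; closing the channel first makes the subsequent Kafka-ingestion fallback start from an empty partition.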
Concurrency-Specific Checks
Both reviewer and PR author to verify:
- Locks (synchronized, RWLock) are used where needed.
- Thread-safe collections are used where needed (ConcurrentHashMap, CopyOnWriteArrayList).
How was this PR tested?
Does this PR introduce any user-facing or breaking changes?