
[server][dvc] Make the client-side transfer timeout configurable and close the channel on timeout. #1805


Open · wants to merge 2 commits into base: main

Conversation

@jingy-li (Contributor) commented on May 15, 2025

Problem Statement

When onboarding the blob transfer bootstrap feature to a large store (e.g., 10GB per partition, 120GB per host), the transfer time is so long that it triggers a client-side timeout exception. Upon reaching the timeout, a partition cleanup is performed before moving to the next host.

However, during the cleanup process the channels are not closed, so Netty keeps receiving transferred files. If files are deleted while checksum validation is still running, the validation fails. These checksum failures trigger the exceptionCaught method, which eventually closes the channel.

As a result, cleanups are incomplete: some files are deleted, while others that are still in flight, or that are created after the cleanup begins, remain on disk. This race condition exists because file transfers and cleanups run concurrently.

Ultimately, even when the blob transfer fails and the bootstrap falls back to Kafka ingestion, the incomplete cleanup leaves residual files that corrupt the database.

Solution

  1. Make the client-side timeout configurable. Previously, only a server-side timeout config existed.
  2. Close the channel upon timeout to prevent continued file reception (a sketch follows this list).
  3. Decrement the server-side active user count when the channel becomes inactive, so the server does not keep counting a connection that was unexpectedly interrupted.
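
Items 1 and 2 boil down to scheduling a close on the channel's event loop. Below is a minimal sketch of that idea; BlobReceiveTimeoutGuard, armTimeout, and blobReceiveTimeoutInMin are hypothetical names, not the actual Venice implementation.

import io.netty.channel.Channel;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: arm a configurable client-side timeout that closes the
// channel, so Netty stops receiving files before partition cleanup starts.
public class BlobReceiveTimeoutGuard {
  private final int blobReceiveTimeoutInMin; // assumed to come from the new client-side config

  public BlobReceiveTimeoutGuard(int blobReceiveTimeoutInMin) {
    this.blobReceiveTimeoutInMin = blobReceiveTimeoutInMin;
  }

  public ScheduledFuture<?> armTimeout(Channel channel, CompletableFuture<Void> transferFuture) {
    return channel.eventLoop().schedule(() -> {
      if (!transferFuture.isDone()) {
        // Close the channel first so no more file chunks arrive, then fail the
        // transfer; cleanup can now run without racing against in-flight writes.
        channel.close();
        transferFuture.completeExceptionally(new TimeoutException(
            "Blob transfer did not complete within " + blobReceiveTimeoutInMin + " minutes"));
      }
    }, blobReceiveTimeoutInMin, TimeUnit.MINUTES);
  }
}

The ordering is the point: the channel is closed before the future completes exceptionally and triggers cleanup, so no new files can land mid-cleanup.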

Code changes

  • Added new code behind a config (the new client-side transfer timeout; its name and default value belong in the PR description).
  • Introduced new log lines.
    • Confirmed whether the new logs need rate limiting to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  1. Verified the reduction of the server-side active user count through integration and unit tests.
  2. The timeout should be tested at the host E2E level, as integration tests complete too quickly (in seconds) to reach a timeout set at the minute level.
  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

@jingy-li requested review from sixpluszero and gaojieliu on May 15, 2025 at 21:15
 * @param topicName the topic name
 * @param partitionId the partition id
 * @throws VeniceException if the concurrent user count exceeds the limit
 */
private void checkIfConcurrentUserExceedsLimit(String topicName, int partitionId) throws VeniceException {
  boolean exceededMaxConcurrentUsers = getConcurrentSnapshotUsers(topicName, partitionId) >= maxConcurrentUsers;
  // get the current host-level served request count across all topics and partitions
  int totalTopicsPartitionsRequestCount = concurrentSnapshotUsers.values()
Contributor:
Can the global counter check be done in P2PFileTransferServerHandler?
Every request goes through this handler, right? If we check the global counter there, the flow becomes:

  1. Check the counter to see whether it has hit the limit.
  2. If not, increment the global counter.
  3. Do the data transfer.
  4. In the finally scope, decrement the global counter.

That way the counter maintenance happens within a single function, which makes it easy to keep atomic and easier to reason about (see the sketch below).
We can continue to use the topic/partition tracking to check whether the snapshot is active.
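
As an illustration of the suggested check/increment/transfer/decrement pattern, here is a minimal sketch. GlobalTransferLimiter and its members are hypothetical names, not the actual P2PFileTransferServerHandler API, and the real handler would presumably throw VeniceException rather than IllegalStateException.

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: keep the host-level counter maintenance inside one
// method so the reserve/release pair is trivially balanced and atomic.
public class GlobalTransferLimiter {
  private final AtomicInteger globalActiveTransfers = new AtomicInteger();
  private final int maxGlobalTransfers;

  public GlobalTransferLimiter(int maxGlobalTransfers) {
    this.maxGlobalTransfers = maxGlobalTransfers;
  }

  public void handleTransfer(Runnable doTransfer) {
    // Steps 1 and 2: atomically reserve a slot; back off if the limit is hit.
    if (globalActiveTransfers.incrementAndGet() > maxGlobalTransfers) {
      globalActiveTransfers.decrementAndGet();
      throw new IllegalStateException("Too many concurrent blob transfers on this host");
    }
    try {
      doTransfer.run(); // step 3: do the data transfer
    } finally {
      globalActiveTransfers.decrementAndGet(); // step 4: always release the slot
    }
  }
}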

  }
- }, REQUEST_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
+ }, blobReceiveTimeoutInMin, TimeUnit.MINUTES);
Contributor:

For connection establishment we should certainly have a timeout, but for the blob transfer itself, shall we just use the server-side timeout instead of both server and client timeouts, since a misconfiguration of the two can cause weird issues?

Contributor:

It is fine to enforce only the client-side timeout: when the client times out, it closes the channel; the server then receives an exception on its next write and can close its end as well (a sketch of the corresponding server-side bookkeeping follows).
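
A minimal sketch of what that server-side bookkeeping could look like, assuming a hypothetical Netty handler and a concurrentSnapshotUsers map keyed by a topic-partition string (the map name appears in the PR, but this handler and its wiring are illustrative only):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: release the per-topic-partition user count when the
// connection dies, so a client-side close (e.g. on timeout) does not leak an
// "active user" slot on the server.
public class SnapshotUserCountingHandler extends ChannelInboundHandlerAdapter {
  private final ConcurrentHashMap<String, AtomicInteger> concurrentSnapshotUsers;
  private final String topicPartitionKey;

  public SnapshotUserCountingHandler(
      ConcurrentHashMap<String, AtomicInteger> concurrentSnapshotUsers,
      String topicPartitionKey) {
    this.concurrentSnapshotUsers = concurrentSnapshotUsers;
    this.topicPartitionKey = topicPartitionKey;
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // Fires whether the peer closed cleanly or the connection was interrupted.
    AtomicInteger users = concurrentSnapshotUsers.get(topicPartitionKey);
    if (users != null) {
      users.decrementAndGet();
    }
    super.channelInactive(ctx);
  }
}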
