
[server][dvc] Make the client-side transfer timeout configurable and close the channel on timeout. #1805

Merged
merged 9 commits on May 21, 2025
@@ -38,7 +38,6 @@ public class BlobSnapshotManager {
private static final InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer =
AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer();
private final static int DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN = 30;
public final static int DEFAULT_MAX_CONCURRENT_USERS = 5;
public final static int DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS = 120;

// A map to keep track of the number of hosts using a snapshot for a particular topic and partition, use to restrict
@@ -60,7 +59,6 @@ public class BlobSnapshotManager {

private final StorageEngineRepository storageEngineRepository;
private final StorageMetadataService storageMetadataService;
private final int maxConcurrentUsers;
private final long snapshotRetentionTimeInMillis;
private final int snapshotCleanupIntervalInMins;
private final BlobTransferUtils.BlobTransferTableFormat blobTransferTableFormat;
@@ -72,13 +70,11 @@ public class BlobSnapshotManager {
public BlobSnapshotManager(
StorageEngineRepository storageEngineRepository,
StorageMetadataService storageMetadataService,
int maxConcurrentUsers,
int snapshotRetentionTimeInMin,
BlobTransferUtils.BlobTransferTableFormat transferTableFormat,
int snapshotCleanupIntervalInMins) {
this.storageEngineRepository = storageEngineRepository;
this.storageMetadataService = storageMetadataService;
this.maxConcurrentUsers = maxConcurrentUsers;
this.snapshotRetentionTimeInMillis = TimeUnit.MINUTES.toMillis(snapshotRetentionTimeInMin);
this.blobTransferTableFormat = transferTableFormat;
this.snapshotCleanupIntervalInMins = snapshotCleanupIntervalInMins;
@@ -106,20 +102,18 @@ public BlobSnapshotManager(
this(
storageEngineRepository,
storageMetadataService,
DEFAULT_MAX_CONCURRENT_USERS,
DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN,
BlobTransferUtils.BlobTransferTableFormat.BLOCK_BASED_TABLE,
DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS);
}

/**
* Get the transfer metadata for a particular payload
* 1. throttle the request if many concurrent users.
* 2. check snapshot staleness
* 2.1. if stale:
* 2.1.1. if it does not have active users: recreate the snapshot and metadata, then return the metadata
* 2.1.2. if it has active users: no need to recreate the snapshot, throw an exception to let the client move to next candidate.
* 2.2. if not stale, directly return the metadata
* 1. check snapshot staleness
* 1.1. if stale:
* 1.1.1. if it does not have active users: recreate the snapshot and metadata, then return the metadata
* 1.1.2. if it has active users: no need to recreate the snapshot, throw an exception to let the client move to next candidate.
* 1.2. if not stale, directly return the metadata
*
* @param payload the blob transfer payload
* @param successCountedAsActiveCurrentUser Indicates whether this request has been successfully counted as an active user.
@@ -146,17 +140,14 @@ public BlobTransferPartitionMetadata getTransferMetadata(

ReentrantLock lock = getSnapshotLock(topicName, partitionId);
try (AutoCloseableLock ignored = AutoCloseableLock.of(lock)) {
// 1. check if the concurrent user count exceeds the limit
checkIfConcurrentUserExceedsLimit(topicName, partitionId);

initializeTrackingValues(topicName, partitionId);

boolean havingActiveUsers = getConcurrentSnapshotUsers(topicName, partitionId) > 0;
boolean isSnapshotStale = isSnapshotStale(topicName, partitionId);
increaseConcurrentUserCount(topicName, partitionId);
successCountedAsActiveCurrentUser.set(true);

// 2. Check if the snapshot is stale and needs to be recreated.
// 1. Check if the snapshot is stale and needs to be recreated.
// If the snapshot is stale and there are active users, throw an exception to exit early, allowing the client to
// try the next available peer.
// Even if creating a snapshot is fast, the stale snapshot may still be in use and transferring data for a
@@ -210,25 +201,6 @@ private void recreateSnapshotAndMetadata(BlobTransferPayload blobTransferRequest
}
}

/**
* Check if the concurrent user count exceeds the limit
* @param topicName the topic name
* @param partitionId the partition id
* @throws VeniceException if the concurrent user count exceeds the limit
*/
private void checkIfConcurrentUserExceedsLimit(String topicName, int partitionId) throws VeniceException {
boolean exceededMaxConcurrentUsers = getConcurrentSnapshotUsers(topicName, partitionId) >= maxConcurrentUsers;
if (exceededMaxConcurrentUsers) {
String errorMessage = String.format(
"Exceeded the maximum number of concurrent users %d for topic %s partition %d",
maxConcurrentUsers,
topicName,
partitionId);
LOGGER.error(errorMessage);
throw new VeniceException(errorMessage);
}
}

/**
* Check if the snapshot is stale
* @param topicName the topic name
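Taken together, the hunks above reduce getTransferMetadata to a single check-and-recreate flow under the per-partition snapshot lock. The following is a minimal sketch of that flow, not the actual Venice code: readMetadata stands in for the real metadata lookup, and the exception message is simplified.

// Minimal sketch of the getTransferMetadata flow shown above. getSnapshotLock,
// getConcurrentSnapshotUsers, isSnapshotStale, increaseConcurrentUserCount and
// recreateSnapshotAndMetadata mirror methods visible in the hunks; readMetadata is a placeholder.
BlobTransferPartitionMetadata getTransferMetadataSketch(
    String topicName,
    int partitionId,
    BlobTransferPayload payload,
    AtomicBoolean successCountedAsActiveCurrentUser) {
  ReentrantLock lock = getSnapshotLock(topicName, partitionId);
  try (AutoCloseableLock ignored = AutoCloseableLock.of(lock)) {
    initializeTrackingValues(topicName, partitionId);
    boolean havingActiveUsers = getConcurrentSnapshotUsers(topicName, partitionId) > 0;
    boolean isSnapshotStale = isSnapshotStale(topicName, partitionId);
    increaseConcurrentUserCount(topicName, partitionId);
    successCountedAsActiveCurrentUser.set(true);

    if (isSnapshotStale) {
      if (havingActiveUsers) {
        // The stale snapshot may still be serving another transfer, so fail fast and let the
        // client move on to the next peer instead of recreating it under an active reader.
        throw new VeniceException("Stale snapshot for " + topicName + "-" + partitionId + " is still in use");
      }
      recreateSnapshotAndMetadata(payload);
    }
    return readMetadata(topicName, partitionId);
  }
}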
@@ -111,7 +111,6 @@ public BlobTransferManager<Void> build() {
BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
storageEngineRepository,
storageMetadataService,
blobTransferConfig.getMaxConcurrentSnapshotUser(),
blobTransferConfig.getSnapshotRetentionTimeInMin(),
blobTransferConfig.getTransferSnapshotTableFormat(),
blobTransferConfig.getSnapshotCleanupIntervalInMins());
@@ -124,7 +123,8 @@ public BlobTransferManager<Void> build() {
blobSnapshotManager,
globalTrafficHandler,
sslFactory,
aclHandler),
aclHandler,
blobTransferConfig.getMaxConcurrentSnapshotUser()),
new NettyFileTransferClient(
blobTransferConfig.getP2pTransferClientPort(),
blobTransferConfig.getBaseDir(),
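With DEFAULT_MAX_CONCURRENT_USERS and the per-request check removed from BlobSnapshotManager, the limit now travels as blobTransferConfig.getMaxConcurrentSnapshotUser() into the transfer service and its server handler. The new server-side check is not part of the hunks shown here; a rough sketch of an equivalent guard, with assumed field names and assuming getConcurrentSnapshotUsers is accessible to the handler, might look like this:

// Hypothetical guard inside P2PFileTransferServerHandler; not taken from the actual PR code.
private void checkConcurrencyLimit(String topicName, int partitionId) {
  if (blobSnapshotManager.getConcurrentSnapshotUsers(topicName, partitionId) >= maxAllowedConcurrentSnapshotUsers) {
    throw new VeniceException(
        String.format(
            "Exceeded the maximum number of concurrent users %d for topic %s partition %d",
            maxAllowedConcurrentSnapshotUsers,
            topicName,
            partitionId));
  }
}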
@@ -39,15 +39,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class NettyFileTransferClient {
private static final Logger LOGGER = LogManager.getLogger(NettyFileTransferClient.class);
private static final int MAX_METADATA_CONTENT_LENGTH = 1024 * 1024 * 100;
private static final int REQUEST_TIMEOUT_IN_MINUTES = 5;
private static final int CONNECTION_TIMEOUT_IN_MINUTES = 1;
// Maximum time that Netty will wait to establish the initial connection before failing.
private static final int CONNECTION_ESTABLISHMENT_TIMEOUT_MS = 30 * 1000;
@@ -285,16 +283,6 @@ public CompletionStage<InputStream> get(
requestedTableFormat));
// Send a GET request
ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));
// Set a timeout, otherwise if the host is not responding, the future will never complete
connectTimeoutScheduler.schedule(() -> {
if (!inputStream.toCompletableFuture().isDone()) {
inputStream.toCompletableFuture()
.completeExceptionally(
new TimeoutException(
"Request timed out for store " + storeName + " version " + version + " partition " + partition
+ " table format " + requestedTableFormat + " from host " + host));
}
}, REQUEST_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
} catch (Exception e) {
if (!inputStream.toCompletableFuture().isCompletedExceptionally()) {
inputStream.toCompletableFuture().completeExceptionally(e);
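On the client side, the hard-coded five-minute REQUEST_TIMEOUT_IN_MINUTES scheduler is deleted from get(). Its replacement is outside the hunks shown here; going by the PR title, the intent is a configurable timeout that also closes the channel when it fires. A sketch of that mechanism, where blobReceiveTimeoutInMin and the exception type are assumptions rather than the actual PR code:

// Sketch only: blobReceiveTimeoutInMin stands in for whatever configurable value replaces
// the old REQUEST_TIMEOUT_IN_MINUTES constant.
connectTimeoutScheduler.schedule(() -> {
  if (!inputStream.toCompletableFuture().isDone()) {
    inputStream.toCompletableFuture()
        .completeExceptionally(
            new VeniceException(
                "Request timed out for store " + storeName + " version " + version + " partition "
                    + partition + " from host " + host));
    // Close the channel so an unresponsive host does not leave the connection dangling.
    ch.close();
  }
}, blobReceiveTimeoutInMin, TimeUnit.MINUTES);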
@@ -18,28 +18,29 @@


public class BlobTransferNettyChannelInitializer extends ChannelInitializer<SocketChannel> {
private final String baseDir;
private final int blobTransferMaxTimeoutInMin;
private BlobSnapshotManager blobSnapshotManager;
private Optional<SSLFactory> sslFactory;
private Optional<BlobTransferAclHandler> aclHandler;

private final GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler;
private final VerifySslHandler verifySsl = new VerifySslHandler();
private final P2PFileTransferServerHandler p2pFileTransferServerHandler;

public BlobTransferNettyChannelInitializer(
String baseDir,
int blobTransferMaxTimeoutInMin,
BlobSnapshotManager blobSnapshotManager,
GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler,
Optional<SSLFactory> sslFactory,
Optional<BlobTransferAclHandler> aclHandler) {
this.baseDir = baseDir;
this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin;
this.blobSnapshotManager = blobSnapshotManager;
Optional<BlobTransferAclHandler> aclHandler,
int maxAllowedConcurrentSnapshotUsers) {
this.globalChannelTrafficShapingHandler = globalChannelTrafficShapingHandler;
this.sslFactory = sslFactory;
this.aclHandler = aclHandler;
this.p2pFileTransferServerHandler = new P2PFileTransferServerHandler(
baseDir,
blobTransferMaxTimeoutInMin,
blobSnapshotManager,
maxAllowedConcurrentSnapshotUsers);
}

@Override
@@ -65,8 +66,6 @@ protected void initChannel(SocketChannel ch) throws Exception {
// for safe writing of chunks for responses
.addLast("chunker", new ChunkedWriteHandler())
// for handling p2p file transfer
.addLast(
"p2pFileTransferHandler",
new P2PFileTransferServerHandler(baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager));
.addLast("p2pFileTransferHandler", p2pFileTransferServerHandler);
}
}
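A structural side effect here is that the initializer now builds a single P2PFileTransferServerHandler in its constructor and registers the same instance in every channel pipeline, rather than creating one per channel in initChannel. In Netty, a handler instance shared across pipelines must be annotated with @ChannelHandler.Sharable and avoid per-channel mutable state; whether the Venice handler carries that annotation is not visible in these hunks. A generic illustration of the pattern:

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Generic example of a handler that is safe to register in many pipelines at once.
@ChannelHandler.Sharable
class StatelessServerHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // Keep state method-local or thread-safe; per-channel fields would be shared across channels.
    ctx.fireChannelRead(msg);
  }
}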
@@ -41,7 +41,8 @@ public P2PBlobTransferService(
BlobSnapshotManager blobSnapshotManager,
GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler,
Optional<SSLFactory> sslFactory,
Optional<BlobTransferAclHandler> aclHandler) {
Optional<BlobTransferAclHandler> aclHandler,
int maxAllowedConcurrentSnapshotUsers) {
this.port = port;
this.serverBootstrap = new ServerBootstrap();
this.blobSnapshotManager = blobSnapshotManager;
@@ -66,7 +67,8 @@ public P2PBlobTransferService(
blobSnapshotManager,
globalChannelTrafficShapingHandler,
sslFactory,
aclHandler))
aclHandler,
maxAllowedConcurrentSnapshotUsers))
.option(ChannelOption.SO_BACKLOG, 1000)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)