Skip to content

[server][dvc] change the client side transfer timeout configurable and close channel once timeout. #1805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 21, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ public DaVinciBackend(
backendConfig.getMaxConcurrentSnapshotUser(),
backendConfig.getSnapshotRetentionTimeInMin(),
backendConfig.getBlobTransferMaxTimeoutInMin(),
backendConfig.getBlobReceiveMaxTimeoutInMin(),
backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()
? BlobTransferTableFormat.PLAIN_TABLE
: BlobTransferTableFormat.BLOCK_BASED_TABLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class BlobSnapshotManager {
private static final InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer =
AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer();
private final static int DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN = 30;
public final static int DEFAULT_MAX_CONCURRENT_USERS = 5;
public final static int DEFAULT_MAX_CONCURRENT_USERS = 15;
public final static int DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS = 120;

// A map to keep track of the number of hosts using a snapshot for a particular topic and partition, use to restrict
Expand Down Expand Up @@ -212,15 +212,32 @@ private void recreateSnapshotAndMetadata(BlobTransferPayload blobTransferRequest

/**
* Check if the concurrent user count exceeds the limit
* The map concurrentSnapshotUsers is keyed per topic and partition, so we need to sum over all entries to get the total count of requests the server handler is currently serving.
*
* Note:
* Because the lock is held at the per-topic, per-partition level while the limit is checked globally via concurrentSnapshotUsers,
* there may be a race condition due to the "check-then-increment" window:
* 1. Thread A and thread B both check the concurrentSnapshotUsers count at the same time and see count = maxConcurrentUsers - 1.
* 2. Thread A increments the concurrentSnapshotUsers count, then B increments it, and both of them are allowed to proceed.
* 3. The actual concurrentSnapshotUsers count is then maxConcurrentUsers + 1.
* In rare cases, actual concurrent users may temporarily exceed maxConcurrentUsers, but only by a small margin.
*
* @param topicName the topic name
* @param partitionId the partition id
* @throws VeniceException if the concurrent user count exceeds the limit
*/
private void checkIfConcurrentUserExceedsLimit(String topicName, int partitionId) throws VeniceException {
boolean exceededMaxConcurrentUsers = getConcurrentSnapshotUsers(topicName, partitionId) >= maxConcurrentUsers;
// get the current host level served request count which is all topics and partitions
int totalTopicsPartitionsRequestCount = concurrentSnapshotUsers.values()
.stream()
.flatMap(innerMap -> innerMap.values().stream())
.mapToInt(AtomicInteger::get)
.sum();

boolean exceededMaxConcurrentUsers = totalTopicsPartitionsRequestCount >= maxConcurrentUsers;
if (exceededMaxConcurrentUsers) {
String errorMessage = String.format(
"Exceeded the maximum number of concurrent users %d for topic %s partition %d",
"Exceeded the maximum number of concurrent users %d, request for topic %s partition %d can not be served anymore",
maxConcurrentUsers,
topicName,
partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public BlobTransferManager<Void> build() {
blobTransferConfig.getBaseDir(),
storageMetadataService,
blobTransferConfig.getPeersConnectivityFreshnessInSeconds(),
blobTransferConfig.getBlobReceiveTimeoutInMin(),
globalTrafficHandler,
sslFactory),
blobFinder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class P2PBlobTransferConfig {
private final int snapshotRetentionTimeInMin;
// Max timeout for blob transfer in minutes in server side, to avoid endless sending files.
private final int blobTransferMaxTimeoutInMin;
// Max timeout for blob receive in minutes in client side, to avoid endless receiving files.
private final int blobReceiveMaxTimeoutInMin;
// Table format
private final BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat;
// Peers connectivity records freshness in seconds.
Expand All @@ -34,6 +36,7 @@ public P2PBlobTransferConfig(
int maxConcurrentSnapshotUser,
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin,
int blobReceiveMaxTimeoutInMin,
BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat,
int peersConnectivityFreshnessInSeconds,
long blobTransferClientReadLimitBytesPerSec,
Expand All @@ -45,6 +48,7 @@ public P2PBlobTransferConfig(
this.maxConcurrentSnapshotUser = maxConcurrentSnapshotUser;
this.snapshotRetentionTimeInMin = snapshotRetentionTimeInMin;
this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin;
this.blobReceiveMaxTimeoutInMin = blobReceiveMaxTimeoutInMin;
this.transferSnapshotTableFormat = transferSnapshotTableFormat;
this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds;
this.blobTransferClientReadLimitBytesPerSec = blobTransferClientReadLimitBytesPerSec;
Expand Down Expand Up @@ -76,6 +80,10 @@ public int getBlobTransferMaxTimeoutInMin() {
return blobTransferMaxTimeoutInMin;
}

/**
 * Returns the max timeout in minutes for the client side to receive a blob, used to
 * avoid endlessly waiting on incoming files.
 * NOTE(review): method name omits "Max" while the backing field is
 * blobReceiveMaxTimeoutInMin — consider renaming for consistency with
 * VeniceServerConfig#getBlobReceiveMaxTimeoutInMin (would require updating callers).
 */
public int getBlobReceiveTimeoutInMin() {
return blobReceiveMaxTimeoutInMin;
}

/**
 * Returns the table format (PLAIN_TABLE or BLOCK_BASED_TABLE) used for the
 * transferred snapshot.
 */
public BlobTransferUtils.BlobTransferTableFormat getTransferSnapshotTableFormat() {
return transferSnapshotTableFormat;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class NettyFileTransferClient {
private final String baseDir;
private final int serverPort;
private final int peersConnectivityFreshnessInSeconds; // the freshness of the peers connectivity records
private final int blobReceiveTimeoutInMin; // the timeout for blob receive in minutes in client side
private StorageMetadataService storageMetadataService;
private final ExecutorService hostConnectExecutorService;
private final ScheduledExecutorService connectTimeoutScheduler;
Expand All @@ -73,12 +74,14 @@ public NettyFileTransferClient(
String baseDir,
StorageMetadataService storageMetadataService,
int peersConnectivityFreshnessInSeconds,
int blobReceiveTimeoutInMin,
GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler,
Optional<SSLFactory> sslFactory) {
this.baseDir = baseDir;
this.serverPort = serverPort;
this.storageMetadataService = storageMetadataService;
this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds;
this.blobReceiveTimeoutInMin = blobReceiveTimeoutInMin;

clientBootstrap = new Bootstrap();
workerGroup = new NioEventLoopGroup();
Expand Down Expand Up @@ -292,9 +295,12 @@ public CompletionStage<InputStream> get(
.completeExceptionally(
new TimeoutException(
"Request timed out for store " + storeName + " version " + version + " partition " + partition
+ " table format " + requestedTableFormat + " from host " + host));
+ " table format " + requestedTableFormat + " from host " + host + " in "
+ blobReceiveTimeoutInMin + " minutes."));
// Close the channel if the request times out
ch.close();
}
}, REQUEST_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
}, blobReceiveTimeoutInMin, TimeUnit.MINUTES);
} catch (Exception e) {
if (!inputStream.toCompletableFuture().isCompletedExceptionally()) {
inputStream.toCompletableFuture().completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
Expand All @@ -57,6 +58,10 @@ public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler<Fu
// Maximum timeout for blob transfer in minutes per partition
private final int blobTransferMaxTimeoutInMin;
private BlobSnapshotManager blobSnapshotManager;
private static final AttributeKey<BlobTransferPayload> BLOB_TRANSFER_REQUEST =
AttributeKey.valueOf("blobTransferRequest");
private static final AttributeKey<AtomicBoolean> SUCCESS_COUNTED =
AttributeKey.valueOf("successCountedAsActiveCurrentUser");

public P2PFileTransferServerHandler(
String baseDir,
Expand Down Expand Up @@ -92,90 +97,107 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque
return;
}
BlobTransferPayload blobTransferRequest = null;
try {
final File snapshotDir;
BlobTransferPartitionMetadata transferPartitionMetadata;

try {
blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri()));
snapshotDir = new File(blobTransferRequest.getSnapshotDir());

// Check the snapshot table format
BlobTransferTableFormat currentSnapshotTableFormat = blobSnapshotManager.getBlobTransferTableFormat();
if (blobTransferRequest.getRequestTableFormat() != currentSnapshotTableFormat) {
byte[] errBody = ("Table format mismatch for " + blobTransferRequest.getFullResourceName()
+ ", current snapshot format is " + currentSnapshotTableFormat.name() + ", requested format is "
+ blobTransferRequest.getRequestTableFormat().name()).getBytes();
setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx);
return;
}
final File snapshotDir;
BlobTransferPartitionMetadata transferPartitionMetadata;

try {
transferPartitionMetadata =
blobSnapshotManager.getTransferMetadata(blobTransferRequest, successCountedAsActiveCurrentUser);
} catch (Exception e) {
setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx);
return;
}
try {
blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri()));
snapshotDir = new File(blobTransferRequest.getSnapshotDir());

if (!snapshotDir.exists() || !snapshotDir.isDirectory()) {
byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes();
setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx);
return;
}
} catch (IllegalArgumentException e) {
setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx);
// Check the snapshot table format
BlobTransferTableFormat currentSnapshotTableFormat = blobSnapshotManager.getBlobTransferTableFormat();
if (blobTransferRequest.getRequestTableFormat() != currentSnapshotTableFormat) {
byte[] errBody = ("Table format mismatch for " + blobTransferRequest.getFullResourceName()
+ ", current snapshot format is " + currentSnapshotTableFormat.name() + ", requested format is "
+ blobTransferRequest.getRequestTableFormat().name()).getBytes();
setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx);
return;
} catch (SecurityException e) {
setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx);
}

try {
transferPartitionMetadata =
blobSnapshotManager.getTransferMetadata(blobTransferRequest, successCountedAsActiveCurrentUser);
ctx.channel().attr(SUCCESS_COUNTED).set(successCountedAsActiveCurrentUser);
ctx.channel().attr(BLOB_TRANSFER_REQUEST).set(blobTransferRequest);
} catch (Exception e) {
setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx);
return;
}

File[] files = snapshotDir.listFiles();
if (files == null || files.length == 0) {
setupResponseAndFlush(
HttpResponseStatus.INTERNAL_SERVER_ERROR,
("Failed to access files at " + snapshotDir).getBytes(),
false,
ctx);
if (!snapshotDir.exists() || !snapshotDir.isDirectory()) {
byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes();
setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx);
return;
}
} catch (IllegalArgumentException e) {
setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx);
return;
} catch (SecurityException e) {
setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx);
return;
}

File[] files = snapshotDir.listFiles();
if (files == null || files.length == 0) {
setupResponseAndFlush(
HttpResponseStatus.INTERNAL_SERVER_ERROR,
("Failed to access files at " + snapshotDir).getBytes(),
false,
ctx);
return;
}

// Set up the time limitation for the transfer
long startTime = System.currentTimeMillis();
// Set up the time limitation for the transfer
long startTime = System.currentTimeMillis();

// transfer files
for (File file: files) {
// check if the transfer for all files is timed out for this partition
if (System.currentTimeMillis() - startTime >= TimeUnit.MINUTES.toMillis(blobTransferMaxTimeoutInMin)) {
String errMessage = String
.format(TRANSFER_TIMEOUT_ERROR_MSG_FORMAT, blobTransferRequest.getFullResourceName(), file.getName());
LOGGER.error(errMessage);
setupResponseAndFlush(HttpResponseStatus.REQUEST_TIMEOUT, errMessage.getBytes(), false, ctx);
return;
}
// send file
sendFile(file, ctx);
// transfer files
for (File file: files) {
// check if the transfer for all files is timed out for this partition
if (System.currentTimeMillis() - startTime >= TimeUnit.MINUTES.toMillis(blobTransferMaxTimeoutInMin)) {
String errMessage =
String.format(TRANSFER_TIMEOUT_ERROR_MSG_FORMAT, blobTransferRequest.getFullResourceName(), file.getName());
LOGGER.error(errMessage);
setupResponseAndFlush(HttpResponseStatus.REQUEST_TIMEOUT, errMessage.getBytes(), false, ctx);
return;
}
// send file
sendFile(file, ctx);
}

sendMetadata(ctx, transferPartitionMetadata);
sendMetadata(ctx, transferPartitionMetadata);

// end of transfer
HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED);
String fullResourceName = blobTransferRequest.getFullResourceName();
ctx.writeAndFlush(endOfTransfer).addListener(future -> {
if (future.isSuccess()) {
LOGGER.debug("All files sent successfully for {}", fullResourceName);
} else {
LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause());
}
});
} finally {
if (blobTransferRequest != null && successCountedAsActiveCurrentUser.get()) {
// end of transfer
HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED);
String fullResourceName = blobTransferRequest.getFullResourceName();
ctx.writeAndFlush(endOfTransfer).addListener(future -> {
if (future.isSuccess()) {
LOGGER.debug("All files sent successfully for {}", fullResourceName);
} else {
LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause());
}
});
}

/**
 * Decrements the snapshot concurrent-user count when the channel becomes inactive.
 * An inactive channel means the transfer has finished (successfully or not), so this is
 * the single reliable place to release the slot: decrementing in channelRead0 instead
 * would leak the count on the server side whenever the connection breaks midway.
 * Only decrements when this request was actually counted as an active user
 * (SUCCESS_COUNTED attribute set and true) and the request payload attribute is present.
 * @param ctx the channel handler context carrying the per-channel attributes
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) {
  BlobTransferPayload payload = ctx.channel().attr(BLOB_TRANSFER_REQUEST).get();
  AtomicBoolean countedAsActiveUser = ctx.channel().attr(SUCCESS_COUNTED).get();
  // Release the concurrency slot only if this channel was counted in the first place.
  boolean shouldDecrement = payload != null && countedAsActiveUser != null && countedAsActiveUser.get();
  if (shouldDecrement) {
    try {
      blobSnapshotManager.decreaseConcurrentUserCount(payload);
    } catch (Exception e) {
      LOGGER.error("Failed to decrease the snapshot concurrent user count for request {}", payload, e);
    }
  }
  ctx.fireChannelInactive();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.linkedin.venice.ConfigConstants.DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL;
import static com.linkedin.venice.ConfigKeys.ACL_IN_MEMORY_CACHE_TTL_MS;
import static com.linkedin.venice.ConfigKeys.AUTOCREATE_DATA_PATH;
import static com.linkedin.venice.ConfigKeys.BLOB_RECEIVE_MAX_TIMEOUT_IN_MIN;
import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_ACL_ENABLED;
import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_CLIENT_READ_LIMIT_BYTES_PER_SEC;
import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_DISABLED_OFFSET_LAG_THRESHOLD;
Expand Down Expand Up @@ -597,6 +598,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int snapshotRetentionTimeInMin;
private final int maxConcurrentSnapshotUser;
private final int blobTransferMaxTimeoutInMin;
private final int blobReceiveMaxTimeoutInMin;
private final int blobTransferPeersConnectivityFreshnessInSeconds;
private final long blobTransferClientReadLimitBytesPerSec;
private final long blobTransferServiceWriteLimitBytesPerSec;
Expand Down Expand Up @@ -668,8 +670,9 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
blobTransferAclEnabled = serverProperties.getBoolean(BLOB_TRANSFER_ACL_ENABLED, false);

snapshotRetentionTimeInMin = serverProperties.getInt(BLOB_TRANSFER_SNAPSHOT_RETENTION_TIME_IN_MIN, 60);
maxConcurrentSnapshotUser = serverProperties.getInt(BLOB_TRANSFER_MAX_CONCURRENT_SNAPSHOT_USER, 5);
maxConcurrentSnapshotUser = serverProperties.getInt(BLOB_TRANSFER_MAX_CONCURRENT_SNAPSHOT_USER, 15);
blobTransferMaxTimeoutInMin = serverProperties.getInt(BLOB_TRANSFER_MAX_TIMEOUT_IN_MIN, 60);
blobReceiveMaxTimeoutInMin = serverProperties.getInt(BLOB_RECEIVE_MAX_TIMEOUT_IN_MIN, 60);
blobTransferPeersConnectivityFreshnessInSeconds =
serverProperties.getInt(BLOB_TRANSFER_PEERS_CONNECTIVITY_FRESHNESS_IN_SECONDS, 30);
blobTransferClientReadLimitBytesPerSec =
Expand Down Expand Up @@ -1231,6 +1234,10 @@ public int getBlobTransferMaxTimeoutInMin() {
return blobTransferMaxTimeoutInMin;
}

/**
 * Returns the max timeout in minutes for the client side to receive a blob
 * (config key BLOB_RECEIVE_MAX_TIMEOUT_IN_MIN, default 60), used to close the
 * channel instead of waiting endlessly for incoming files.
 */
public int getBlobReceiveMaxTimeoutInMin() {
return blobReceiveMaxTimeoutInMin;
}

public int getBlobTransferPeersConnectivityFreshnessInSeconds() {
return blobTransferPeersConnectivityFreshnessInSeconds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void testSameSnapshotWhenConcurrentUsersExceedsMaxAllowedUsers() {
blobSnapshotManager.getTransferMetadata(blobTransferPayload, new AtomicBoolean(false));
} catch (VeniceException e) {
String errorMessage = String.format(
"Exceeded the maximum number of concurrent users %d for topic %s partition %d",
"Exceeded the maximum number of concurrent users %d, request for topic %s partition %d can not be served anymore",
BlobSnapshotManager.DEFAULT_MAX_CONCURRENT_USERS,
TOPIC_NAME,
PARTITION_ID);
Expand Down
Loading
Loading