From 42c154a319329825d92b9b8f6c351109a4fe04b3 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 13 May 2025 16:00:30 -0700 Subject: [PATCH 1/9] [server][dvc] change the client side transfer timeout configurable and close channel once timeout. --- .../com/linkedin/davinci/DaVinciBackend.java | 1 + .../blobtransfer/BlobSnapshotManager.java | 23 ++- .../BlobTransferManagerBuilder.java | 1 + .../blobtransfer/P2PBlobTransferConfig.java | 8 + .../client/NettyFileTransferClient.java | 10 +- .../server/P2PFileTransferServerHandler.java | 158 ++++++++++-------- .../davinci/config/VeniceServerConfig.java | 9 +- .../blobtransfer/BlobSnapshotManagerTest.java | 2 +- .../TestBlobTransferManagerBuilder.java | 2 + .../TestNettyP2PBlobTransferManager.java | 1 + .../java/com/linkedin/venice/ConfigKeys.java | 7 +- .../linkedin/venice/server/VeniceServer.java | 1 + 12 files changed, 146 insertions(+), 77 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 919787580c2..bdcbf8472b4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -311,6 +311,7 @@ public DaVinciBackend( backendConfig.getMaxConcurrentSnapshotUser(), backendConfig.getSnapshotRetentionTimeInMin(), backendConfig.getBlobTransferMaxTimeoutInMin(), + backendConfig.getBlobReceiveMaxTimeoutInMin(), backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled() ? 
BlobTransferTableFormat.PLAIN_TABLE : BlobTransferTableFormat.BLOCK_BASED_TABLE, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java index 65b377b6929..6a4bfd03ffd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java @@ -38,7 +38,7 @@ public class BlobSnapshotManager { private static final InternalAvroSpecificSerializer storeVersionStateSerializer = AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer(); private final static int DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN = 30; - public final static int DEFAULT_MAX_CONCURRENT_USERS = 5; + public final static int DEFAULT_MAX_CONCURRENT_USERS = 15; public final static int DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS = 120; // A map to keep track of the number of hosts using a snapshot for a particular topic and partition, use to restrict @@ -212,15 +212,32 @@ private void recreateSnapshotAndMetadata(BlobTransferPayload blobTransferRequest /** * Check if the concurrent user count exceeds the limit + * The map concurrentSnapshotUsers is a per topic and partition map, so we need to sum up to get the total count that the server handler is currently served. + * + * Note: + * Due to the lock is at per partition per topic level, but the check limitation is globally via concurrentSnapshotUsers. + * Then there may be race condition due to "check-then-increment" window: + * 1. Thread A and thread B both check the concurrentSnapshotUsers count and amount = maxConcurrentUsers - 1, at the same time + * 2. Thread A increment the concurrentSnapshotUsers count, then B increment the concurrentSnapshotUsers count, and both of them are allowed to proceed. + * 3. 
But actually, the concurrentSnapshotUsers count is maxConcurrentUsers + 1 + * In rare cases, actual concurrent users may temporarily exceed maxConcurrentUsers, but only by a small margin. + * * @param topicName the topic name * @param partitionId the partition id * @throws VeniceException if the concurrent user count exceeds the limit */ private void checkIfConcurrentUserExceedsLimit(String topicName, int partitionId) throws VeniceException { - boolean exceededMaxConcurrentUsers = getConcurrentSnapshotUsers(topicName, partitionId) >= maxConcurrentUsers; + // get the current host level served request count which is all topics and partitions + int totalTopicsPartitionsRequestCount = concurrentSnapshotUsers.values() + .stream() + .flatMap(innerMap -> innerMap.values().stream()) + .mapToInt(AtomicInteger::get) + .sum(); + + boolean exceededMaxConcurrentUsers = totalTopicsPartitionsRequestCount >= maxConcurrentUsers; if (exceededMaxConcurrentUsers) { String errorMessage = String.format( - "Exceeded the maximum number of concurrent users %d for topic %s partition %d", + "Exceeded the maximum number of concurrent users %d, request for topic %s partition %d can not be served anymore", maxConcurrentUsers, topicName, partitionId); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java index 2c218c30fa5..c2209ea784c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java @@ -130,6 +130,7 @@ public BlobTransferManager build() { blobTransferConfig.getBaseDir(), storageMetadataService, blobTransferConfig.getPeersConnectivityFreshnessInSeconds(), + blobTransferConfig.getBlobReceiveTimeoutInMin(), globalTrafficHandler, sslFactory), blobFinder, diff 
--git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java index 538a2bfcc3b..a99ced8b1e1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java @@ -16,6 +16,8 @@ public class P2PBlobTransferConfig { private final int snapshotRetentionTimeInMin; // Max timeout for blob transfer in minutes in server side, to avoid endless sending files. private final int blobTransferMaxTimeoutInMin; + // Max timeout for blob receive in minutes in client side, to avoid endless receiving files. + private final int blobReceiveMaxTimeoutInMin; // Table format private final BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat; // Peers connectivity records freshness in seconds. @@ -34,6 +36,7 @@ public P2PBlobTransferConfig( int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, int blobTransferMaxTimeoutInMin, + int blobReceiveMaxTimeoutInMin, BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat, int peersConnectivityFreshnessInSeconds, long blobTransferClientReadLimitBytesPerSec, @@ -45,6 +48,7 @@ public P2PBlobTransferConfig( this.maxConcurrentSnapshotUser = maxConcurrentSnapshotUser; this.snapshotRetentionTimeInMin = snapshotRetentionTimeInMin; this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin; + this.blobReceiveMaxTimeoutInMin = blobReceiveMaxTimeoutInMin; this.transferSnapshotTableFormat = transferSnapshotTableFormat; this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds; this.blobTransferClientReadLimitBytesPerSec = blobTransferClientReadLimitBytesPerSec; @@ -76,6 +80,10 @@ public int getBlobTransferMaxTimeoutInMin() { return blobTransferMaxTimeoutInMin; } + public int getBlobReceiveTimeoutInMin() 
{ + return blobReceiveMaxTimeoutInMin; + } + public BlobTransferUtils.BlobTransferTableFormat getTransferSnapshotTableFormat() { return transferSnapshotTableFormat; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java index bc2c147681b..56d2ad7cdf8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java @@ -56,6 +56,7 @@ public class NettyFileTransferClient { private final String baseDir; private final int serverPort; private final int peersConnectivityFreshnessInSeconds; // the freshness of the peers connectivity records + private final int blobReceiveTimeoutInMin; // the timeout for blob receive in minutes in client side private StorageMetadataService storageMetadataService; private final ExecutorService hostConnectExecutorService; private final ScheduledExecutorService connectTimeoutScheduler; @@ -73,12 +74,14 @@ public NettyFileTransferClient( String baseDir, StorageMetadataService storageMetadataService, int peersConnectivityFreshnessInSeconds, + int blobReceiveTimeoutInMin, GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler, Optional sslFactory) { this.baseDir = baseDir; this.serverPort = serverPort; this.storageMetadataService = storageMetadataService; this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds; + this.blobReceiveTimeoutInMin = blobReceiveTimeoutInMin; clientBootstrap = new Bootstrap(); workerGroup = new NioEventLoopGroup(); @@ -292,9 +295,12 @@ public CompletionStage get( .completeExceptionally( new TimeoutException( "Request timed out for store " + storeName + " version " + version + " partition " + partition - + " table format " + 
requestedTableFormat + " from host " + host)); + + " table format " + requestedTableFormat + " from host " + host + " in " + + blobReceiveTimeoutInMin + " minutes.")); + // Close the channel if the request times out + ch.close(); } - }, REQUEST_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES); + }, blobReceiveTimeoutInMin, TimeUnit.MINUTES); } catch (Exception e) { if (!inputStream.toCompletableFuture().isCompletedExceptionally()) { inputStream.toCompletableFuture().completeExceptionally(e); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java index 9103810baf9..0cd615efd8e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java @@ -35,6 +35,7 @@ import io.netty.handler.stream.ChunkedFile; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.AttributeKey; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -57,6 +58,10 @@ public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler BLOB_TRANSFER_REQUEST = + AttributeKey.valueOf("blobTransferRequest"); + private static final AttributeKey SUCCESS_COUNTED = + AttributeKey.valueOf("successCountedAsActiveCurrentUser"); public P2PFileTransferServerHandler( String baseDir, @@ -92,90 +97,107 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque return; } BlobTransferPayload blobTransferRequest = null; - try { - final File snapshotDir; - BlobTransferPartitionMetadata transferPartitionMetadata; - - try { - blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri())); - snapshotDir = new 
File(blobTransferRequest.getSnapshotDir()); - - // Check the snapshot table format - BlobTransferTableFormat currentSnapshotTableFormat = blobSnapshotManager.getBlobTransferTableFormat(); - if (blobTransferRequest.getRequestTableFormat() != currentSnapshotTableFormat) { - byte[] errBody = ("Table format mismatch for " + blobTransferRequest.getFullResourceName() - + ", current snapshot format is " + currentSnapshotTableFormat.name() + ", requested format is " - + blobTransferRequest.getRequestTableFormat().name()).getBytes(); - setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx); - return; - } + final File snapshotDir; + BlobTransferPartitionMetadata transferPartitionMetadata; - try { - transferPartitionMetadata = - blobSnapshotManager.getTransferMetadata(blobTransferRequest, successCountedAsActiveCurrentUser); - } catch (Exception e) { - setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx); - return; - } + try { + blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri())); + snapshotDir = new File(blobTransferRequest.getSnapshotDir()); - if (!snapshotDir.exists() || !snapshotDir.isDirectory()) { - byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes(); - setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx); - return; - } - } catch (IllegalArgumentException e) { - setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx); + // Check the snapshot table format + BlobTransferTableFormat currentSnapshotTableFormat = blobSnapshotManager.getBlobTransferTableFormat(); + if (blobTransferRequest.getRequestTableFormat() != currentSnapshotTableFormat) { + byte[] errBody = ("Table format mismatch for " + blobTransferRequest.getFullResourceName() + + ", current snapshot format is " + currentSnapshotTableFormat.name() + ", requested format is " + + 
blobTransferRequest.getRequestTableFormat().name()).getBytes(); + setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx); return; - } catch (SecurityException e) { - setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx); + } + + try { + transferPartitionMetadata = + blobSnapshotManager.getTransferMetadata(blobTransferRequest, successCountedAsActiveCurrentUser); + ctx.channel().attr(SUCCESS_COUNTED).set(successCountedAsActiveCurrentUser); + ctx.channel().attr(BLOB_TRANSFER_REQUEST).set(blobTransferRequest); + } catch (Exception e) { + setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx); return; } - File[] files = snapshotDir.listFiles(); - if (files == null || files.length == 0) { - setupResponseAndFlush( - HttpResponseStatus.INTERNAL_SERVER_ERROR, - ("Failed to access files at " + snapshotDir).getBytes(), - false, - ctx); + if (!snapshotDir.exists() || !snapshotDir.isDirectory()) { + byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes(); + setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, errBody, false, ctx); return; } + } catch (IllegalArgumentException e) { + setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, e.getMessage().getBytes(), false, ctx); + return; + } catch (SecurityException e) { + setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, e.getMessage().getBytes(), false, ctx); + return; + } + + File[] files = snapshotDir.listFiles(); + if (files == null || files.length == 0) { + setupResponseAndFlush( + HttpResponseStatus.INTERNAL_SERVER_ERROR, + ("Failed to access files at " + snapshotDir).getBytes(), + false, + ctx); + return; + } - // Set up the time limitation for the transfer - long startTime = System.currentTimeMillis(); + // Set up the time limitation for the transfer + long startTime = System.currentTimeMillis(); - // transfer files - for (File file: files) { - // check if the transfer for all 
files is timed out for this partition - if (System.currentTimeMillis() - startTime >= TimeUnit.MINUTES.toMillis(blobTransferMaxTimeoutInMin)) { - String errMessage = String - .format(TRANSFER_TIMEOUT_ERROR_MSG_FORMAT, blobTransferRequest.getFullResourceName(), file.getName()); - LOGGER.error(errMessage); - setupResponseAndFlush(HttpResponseStatus.REQUEST_TIMEOUT, errMessage.getBytes(), false, ctx); - return; - } - // send file - sendFile(file, ctx); + // transfer files + for (File file: files) { + // check if the transfer for all files is timed out for this partition + if (System.currentTimeMillis() - startTime >= TimeUnit.MINUTES.toMillis(blobTransferMaxTimeoutInMin)) { + String errMessage = + String.format(TRANSFER_TIMEOUT_ERROR_MSG_FORMAT, blobTransferRequest.getFullResourceName(), file.getName()); + LOGGER.error(errMessage); + setupResponseAndFlush(HttpResponseStatus.REQUEST_TIMEOUT, errMessage.getBytes(), false, ctx); + return; } + // send file + sendFile(file, ctx); + } - sendMetadata(ctx, transferPartitionMetadata); + sendMetadata(ctx, transferPartitionMetadata); - // end of transfer - HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED); - String fullResourceName = blobTransferRequest.getFullResourceName(); - ctx.writeAndFlush(endOfTransfer).addListener(future -> { - if (future.isSuccess()) { - LOGGER.debug("All files sent successfully for {}", fullResourceName); - } else { - LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause()); - } - }); - } finally { - if (blobTransferRequest != null && successCountedAsActiveCurrentUser.get()) { + // end of transfer + HttpResponse endOfTransfer = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + endOfTransfer.headers().set(BLOB_TRANSFER_STATUS, BLOB_TRANSFER_COMPLETED); + String fullResourceName = blobTransferRequest.getFullResourceName(); + 
ctx.writeAndFlush(endOfTransfer).addListener(future -> { + if (future.isSuccess()) { + LOGGER.debug("All files sent successfully for {}", fullResourceName); + } else { + LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause()); + } + }); + } + + /** + * This method is called when the channel is inactive. It is used to decrease the concurrent user count. + * Because the channel is inactive, we can assume that the transfer is complete. + * If we decrease the concurrent user at channelRead0, when the connection is break in half, we will not be able to decrease the count in server side + * @param ctx + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) { + AtomicBoolean successCountedAsActiveCurrentUser = ctx.channel().attr(SUCCESS_COUNTED).get(); + BlobTransferPayload blobTransferRequest = ctx.channel().attr(BLOB_TRANSFER_REQUEST).get(); + if (successCountedAsActiveCurrentUser != null && successCountedAsActiveCurrentUser.get() + && blobTransferRequest != null) { + try { blobSnapshotManager.decreaseConcurrentUserCount(blobTransferRequest); + } catch (Exception e) { + LOGGER.error("Failed to decrease the snapshot concurrent user count for request {}", blobTransferRequest, e); } } + ctx.fireChannelInactive(); } /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 92b470e30a5..fae664ba0fd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.ConfigConstants.DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL; import static com.linkedin.venice.ConfigKeys.ACL_IN_MEMORY_CACHE_TTL_MS; import static com.linkedin.venice.ConfigKeys.AUTOCREATE_DATA_PATH; +import static 
com.linkedin.venice.ConfigKeys.BLOB_RECEIVE_MAX_TIMEOUT_IN_MIN; import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_ACL_ENABLED; import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_CLIENT_READ_LIMIT_BYTES_PER_SEC; import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_DISABLED_OFFSET_LAG_THRESHOLD; @@ -597,6 +598,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final int snapshotRetentionTimeInMin; private final int maxConcurrentSnapshotUser; private final int blobTransferMaxTimeoutInMin; + private final int blobReceiveMaxTimeoutInMin; private final int blobTransferPeersConnectivityFreshnessInSeconds; private final long blobTransferClientReadLimitBytesPerSec; private final long blobTransferServiceWriteLimitBytesPerSec; @@ -668,8 +670,9 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map createServices() { serverConfig.getMaxConcurrentSnapshotUser(), serverConfig.getSnapshotRetentionTimeInMin(), serverConfig.getBlobTransferMaxTimeoutInMin(), + serverConfig.getBlobReceiveMaxTimeoutInMin(), serverConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled() ? BlobTransferTableFormat.PLAIN_TABLE : BlobTransferTableFormat.BLOCK_BASED_TABLE, From 3000c3cbbc1d21a4938b05d06035079ea532087a Mon Sep 17 00:00:00 2001 From: jingy-li Date: Thu, 15 May 2025 15:49:43 -0700 Subject: [PATCH 2/9] Add a unit test to verify that the concurrent user count is reduced when the channel becomes inactive. 
--- .../blobtransfer/TestP2PFileTransferServerHandler.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 4d384bb5f98..b320669b47a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -168,6 +168,8 @@ public void testFailOnAccessPath() throws IOException { ch.writeInbound(request); FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 500); + // make the ch inactive + ch.pipeline().fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); } @@ -236,6 +238,8 @@ public void testTransferSingleFileAndSingleMetadataForBatchStore() throws IOExce Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED); // end of STATUS response + // make the ch inactive + ch.pipeline().fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); } @@ -329,6 +333,8 @@ public void testTransferMultipleFiles() throws IOException { Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED); // end of STATUS response + // make the ch inactive + ch.pipeline().fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); } From b9393165bf0f613e33bc32bee7e971cf10ac2b64 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Mon, 19 May 2025 10:51:19 -0700 Subject: [PATCH 3/9] Address code review 1: 1. 
remove client side timeout 2. set global counter --- .../com/linkedin/davinci/DaVinciBackend.java | 1 - .../blobtransfer/BlobSnapshotManager.java | 56 ++------------ .../BlobTransferManagerBuilder.java | 5 +- .../blobtransfer/P2PBlobTransferConfig.java | 8 -- .../client/NettyFileTransferClient.java | 18 ----- .../BlobTransferNettyChannelInitializer.java | 11 ++- .../server/P2PBlobTransferService.java | 6 +- .../server/P2PFileTransferServerHandler.java | 23 +++++- .../davinci/config/VeniceServerConfig.java | 7 -- .../blobtransfer/BlobSnapshotManagerTest.java | 77 ------------------- .../TestBlobTransferManagerBuilder.java | 2 - .../TestNettyP2PBlobTransferManager.java | 4 +- .../TestP2PFileTransferServerHandler.java | 2 +- .../java/com/linkedin/venice/ConfigKeys.java | 2 - .../linkedin/venice/server/VeniceServer.java | 1 - 15 files changed, 46 insertions(+), 177 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index bdcbf8472b4..919787580c2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -311,7 +311,6 @@ public DaVinciBackend( backendConfig.getMaxConcurrentSnapshotUser(), backendConfig.getSnapshotRetentionTimeInMin(), backendConfig.getBlobTransferMaxTimeoutInMin(), - backendConfig.getBlobReceiveMaxTimeoutInMin(), backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled() ? 
BlobTransferTableFormat.PLAIN_TABLE : BlobTransferTableFormat.BLOCK_BASED_TABLE, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java index 6a4bfd03ffd..fcdc133f1b7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java @@ -60,7 +60,6 @@ public class BlobSnapshotManager { private final StorageEngineRepository storageEngineRepository; private final StorageMetadataService storageMetadataService; - private final int maxConcurrentUsers; private final long snapshotRetentionTimeInMillis; private final int snapshotCleanupIntervalInMins; private final BlobTransferUtils.BlobTransferTableFormat blobTransferTableFormat; @@ -72,13 +71,11 @@ public class BlobSnapshotManager { public BlobSnapshotManager( StorageEngineRepository storageEngineRepository, StorageMetadataService storageMetadataService, - int maxConcurrentUsers, int snapshotRetentionTimeInMin, BlobTransferUtils.BlobTransferTableFormat transferTableFormat, int snapshotCleanupIntervalInMins) { this.storageEngineRepository = storageEngineRepository; this.storageMetadataService = storageMetadataService; - this.maxConcurrentUsers = maxConcurrentUsers; this.snapshotRetentionTimeInMillis = TimeUnit.MINUTES.toMillis(snapshotRetentionTimeInMin); this.blobTransferTableFormat = transferTableFormat; this.snapshotCleanupIntervalInMins = snapshotCleanupIntervalInMins; @@ -106,7 +103,6 @@ public BlobSnapshotManager( this( storageEngineRepository, storageMetadataService, - DEFAULT_MAX_CONCURRENT_USERS, DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN, BlobTransferUtils.BlobTransferTableFormat.BLOCK_BASED_TABLE, DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS); @@ -114,12 +110,11 @@ public BlobSnapshotManager( /** * Get the transfer metadata 
for a particular payload - * 1. throttle the request if many concurrent users. - * 2. check snapshot staleness - * 2.1. if stale: - * 2.1.1. if it does not have active users: recreate the snapshot and metadata, then return the metadata - * 2.1.2. if it has active users: no need to recreate the snapshot, throw an exception to let the client move to next candidate. - * 2.2. if not stale, directly return the metadata + * 1. check snapshot staleness + * 1.1. if stale: + * 1.1.1. if it does not have active users: recreate the snapshot and metadata, then return the metadata + * 1.1.2. if it has active users: no need to recreate the snapshot, throw an exception to let the client move to next candidate. + * 1.2. if not stale, directly return the metadata * * @param payload the blob transfer payload * @param successCountedAsActiveCurrentUser Indicates whether this request has been successfully counted as an active user. @@ -146,9 +141,6 @@ public BlobTransferPartitionMetadata getTransferMetadata( ReentrantLock lock = getSnapshotLock(topicName, partitionId); try (AutoCloseableLock ignored = AutoCloseableLock.of(lock)) { - // 1. check if the concurrent user count exceeds the limit - checkIfConcurrentUserExceedsLimit(topicName, partitionId); - initializeTrackingValues(topicName, partitionId); boolean havingActiveUsers = getConcurrentSnapshotUsers(topicName, partitionId) > 0; @@ -156,7 +148,7 @@ public BlobTransferPartitionMetadata getTransferMetadata( increaseConcurrentUserCount(topicName, partitionId); successCountedAsActiveCurrentUser.set(true); - // 2. Check if the snapshot is stale and needs to be recreated. + // 1. Check if the snapshot is stale and needs to be recreated. // If the snapshot is stale and there are active users, throw an exception to exit early, allowing the client to // try the next available peer. 
// Even if creating a snapshot is fast, the stale snapshot may still be in use and transferring data for a @@ -210,42 +202,6 @@ private void recreateSnapshotAndMetadata(BlobTransferPayload blobTransferRequest } } - /** - * Check if the concurrent user count exceeds the limit - * The map concurrentSnapshotUsers is a per topic and partition map, so we need to sum up to get the total count that the server handler is currently served. - * - * Note: - * Due to the lock is at per partition per topic level, but the check limitation is globally via concurrentSnapshotUsers. - * Then there may be race condition due to "check-then-increment" window: - * 1. Thread A and thread B both check the concurrentSnapshotUsers count and amount = maxConcurrentUsers - 1, at the same time - * 2. Thread A increment the concurrentSnapshotUsers count, then B increment the concurrentSnapshotUsers count, and both of them are allowed to proceed. - * 3. But actually, the concurrentSnapshotUsers count is maxConcurrentUsers + 1 - * In rare cases, actual concurrent users may temporarily exceed maxConcurrentUsers, but only by a small margin. 
- * - * @param topicName the topic name - * @param partitionId the partition id - * @throws VeniceException if the concurrent user count exceeds the limit - */ - private void checkIfConcurrentUserExceedsLimit(String topicName, int partitionId) throws VeniceException { - // get the current host level served request count which is all topics and partitions - int totalTopicsPartitionsRequestCount = concurrentSnapshotUsers.values() - .stream() - .flatMap(innerMap -> innerMap.values().stream()) - .mapToInt(AtomicInteger::get) - .sum(); - - boolean exceededMaxConcurrentUsers = totalTopicsPartitionsRequestCount >= maxConcurrentUsers; - if (exceededMaxConcurrentUsers) { - String errorMessage = String.format( - "Exceeded the maximum number of concurrent users %d, request for topic %s partition %d can not be served anymore", - maxConcurrentUsers, - topicName, - partitionId); - LOGGER.error(errorMessage); - throw new VeniceException(errorMessage); - } - } - /** * Check if the snapshot is stale * @param topicName the topic name diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java index c2209ea784c..c3119bc53a8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java @@ -111,7 +111,6 @@ public BlobTransferManager build() { BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager( storageEngineRepository, storageMetadataService, - blobTransferConfig.getMaxConcurrentSnapshotUser(), blobTransferConfig.getSnapshotRetentionTimeInMin(), blobTransferConfig.getTransferSnapshotTableFormat(), blobTransferConfig.getSnapshotCleanupIntervalInMins()); @@ -124,13 +123,13 @@ public BlobTransferManager build() { blobSnapshotManager, 
globalTrafficHandler, sslFactory, - aclHandler), + aclHandler, + blobTransferConfig.getMaxConcurrentSnapshotUser()), new NettyFileTransferClient( blobTransferConfig.getP2pTransferClientPort(), blobTransferConfig.getBaseDir(), storageMetadataService, blobTransferConfig.getPeersConnectivityFreshnessInSeconds(), - blobTransferConfig.getBlobReceiveTimeoutInMin(), globalTrafficHandler, sslFactory), blobFinder, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java index a99ced8b1e1..538a2bfcc3b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java @@ -16,8 +16,6 @@ public class P2PBlobTransferConfig { private final int snapshotRetentionTimeInMin; // Max timeout for blob transfer in minutes in server side, to avoid endless sending files. private final int blobTransferMaxTimeoutInMin; - // Max timeout for blob receive in minutes in client side, to avoid endless receiving files. - private final int blobReceiveMaxTimeoutInMin; // Table format private final BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat; // Peers connectivity records freshness in seconds. 
@@ -36,7 +34,6 @@ public P2PBlobTransferConfig( int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, int blobTransferMaxTimeoutInMin, - int blobReceiveMaxTimeoutInMin, BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat, int peersConnectivityFreshnessInSeconds, long blobTransferClientReadLimitBytesPerSec, @@ -48,7 +45,6 @@ public P2PBlobTransferConfig( this.maxConcurrentSnapshotUser = maxConcurrentSnapshotUser; this.snapshotRetentionTimeInMin = snapshotRetentionTimeInMin; this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin; - this.blobReceiveMaxTimeoutInMin = blobReceiveMaxTimeoutInMin; this.transferSnapshotTableFormat = transferSnapshotTableFormat; this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds; this.blobTransferClientReadLimitBytesPerSec = blobTransferClientReadLimitBytesPerSec; @@ -80,10 +76,6 @@ public int getBlobTransferMaxTimeoutInMin() { return blobTransferMaxTimeoutInMin; } - public int getBlobReceiveTimeoutInMin() { - return blobReceiveMaxTimeoutInMin; - } - public BlobTransferUtils.BlobTransferTableFormat getTransferSnapshotTableFormat() { return transferSnapshotTableFormat; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java index 56d2ad7cdf8..4adb6398f8b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java @@ -39,7 +39,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,7 +46,6 @@ public class 
NettyFileTransferClient { private static final Logger LOGGER = LogManager.getLogger(NettyFileTransferClient.class); private static final int MAX_METADATA_CONTENT_LENGTH = 1024 * 1024 * 100; - private static final int REQUEST_TIMEOUT_IN_MINUTES = 5; private static final int CONNECTION_TIMEOUT_IN_MINUTES = 1; // Maximum time that Netty will wait to establish the initial connection before failing. private static final int CONNECTION_ESTABLISHMENT_TIMEOUT_MS = 30 * 1000; @@ -56,7 +54,6 @@ public class NettyFileTransferClient { private final String baseDir; private final int serverPort; private final int peersConnectivityFreshnessInSeconds; // the freshness of the peers connectivity records - private final int blobReceiveTimeoutInMin; // the timeout for blob receive in minutes in client side private StorageMetadataService storageMetadataService; private final ExecutorService hostConnectExecutorService; private final ScheduledExecutorService connectTimeoutScheduler; @@ -74,14 +71,12 @@ public NettyFileTransferClient( String baseDir, StorageMetadataService storageMetadataService, int peersConnectivityFreshnessInSeconds, - int blobReceiveTimeoutInMin, GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler, Optional sslFactory) { this.baseDir = baseDir; this.serverPort = serverPort; this.storageMetadataService = storageMetadataService; this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds; - this.blobReceiveTimeoutInMin = blobReceiveTimeoutInMin; clientBootstrap = new Bootstrap(); workerGroup = new NioEventLoopGroup(); @@ -288,19 +283,6 @@ public CompletionStage get( requestedTableFormat)); // Send a GET request ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat)); - // Set a timeout, otherwise if the host is not responding, the future will never complete - connectTimeoutScheduler.schedule(() -> { - if (!inputStream.toCompletableFuture().isDone()) { - inputStream.toCompletableFuture() - 
.completeExceptionally( - new TimeoutException( - "Request timed out for store " + storeName + " version " + version + " partition " + partition - + " table format " + requestedTableFormat + " from host " + host + " in " - + blobReceiveTimeoutInMin + " minutes.")); - // Close the channel if the request times out - ch.close(); - } - }, blobReceiveTimeoutInMin, TimeUnit.MINUTES); } catch (Exception e) { if (!inputStream.toCompletableFuture().isCompletedExceptionally()) { inputStream.toCompletableFuture().completeExceptionally(e); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java index 2efd8f0f470..e1c13c9582b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java @@ -20,6 +20,7 @@ public class BlobTransferNettyChannelInitializer extends ChannelInitializer { private final String baseDir; private final int blobTransferMaxTimeoutInMin; + private final int maxAllowedConcurrentSnapshotUsers; private BlobSnapshotManager blobSnapshotManager; private Optional sslFactory; private Optional aclHandler; @@ -33,13 +34,15 @@ public BlobTransferNettyChannelInitializer( BlobSnapshotManager blobSnapshotManager, GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler, Optional sslFactory, - Optional aclHandler) { + Optional aclHandler, + int maxAllowedConcurrentSnapshotUsers) { this.baseDir = baseDir; this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin; this.blobSnapshotManager = blobSnapshotManager; this.globalChannelTrafficShapingHandler = globalChannelTrafficShapingHandler; this.sslFactory = sslFactory; this.aclHandler = aclHandler; + 
this.maxAllowedConcurrentSnapshotUsers = maxAllowedConcurrentSnapshotUsers; } @Override @@ -67,6 +70,10 @@ protected void initChannel(SocketChannel ch) throws Exception { // for handling p2p file transfer .addLast( "p2pFileTransferHandler", - new P2PFileTransferServerHandler(baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager)); + new P2PFileTransferServerHandler( + baseDir, + blobTransferMaxTimeoutInMin, + blobSnapshotManager, + maxAllowedConcurrentSnapshotUsers)); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java index 73d1610a4d7..e832513c6be 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java @@ -41,7 +41,8 @@ public P2PBlobTransferService( BlobSnapshotManager blobSnapshotManager, GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler, Optional sslFactory, - Optional aclHandler) { + Optional aclHandler, + int maxAllowedConcurrentSnapshotUsers) { this.port = port; this.serverBootstrap = new ServerBootstrap(); this.blobSnapshotManager = blobSnapshotManager; @@ -66,7 +67,8 @@ public P2PBlobTransferService( blobSnapshotManager, globalChannelTrafficShapingHandler, sslFactory, - aclHandler)) + aclHandler, + maxAllowedConcurrentSnapshotUsers)) .option(ChannelOption.SO_BACKLOG, 1000) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java index 0cd615efd8e..c6c5261b62a 100644 --- 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java @@ -42,6 +42,7 @@ import java.net.URI; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -57,7 +58,11 @@ public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler BLOB_TRANSFER_REQUEST = AttributeKey.valueOf("blobTransferRequest"); private static final AttributeKey SUCCESS_COUNTED = @@ -66,10 +71,12 @@ public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler= maxAllowedConcurrentSnapshotUsers) { + String errMessage = + "The number of concurrent snapshot users exceeds the limit of " + maxAllowedConcurrentSnapshotUsers + + ", wont be able to process the request for " + blobTransferRequest.getFullResourceName(); + LOGGER.error(errMessage); + setupResponseAndFlush(HttpResponseStatus.TOO_MANY_REQUESTS, errMessage.getBytes(), false, ctx); + return; + } + try { transferPartitionMetadata = blobSnapshotManager.getTransferMetadata(blobTransferRequest, successCountedAsActiveCurrentUser); ctx.channel().attr(SUCCESS_COUNTED).set(successCountedAsActiveCurrentUser); ctx.channel().attr(BLOB_TRANSFER_REQUEST).set(blobTransferRequest); + if (successCountedAsActiveCurrentUser.get()) { + globalConcurrentTransferRequests.incrementAndGet(); + } } catch (Exception e) { setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx); return; @@ -193,6 +213,7 @@ public void channelInactive(ChannelHandlerContext ctx) { && blobTransferRequest != null) { try { blobSnapshotManager.decreaseConcurrentUserCount(blobTransferRequest); + globalConcurrentTransferRequests.decrementAndGet(); } catch (Exception e) { 
LOGGER.error("Failed to decrease the snapshot concurrent user count for request {}", blobTransferRequest, e); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index fae664ba0fd..052d5ac443f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -5,7 +5,6 @@ import static com.linkedin.venice.ConfigConstants.DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL; import static com.linkedin.venice.ConfigKeys.ACL_IN_MEMORY_CACHE_TTL_MS; import static com.linkedin.venice.ConfigKeys.AUTOCREATE_DATA_PATH; -import static com.linkedin.venice.ConfigKeys.BLOB_RECEIVE_MAX_TIMEOUT_IN_MIN; import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_ACL_ENABLED; import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_CLIENT_READ_LIMIT_BYTES_PER_SEC; import static com.linkedin.venice.ConfigKeys.BLOB_TRANSFER_DISABLED_OFFSET_LAG_THRESHOLD; @@ -598,7 +597,6 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final int snapshotRetentionTimeInMin; private final int maxConcurrentSnapshotUser; private final int blobTransferMaxTimeoutInMin; - private final int blobReceiveMaxTimeoutInMin; private final int blobTransferPeersConnectivityFreshnessInSeconds; private final long blobTransferClientReadLimitBytesPerSec; private final long blobTransferServiceWriteLimitBytesPerSec; @@ -672,7 +670,6 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map createServices() { serverConfig.getMaxConcurrentSnapshotUser(), serverConfig.getSnapshotRetentionTimeInMin(), serverConfig.getBlobTransferMaxTimeoutInMin(), - serverConfig.getBlobReceiveMaxTimeoutInMin(), serverConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled() ? 
BlobTransferTableFormat.PLAIN_TABLE : BlobTransferTableFormat.BLOCK_BASED_TABLE, From 1091c9d2661ade3d554efe601c7c596b0e1df263 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Mon, 19 May 2025 13:01:38 -0700 Subject: [PATCH 4/9] Fix a bug from previously remove snapshot generation from EOP --- .../davinci/blobtransfer/BlobSnapshotManager.java | 1 - .../davinci/ingestion/DefaultIngestionBackend.java | 5 ++++- .../store/rocksdb/RocksDBStoragePartition.java | 11 ++--------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java index fcdc133f1b7..b98f9f55307 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java @@ -38,7 +38,6 @@ public class BlobSnapshotManager { private static final InternalAvroSpecificSerializer storeVersionStateSerializer = AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer(); private final static int DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN = 30; - public final static int DEFAULT_MAX_CONCURRENT_USERS = 15; public final static int DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS = 120; // A map to keep track of the number of hosts using a snapshot for a particular topic and partition, use to restrict diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index 196b666b2aa..a252c0dfa22 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -133,7 +133,10 @@ CompletionStage 
bootstrapFromBlobs( if (storageEngine != null) { storageEngine.dropPartition(partitionId, false); } - LOGGER.info("Clean up the offset and delete partition folder for topic {} partition {}", kafkaTopic, partitionId); + LOGGER.info( + "Clean up the offset and delete partition folder for topic {} partition {} before bootstrap from blob transfer", + kafkaTopic, + partitionId); return blobTransferManager.get(storeName, versionNumber, partitionId, tableFormat) .handle((inputStream, throwable) -> { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java index a6178b20ea9..437c4639600 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java @@ -217,8 +217,7 @@ protected RocksDBStoragePartition( this.expectedChecksumSupplier = Optional.empty(); this.rocksDBThrottler = rocksDbThrottler; this.fullPathForTempSSTFileDir = RocksDBUtils.composeTempSSTFileDir(dbDir, storeNameAndVersion, partitionId); - this.fullPathForPartitionDBSnapshot = - blobTransferEnabled ? 
RocksDBUtils.composeSnapshotDir(dbDir, storeNameAndVersion, partitionId) : null; + this.fullPathForPartitionDBSnapshot = RocksDBUtils.composeSnapshotDir(dbDir, storeNameAndVersion, partitionId); if (deferredWrite) { this.rocksDBSstFileWriter = new RocksDBSstFileWriter( @@ -357,10 +356,6 @@ protected EnvOptions getEnvOptions() { return envOptions; } - protected Boolean getBlobTransferEnabled() { - return blobTransferEnabled; - } - protected Options getStoreOptions(StoragePartitionConfig storagePartitionConfig, boolean isRMD) { Options options = new Options(); @@ -518,9 +513,7 @@ public synchronized void endBatchWrite() { @Override public synchronized void createSnapshot() { - if (blobTransferEnabled) { - createSnapshot(rocksDB, fullPathForPartitionDBSnapshot); - } + createSnapshot(rocksDB, fullPathForPartitionDBSnapshot); } @Override From e77d688ae14bc38339e8aa0dd82526553da9fbd9 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Mon, 19 May 2025 17:36:20 -0700 Subject: [PATCH 5/9] fix unit test --- .../davinci/store/AbstractStorageEngine.java | 9 --------- .../kafka/consumer/StoreIngestionTaskTest.java | 1 - .../rocksdb/RocksDBStoragePartitionTest.java | 16 ++++------------ 3 files changed, 4 insertions(+), 22 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java index 177275eab92..eb6ba87245a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java @@ -433,15 +433,6 @@ public synchronized void endBatchWrite(StoragePartitionConfig storagePartitionCo } } - /** - * Create snapshot for the given partition - * @param storagePartitionConfig - */ - public synchronized void createSnapshot(StoragePartitionConfig storagePartitionConfig) { - AbstractStoragePartition partition = 
getPartitionOrThrow(storagePartitionConfig.getPartitionId()); - partition.createSnapshot(); - } - private void executeWithSafeGuard(int partitionId, Runnable runnable) { executeWithSafeGuard(partitionId, () -> { runnable.run(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 7e405550699..99334e11bb3 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -1034,7 +1034,6 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( } else { mockDeepCopyStorageEngine = spy(new DeepCopyStorageEngine(mockAbstractStorageEngine)); doReturn(mockDeepCopyStorageEngine).when(mockStorageEngineRepository).getLocalStorageEngine(topic); - doNothing().when(mockDeepCopyStorageEngine).createSnapshot(any()); } inMemoryLocalKafkaConsumer = diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java index 5d4c972641c..9db08b3a163 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java @@ -1099,8 +1099,8 @@ public void checkMemoryLimitAtDatabaseOpen() { } } - @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False") - public void testCreateSnapshot(boolean blobTransferEnabled) { + @Test + public void testCreateSnapshot() { String storeName = Version.composeKafkaTopic(Utils.getUniqueString("test_store"), 1); String storeDir = getTempDatabaseDir(storeName); int 
partitionId = 0; @@ -1116,9 +1116,6 @@ public void testCreateSnapshot(boolean blobTransferEnabled) { RocksDBStorageEngineFactory factory = new RocksDBStorageEngineFactory(serverConfig); VeniceStoreVersionConfig storeConfig = new VeniceStoreVersionConfig(storeName, veniceServerProperties); - // Set the blob transfer enabled flag - storeConfig.setBlobTransferEnabled(blobTransferEnabled); - RocksDBStoragePartition storagePartition = new RocksDBStoragePartition( partitionConfig, factory, @@ -1135,13 +1132,8 @@ public void testCreateSnapshot(boolean blobTransferEnabled) { return null; }); storagePartition.createSnapshot(); - if (blobTransferEnabled) { - rocksDBStoragePartition - .verify(() -> RocksDBStoragePartition.createSnapshot(Mockito.any(), Mockito.any()), Mockito.times(1)); - } else { - rocksDBStoragePartition - .verify(() -> RocksDBStoragePartition.createSnapshot(Mockito.any(), Mockito.any()), Mockito.never()); - } + + rocksDBStoragePartition.verify(() -> RocksDBStoragePartition.createSnapshot(Mockito.any(), Mockito.any())); } if (storagePartition != null) { From 8a28cfec877b53d22de5ac7a41c91cb9a9236045 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 20 May 2025 14:49:11 -0700 Subject: [PATCH 6/9] Revert "Fix a bug from previously remove snapshot generation from EOP" This reverts commit 1091c9d2661ade3d554efe601c7c596b0e1df263. 
--- .../davinci/blobtransfer/BlobSnapshotManager.java | 1 + .../davinci/ingestion/DefaultIngestionBackend.java | 5 +---- .../store/rocksdb/RocksDBStoragePartition.java | 11 +++++++++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java index b98f9f55307..fcdc133f1b7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java @@ -38,6 +38,7 @@ public class BlobSnapshotManager { private static final InternalAvroSpecificSerializer storeVersionStateSerializer = AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer(); private final static int DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN = 30; + public final static int DEFAULT_MAX_CONCURRENT_USERS = 15; public final static int DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS = 120; // A map to keep track of the number of hosts using a snapshot for a particular topic and partition, use to restrict diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index a252c0dfa22..196b666b2aa 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -133,10 +133,7 @@ CompletionStage bootstrapFromBlobs( if (storageEngine != null) { storageEngine.dropPartition(partitionId, false); } - LOGGER.info( - "Clean up the offset and delete partition folder for topic {} partition {} before bootstrap from blob transfer", - kafkaTopic, - partitionId); + LOGGER.info("Clean up the offset and 
delete partition folder for topic {} partition {}", kafkaTopic, partitionId); return blobTransferManager.get(storeName, versionNumber, partitionId, tableFormat) .handle((inputStream, throwable) -> { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java index 437c4639600..a6178b20ea9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java @@ -217,7 +217,8 @@ protected RocksDBStoragePartition( this.expectedChecksumSupplier = Optional.empty(); this.rocksDBThrottler = rocksDbThrottler; this.fullPathForTempSSTFileDir = RocksDBUtils.composeTempSSTFileDir(dbDir, storeNameAndVersion, partitionId); - this.fullPathForPartitionDBSnapshot = RocksDBUtils.composeSnapshotDir(dbDir, storeNameAndVersion, partitionId); + this.fullPathForPartitionDBSnapshot = + blobTransferEnabled ? 
RocksDBUtils.composeSnapshotDir(dbDir, storeNameAndVersion, partitionId) : null; if (deferredWrite) { this.rocksDBSstFileWriter = new RocksDBSstFileWriter( @@ -356,6 +357,10 @@ protected EnvOptions getEnvOptions() { return envOptions; } + protected Boolean getBlobTransferEnabled() { + return blobTransferEnabled; + } + protected Options getStoreOptions(StoragePartitionConfig storagePartitionConfig, boolean isRMD) { Options options = new Options(); @@ -513,7 +518,9 @@ public synchronized void endBatchWrite() { @Override public synchronized void createSnapshot() { - createSnapshot(rocksDB, fullPathForPartitionDBSnapshot); + if (blobTransferEnabled) { + createSnapshot(rocksDB, fullPathForPartitionDBSnapshot); + } } @Override From cb60c0d0790e0148aaff9c777130860210f37ee5 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 20 May 2025 14:49:15 -0700 Subject: [PATCH 7/9] Revert "fix unit test" This reverts commit e77d688ae14bc38339e8aa0dd82526553da9fbd9. --- .../davinci/store/AbstractStorageEngine.java | 9 +++++++++ .../kafka/consumer/StoreIngestionTaskTest.java | 1 + .../rocksdb/RocksDBStoragePartitionTest.java | 16 ++++++++++++---- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java index eb6ba87245a..177275eab92 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java @@ -433,6 +433,15 @@ public synchronized void endBatchWrite(StoragePartitionConfig storagePartitionCo } } + /** + * Create snapshot for the given partition + * @param storagePartitionConfig + */ + public synchronized void createSnapshot(StoragePartitionConfig storagePartitionConfig) { + AbstractStoragePartition partition = 
getPartitionOrThrow(storagePartitionConfig.getPartitionId()); + partition.createSnapshot(); + } + private void executeWithSafeGuard(int partitionId, Runnable runnable) { executeWithSafeGuard(partitionId, () -> { runnable.run(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 99334e11bb3..7e405550699 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -1034,6 +1034,7 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( } else { mockDeepCopyStorageEngine = spy(new DeepCopyStorageEngine(mockAbstractStorageEngine)); doReturn(mockDeepCopyStorageEngine).when(mockStorageEngineRepository).getLocalStorageEngine(topic); + doNothing().when(mockDeepCopyStorageEngine).createSnapshot(any()); } inMemoryLocalKafkaConsumer = diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java index 9db08b3a163..5d4c972641c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java @@ -1099,8 +1099,8 @@ public void checkMemoryLimitAtDatabaseOpen() { } } - @Test - public void testCreateSnapshot() { + @Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False") + public void testCreateSnapshot(boolean blobTransferEnabled) { String storeName = Version.composeKafkaTopic(Utils.getUniqueString("test_store"), 1); String storeDir = getTempDatabaseDir(storeName); int 
partitionId = 0; @@ -1116,6 +1116,9 @@ public void testCreateSnapshot() { RocksDBStorageEngineFactory factory = new RocksDBStorageEngineFactory(serverConfig); VeniceStoreVersionConfig storeConfig = new VeniceStoreVersionConfig(storeName, veniceServerProperties); + // Set the blob transfer enabled flag + storeConfig.setBlobTransferEnabled(blobTransferEnabled); + RocksDBStoragePartition storagePartition = new RocksDBStoragePartition( partitionConfig, factory, @@ -1132,8 +1135,13 @@ public void testCreateSnapshot() { return null; }); storagePartition.createSnapshot(); - - rocksDBStoragePartition.verify(() -> RocksDBStoragePartition.createSnapshot(Mockito.any(), Mockito.any())); + if (blobTransferEnabled) { + rocksDBStoragePartition + .verify(() -> RocksDBStoragePartition.createSnapshot(Mockito.any(), Mockito.any()), Mockito.times(1)); + } else { + rocksDBStoragePartition + .verify(() -> RocksDBStoragePartition.createSnapshot(Mockito.any(), Mockito.any()), Mockito.never()); + } } if (storagePartition != null) { From 10abce81a0921dba43e6fca4c10544030d1093e1 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 20 May 2025 19:04:28 -0700 Subject: [PATCH 8/9] address code review 2: 1. Add unit test to verify the 429 error. 2. change initializer use same handler per connection 3. 
Do check max concurrent user when bump it --- .../blobtransfer/BlobSnapshotManager.java | 1 - .../BlobTransferNettyChannelInitializer.java | 14 ++--- .../server/P2PFileTransferServerHandler.java | 8 ++- .../ingestion/DefaultIngestionBackend.java | 5 +- .../TestP2PFileTransferServerHandler.java | 60 ++++++++++++++++++- 5 files changed, 76 insertions(+), 12 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java index fcdc133f1b7..b98f9f55307 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java @@ -38,7 +38,6 @@ public class BlobSnapshotManager { private static final InternalAvroSpecificSerializer storeVersionStateSerializer = AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer(); private final static int DEFAULT_SNAPSHOT_RETENTION_TIME_IN_MIN = 30; - public final static int DEFAULT_MAX_CONCURRENT_USERS = 15; public final static int DEFAULT_SNAPSHOT_CLEANUP_INTERVAL_IN_MINS = 120; // A map to keep track of the number of hosts using a snapshot for a particular topic and partition, use to restrict diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java index e1c13c9582b..8d2461b7137 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java @@ -27,6 +27,7 @@ public class BlobTransferNettyChannelInitializer extends ChannelInitializer= 
maxAllowedConcurrentSnapshotUsers) { + String errMessage = + "The number of concurrent snapshot users exceeds the limit of " + maxAllowedConcurrentSnapshotUsers + + ", wont be able to process the request for " + blobTransferRequest.getFullResourceName(); + LOGGER.error(errMessage); + setupResponseAndFlush(HttpResponseStatus.TOO_MANY_REQUESTS, errMessage.getBytes(), false, ctx); + } } } catch (Exception e) { setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index 196b666b2aa..a252c0dfa22 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -133,7 +133,10 @@ CompletionStage bootstrapFromBlobs( if (storageEngine != null) { storageEngine.dropPartition(partitionId, false); } - LOGGER.info("Clean up the offset and delete partition folder for topic {} partition {}", kafkaTopic, partitionId); + LOGGER.info( + "Clean up the offset and delete partition folder for topic {} partition {} before bootstrap from blob transfer", + kafkaTopic, + partitionId); return blobTransferManager.get(storeName, versionNumber, partitionId, tableFormat) .handle((inputStream, throwable) -> { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 1ed5e68a7d5..f877cf6154a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java 
@@ -4,6 +4,7 @@ import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType; +import static com.linkedin.venice.response.VeniceReadResponseStatus.TOO_MANY_REQUESTS; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.davinci.blobtransfer.server.P2PFileTransferServerHandler; @@ -52,6 +53,7 @@ public class TestP2PFileTransferServerHandler { P2PFileTransferServerHandler serverHandler; BlobSnapshotManager blobSnapshotManager; StorageEngineRepository storageEngineRepository; + int maxAllowedConcurrentSnapshotUsers = 20; @BeforeMethod public void setUp() throws IOException { @@ -61,8 +63,11 @@ public void setUp() throws IOException { storageEngineRepository = Mockito.mock(StorageEngineRepository.class); blobSnapshotManager = Mockito.spy(new BlobSnapshotManager(storageEngineRepository, storageMetadataService)); - serverHandler = - new P2PFileTransferServerHandler(baseDir.toString(), blobTransferMaxTimeoutInMin, blobSnapshotManager, 20); + serverHandler = new P2PFileTransferServerHandler( + baseDir.toString(), + blobTransferMaxTimeoutInMin, + blobSnapshotManager, + maxAllowedConcurrentSnapshotUsers); ch = new EmbeddedChannel(serverHandler); } @@ -97,6 +102,57 @@ public void testRejectInvalidPath() { Assert.assertEquals(response.status().code(), 500); } + @Test + public void testRejectTooManyRequest() throws IOException { + AbstractStorageEngine localStorageEngine = Mockito.mock(AbstractStorageEngine.class); + Mockito.doReturn(localStorageEngine).when(storageEngineRepository).getLocalStorageEngine(Mockito.any()); + Mockito.doReturn(true).when(localStorageEngine).containsPartition(Mockito.anyInt()); + + // prepare response from metadata service + StoreVersionState storeVersionState = new StoreVersionState(); + 
Mockito.doReturn(storeVersionState).when(storageMetadataService).getStoreVersionState(Mockito.any()); + + InternalAvroSpecificSerializer partitionStateSerializer = + AvroProtocolDefinition.PARTITION_STATE.getSerializer(); + OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer); + offsetRecord.setOffsetLag(1000L); + Mockito.doReturn(offsetRecord).when(storageMetadataService).getLastOffset(Mockito.any(), Mockito.anyInt()); + + // prepare the file request + Path snapshotDir = Paths.get(RocksDBUtils.composeSnapshotDir(baseDir.toString(), "myStore_v1", 10)); + Files.createDirectories(snapshotDir); + Path file1 = snapshotDir.resolve("file1"); + Files.write(file1.toAbsolutePath(), "hello".getBytes()); + String file1ChecksumHeader = BlobTransferUtils.generateFileChecksum(file1); + Mockito.doNothing().when(blobSnapshotManager).createSnapshot(Mockito.anyString(), Mockito.anyInt()); + + // Send maxAllowedConcurrentSnapshotUsers requests + for (int requestCount = 0; requestCount < maxAllowedConcurrentSnapshotUsers; requestCount++) { + FullHttpRequest request = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/myStore/1/10/BLOCK_BASED_TABLE"); + ch.writeInbound(request); + } + + // Read all outbound responses and check if at least one is 429 + boolean foundTooManyRequestsResponse = false; + while (true) { + Object outbound = ch.readOutbound(); + if (outbound == null) { + break; + } + + if (outbound instanceof FullHttpResponse) { + FullHttpResponse httpResponse = (FullHttpResponse) outbound; + if (httpResponse.status().code() == TOO_MANY_REQUESTS) { + foundTooManyRequestsResponse = true; + break; + } + } + } + + Assert.assertTrue(foundTooManyRequestsResponse); + } + @Test public void testRejectNonExistPath() { // prepare response from metadata service for the metadata preparation From 8f06310e14249e61339dbcc8639ac650edd6f97f Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 20 May 2025 19:52:59 -0700 Subject: [PATCH 9/9] fix 
spotbugsTest/spotbugsMain --- .../server/BlobTransferNettyChannelInitializer.java | 8 -------- .../blobtransfer/TestP2PFileTransferServerHandler.java | 1 - 2 files changed, 9 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java index 8d2461b7137..45608759aa4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java @@ -18,10 +18,6 @@ public class BlobTransferNettyChannelInitializer extends ChannelInitializer { - private final String baseDir; - private final int blobTransferMaxTimeoutInMin; - private final int maxAllowedConcurrentSnapshotUsers; - private BlobSnapshotManager blobSnapshotManager; private Optional sslFactory; private Optional aclHandler; @@ -37,13 +33,9 @@ public BlobTransferNettyChannelInitializer( Optional sslFactory, Optional aclHandler, int maxAllowedConcurrentSnapshotUsers) { - this.baseDir = baseDir; - this.blobTransferMaxTimeoutInMin = blobTransferMaxTimeoutInMin; - this.blobSnapshotManager = blobSnapshotManager; this.globalChannelTrafficShapingHandler = globalChannelTrafficShapingHandler; this.sslFactory = sslFactory; this.aclHandler = aclHandler; - this.maxAllowedConcurrentSnapshotUsers = maxAllowedConcurrentSnapshotUsers; this.p2pFileTransferServerHandler = new P2PFileTransferServerHandler( baseDir, blobTransferMaxTimeoutInMin, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index f877cf6154a..d8cd93d1e32 100644 --- 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -123,7 +123,6 @@ public void testRejectTooManyRequest() throws IOException { Files.createDirectories(snapshotDir); Path file1 = snapshotDir.resolve("file1"); Files.write(file1.toAbsolutePath(), "hello".getBytes()); - String file1ChecksumHeader = BlobTransferUtils.generateFileChecksum(file1); Mockito.doNothing().when(blobSnapshotManager).createSnapshot(Mockito.anyString(), Mockito.anyInt()); // Send maxAllowedConcurrentSnapshotUsers requests