-
Notifications
You must be signed in to change notification settings - Fork 97
[server][dvc] add snapshot creation listener for flush and sync offset before create snapshot #1711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
5f49dfc
e577b45
3f61f66
4ddb7d8
1633f93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
import static java.util.concurrent.TimeUnit.MINUTES; | ||
import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
||
import com.linkedin.davinci.blobtransfer.BlobTransferUtils; | ||
import com.linkedin.davinci.client.BlockingDaVinciRecordTransformer; | ||
import com.linkedin.davinci.client.DaVinciRecordTransformer; | ||
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig; | ||
|
@@ -40,6 +41,7 @@ | |
import com.linkedin.davinci.storage.StorageMetadataService; | ||
import com.linkedin.davinci.storage.StorageService; | ||
import com.linkedin.davinci.store.AbstractStorageEngine; | ||
import com.linkedin.davinci.store.AbstractStoragePartition; | ||
import com.linkedin.davinci.store.StoragePartitionConfig; | ||
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; | ||
import com.linkedin.davinci.store.memory.InMemoryStorageEngine; | ||
|
@@ -3221,6 +3223,17 @@ protected void processEndOfPush( | |
* Generate snapshot after batch write is done. | ||
*/ | ||
if (storeVersionConfig.isBlobTransferEnabled() && serverConfig.isBlobTransferManagerEnabled()) { | ||
// 1. Add the pre-snapshot listener to partition for hybrid stores. No need to listen to batch stores. | ||
if (isHybridMode()) { | ||
try { | ||
addBlobTransferSnapshotCreationListener(storageEngine, partition); | ||
} catch (Exception e) { | ||
LOGGER | ||
.warn("Failed to setup pre-snapshot listener for topic {} partition {}", kafkaVersionTopic, partition, e); | ||
} | ||
} | ||
|
||
// 2. Create snapshot for blob transfer | ||
storageEngine.createSnapshot(storagePartitionConfig); | ||
} | ||
|
||
|
@@ -4930,4 +4943,30 @@ VeniceConcurrentHashMap<String, Long> getConsumedBytesSinceLastSync() { | |
boolean isGlobalRtDivEnabled() { | ||
return isGlobalRtDivEnabled; // mainly for unit test mocks | ||
} | ||
|
||
/** | ||
 * Adds the snapshot creation event listener for the given partition. | ||
 * The listener overrides the pre-snapshot creation handler, which syncs the offset for the given partition. | ||
* @param storageEngine | ||
* @param partitionId | ||
*/ | ||
private void addBlobTransferSnapshotCreationListener(AbstractStorageEngine storageEngine, int partitionId) { | ||
AbstractStoragePartition rocksDBPartition = storageEngine.getPartitionOrThrow(partitionId); | ||
rocksDBPartition.addPartitionSnapshotListener(new BlobTransferUtils.BlobTransferSnapshotCreationListener() { | ||
@Override | ||
public void preSnapshotCreationHandler(String storeNameAndVersion, int partitionId) { | ||
LOGGER | ||
.info("Beginning pre-snapshot offset sync for store: {}, partition: {}", storeNameAndVersion, partitionId); | ||
try { | ||
syncOffset(storeNameAndVersion, getPartitionConsumptionState(partitionId)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This can trigger race conditions.
Can we leverage something like this?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Changed the approach: because we need to wait for this command to complete before creating a snapshot for the partition, we moved some of the snapshot creation logic to SIT. The updated approach is as follows: |
||
LOGGER.info("Completed pre-snapshot sync for store: {}, partition: {}", storeNameAndVersion, partitionId); | ||
} catch (Exception e) { | ||
throw new VeniceException( | ||
"Interrupted while syncing offset before snapshot creation for store : " + storeNameAndVersion | ||
+ ", partition: " + partitionId, | ||
e); | ||
} | ||
} | ||
}); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So will the server always create a snapshot for a hybrid store whenever this feature is enabled?
Who will clean it up if there is no blob transfer request?
An unused checkpoint can keep stale data lingering around.