[server][dvc] add snapshot creation listener for flush and sync offset before create snapshot #1711

Open · wants to merge 5 commits into main
@@ -41,6 +41,15 @@ public enum BlobTransferTableFormat {
PLAIN_TABLE, BLOCK_BASED_TABLE
}

/**
* Listener interface for blob transfer snapshot operations.
* Implementations of this interface can define actions to be performed before the snapshot creation.
*/
public interface BlobTransferSnapshotCreationListener {
// Handle the pre-snapshot creation event
void preSnapshotCreationHandler(String storeNameAndVersion, int partitionId);
}

/**
* Check if the HttpResponse message is for metadata.
* @param msg the HttpResponse message
@@ -21,6 +21,7 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.client.BlockingDaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
@@ -40,6 +41,7 @@
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.memory.InMemoryStorageEngine;
@@ -3221,6 +3223,17 @@ protected void processEndOfPush(
* Generate snapshot after batch write is done.
*/
if (storeVersionConfig.isBlobTransferEnabled() && serverConfig.isBlobTransferManagerEnabled()) {
// 1. Add the pre-snapshot listener to the partition for hybrid stores; batch stores do not need it.
if (isHybridMode()) {
try {
addBlobTransferSnapshotCreationListener(storageEngine, partition);
} catch (Exception e) {
LOGGER
.warn("Failed to setup pre-snapshot listener for topic {} partition {}", kafkaVersionTopic, partition, e);
}
}

// 2. Create snapshot for blob transfer
storageEngine.createSnapshot(storagePartitionConfig);
Contributor:

So we will create a snapshot for hybrid stores in the Server all the time when this feature is enabled?
Who will clean this up if there is no blob transfer request?
An unused checkpoint can keep stale data lingering around.

}

@@ -4930,4 +4943,30 @@ VeniceConcurrentHashMap<String, Long> getConsumedBytesSinceLastSync() {
boolean isGlobalRtDivEnabled() {
return isGlobalRtDivEnabled; // mainly for unit test mocks
}

/**
 * Adds the snapshot creation event listener for the given partition and overrides the
 * pre-snapshot creation handler, which syncs the offset for that partition.
 * @param storageEngine the storage engine owning the partition
 * @param partitionId the partition to attach the listener to
 */
private void addBlobTransferSnapshotCreationListener(AbstractStorageEngine storageEngine, int partitionId) {
AbstractStoragePartition rocksDBPartition = storageEngine.getPartitionOrThrow(partitionId);
rocksDBPartition.addPartitionSnapshotListener(new BlobTransferUtils.BlobTransferSnapshotCreationListener() {
@Override
public void preSnapshotCreationHandler(String storeNameAndVersion, int partitionId) {
LOGGER
.info("Beginning pre-snapshot offset sync for store: {}, partition: {}", storeNameAndVersion, partitionId);
try {
syncOffset(storeNameAndVersion, getPartitionConsumptionState(partitionId));
Contributor:

This can trigger race conditions.
So far, this function is mainly triggered in the drainer thread, and if we invoke it here, a race condition can happen.
Making it synchronized may not solve all the issues, since we want to invoke this function as the last step of the processing.
Some related javadoc:

/**
 * Syncing offset checking in syncOffset() should be the very last step for processing a record.
 *
 * Check whether offset metadata checkpoint will happen; if so, update the producer states recorded in OffsetRecord
 * with the updated producer states maintained in {@link #drainerDiv}
 */
if (shouldSyncOffset(partitionConsumptionState, record, leaderProducedRecordContext)) {
  updateOffsetMetadataAndSyncOffset(partitionConsumptionState);
}

Can we leverage something like this?

CompletableFuture<Void> cmdFuture = storeBufferService.execSyncOffsetCommandAsync(topicPartition, this);
waitForSyncOffsetCmd(cmdFuture, topicPartition);

Contributor Author:

Changed the syncOffset call to the asynchronous execSyncOffsetCommandAsync command.

Because we need to wait for this command to complete before creating a snapshot for the partition, we moved some of the snapshot creation logic into SIT.

The updated approach is as follows (see the sketch below):
(1) In SIT, we add a snapshot creation listener for all partitions of all stores.
(2) For batch stores, we notify the listener to trigger snapshot creation after EOP. For hybrid stores, we fetch the partition and then notify the listener based on blob transfer requests.
(3) When the listener receives the notification, it executes syncOffsetAndCreateSnapshot, a method overridden in the SIT class, to synchronize the offset and create the snapshot.
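A minimal sketch of step (3), assuming the handler delegates to a syncOffsetAndCreateSnapshot method in SIT. The exact body is not shown in this diff; the field and helper names (storeBufferService, waitForSyncOffsetCmd, storageEngine) follow the snippets quoted above:

// Sketch only, not the literal diff code.
protected void syncOffsetAndCreateSnapshot(
    PubSubTopicPartition topicPartition,
    StoragePartitionConfig storagePartitionConfig) {
  // 1. Enqueue the sync-offset command so it executes on the drainer thread,
  //    avoiding a race with records still being processed there.
  CompletableFuture<Void> cmdFuture = storeBufferService.execSyncOffsetCommandAsync(topicPartition, this);
  // 2. Block until the drainer has finished syncing the offset.
  waitForSyncOffsetCmd(cmdFuture, topicPartition);
  // 3. Only after the offset is durable, create the partition snapshot.
  storageEngine.createSnapshot(storagePartitionConfig);
}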

LOGGER.info("Completed pre-snapshot sync for store: {}, partition: {}", storeNameAndVersion, partitionId);
} catch (Exception e) {
throw new VeniceException(
"Failed to sync offset before snapshot creation for store: " + storeNameAndVersion
+ ", partition: " + partitionId,
e);
}
}
});
}
}
@@ -1,5 +1,6 @@
package com.linkedin.davinci.store;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.rocksdb.ReplicationMetadataRocksDBStoragePartition;
import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
@@ -103,6 +104,12 @@ public void reopen() {
*/
public abstract boolean verifyConfig(StoragePartitionConfig storagePartitionConfig);

/**
 * Add the snapshot creation event listener to the storage partition.
 * @param listener the listener to invoke before a snapshot is created
 */
public abstract void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener);

/**
* Creates a snapshot of the current state of the storage if the blob transfer feature is enabled via the store configuration
*/
@@ -1,5 +1,6 @@
package com.linkedin.davinci.store.blackhole;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
@@ -86,4 +87,9 @@ public long getPartitionSizeInBytes() {
public void createSnapshot() {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
throw new UnsupportedOperationException("Method not implemented!");
}
}
@@ -1,6 +1,7 @@
package com.linkedin.davinci.store.cache;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
@@ -135,4 +136,9 @@ public VeniceStoreCache getVeniceCache() {
public void createSnapshot() {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
throw new UnsupportedOperationException("Method not implemented!");
}
}
@@ -1,5 +1,6 @@
package com.linkedin.davinci.store.memory;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
@@ -127,4 +128,9 @@ public long getPartitionSizeInBytes() {
public void createSnapshot() {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
throw new UnsupportedOperationException("Method not implemented!");
}
}
@@ -2,6 +2,7 @@

import static com.linkedin.davinci.store.AbstractStorageEngine.METADATA_PARTITION_ID;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.stats.RocksDBMemoryStats;
@@ -155,6 +156,8 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
protected final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
private RocksDBSstFileWriter rocksDBSstFileWriter = null;

private BlobTransferUtils.BlobTransferSnapshotCreationListener blobTransferSnapshotCreationListener;

protected RocksDBStoragePartition(
StoragePartitionConfig storagePartitionConfig,
RocksDBStorageEngineFactory factory,
@@ -503,10 +506,24 @@ public synchronized void endBatchWrite() {
rocksDBSstFileWriter.ingestSSTFiles(rocksDB, columnFamilyHandleList);
}

@Override
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
this.blobTransferSnapshotCreationListener = listener;
}

@Override
public synchronized void createSnapshot() {
if (blobTransferEnabled) {
createSnapshot(rocksDB, fullPathForPartitionDBSnapshot);
try {
// 1. Notify the handler of the pre-snapshot creation event so it can flush data to disk and sync the offset.
if (blobTransferSnapshotCreationListener != null) {
blobTransferSnapshotCreationListener.preSnapshotCreationHandler(storeNameAndVersion, partitionId);
}
// 2. Create the snapshot.
createSnapshot(rocksDB, fullPathForPartitionDBSnapshot);
} catch (Exception e) {
throw new VeniceException("Failed to create snapshot for replica: " + replicaId, e);
}
}
}
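
Putting the pieces together, the intended interaction is roughly the following; a sketch using only classes and methods from this diff, with the registration mirroring the SIT change above:

// Sketch of the end-to-end flow introduced by this PR (not literal diff code).
// 'listener' stands for the BlobTransferSnapshotCreationListener built in SIT
// (hypothetical variable name for illustration).
// 1. During ingestion, SIT registers the pre-snapshot listener on the partition.
AbstractStoragePartition partition = storageEngine.getPartitionOrThrow(partitionId);
partition.addPartitionSnapshotListener(listener);
// 2. When a snapshot is requested (after EOP for batch stores, or on a blob transfer
//    request for hybrid stores), RocksDBStoragePartition.createSnapshot() first fires
//    preSnapshotCreationHandler (which flushes and syncs the offset) and only then
//    snapshots the RocksDB database.
storageEngine.createSnapshot(storagePartitionConfig);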
