[server][dvc] add snapshot creation listener for flush and sync offset before create snapshot #1711

Open · wants to merge 5 commits into main

Changes from all commits

@@ -174,7 +174,7 @@ private void recreateSnapshotAndMetadata(BlobTransferPayload blobTransferRequest
// 1. get the snapshot metadata before recreating the snapshot
BlobTransferPartitionMetadata metadataBeforeRecreateSnapshot = prepareMetadata(blobTransferRequest);
// 2. recreate the snapshot
createSnapshot(topicName, partitionId);
requestCreateSnapshotForPartition(topicName, partitionId);

// update the snapshot timestamp to reflect the latest snapshot creation time
snapshotTimestamps.get(topicName).put(partitionId, System.currentTimeMillis());
@@ -295,11 +295,11 @@ public synchronized boolean isStoreHybrid(String storeName, int versionNum) {
/**
* Request snapshot creation for a particular partition
*/
public void createSnapshot(String kafkaVersionTopic, int partitionId) {
public void requestCreateSnapshotForPartition(String kafkaVersionTopic, int partitionId) {
AbstractStorageEngine storageEngine =
Objects.requireNonNull(storageEngineRepository.getLocalStorageEngine(kafkaVersionTopic));
AbstractStoragePartition partition = storageEngine.getPartitionOrThrow(partitionId);
partition.createSnapshot();
partition.notifySnapshotCreationListener();
}
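
In effect, requestCreateSnapshotForPartition no longer cuts the snapshot itself: it asks the partition to notify its registered snapshot creation listener (added later in this diff), which flushes, syncs the offset, and only then creates the snapshot. A caller-side sketch, where blobSnapshotManager is a hypothetical handle to the class this hunk belongs to:

// Request, not create: the partition forwards this call to its snapshot
// creation listener, which flushes and syncs the offset before the snapshot.
blobSnapshotManager.requestCreateSnapshotForPartition(kafkaVersionTopic, partitionId);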

/**
@@ -41,6 +41,15 @@ public enum BlobTransferTableFormat {
PLAIN_TABLE, BLOCK_BASED_TABLE
}

/**
* Listener interface for blob transfer snapshot operations.
* Implementations of this interface can define actions to be performed during snapshot creation.
*/
public interface BlobTransferSnapshotCreationListener {
// Handles the snapshot creation event
void syncOffsetAndCreateSnapshot(String storeNameAndVersion, int partitionId);
}
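
Since the interface has a single abstract method, it can be implemented with a lambda. A minimal wiring sketch against the partition-level hooks added later in this diff (the listener body is a placeholder; the real implementation lives in StoreIngestionTask):

// Register the hook on the partition once, e.g. after end of push.
AbstractStoragePartition partition = storageEngine.getPartitionOrThrow(partitionId);
partition.addPartitionSnapshotListener((storeNameAndVersion, part) -> {
  // Placeholder: flush memtables, sync the offset, then create the snapshot.
});
// A trigger (end of push for batch stores, an incoming blob transfer
// request for hybrid stores) later fires the hook:
partition.notifySnapshotCreationListener();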

/**
* Check if the HttpResponse message is for metadata.
* @param msg the HttpResponse message
@@ -21,6 +21,7 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.client.BlockingDaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
@@ -40,6 +41,7 @@
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.memory.InMemoryStorageEngine;
@@ -107,6 +109,7 @@
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.ComplementSet;
@@ -3218,7 +3221,21 @@ protected void processEndOfPush(
* Generate snapshot after batch write is done.
*/
if (storeVersionConfig.isBlobTransferEnabled() && serverConfig.isBlobTransferManagerEnabled()) {
storageEngine.createSnapshot(storagePartitionConfig);
// 1. Add the snapshot creation listener to the partition for all stores.
try {
addBlobTransferSnapshotCreationListener(storageEngine, partitionConsumptionState.getPartition(), this);
} catch (Exception e) {
LOGGER.warn(
"Failed to setup snapshot creation listener for topic {} partition {}",
kafkaVersionTopic,
partitionConsumptionState.getPartition(),
e);
}
// 2. Notify the listener to create a snapshot after end of push for batch stores;
// a hybrid store's snapshot is created when a blob transfer request is received.
if (!isHybridMode()) {
storageEngine.getPartitionOrThrow(storagePartitionConfig.getPartitionId()).notifySnapshotCreationListener();
}
}

/**
@@ -4941,4 +4958,60 @@ KafkaDataIntegrityValidator getDataIntegrityValidator() {
long getLocalVtSubscribeOffset(PartitionConsumptionState pcs) {
return (isGlobalRtDivEnabled()) ? pcs.getLatestConsumedVtOffset() : pcs.getLatestProcessedLocalVersionTopicOffset();
}

/**
* Adds the snapshot creation event listener for the given partition. The listener's
* syncOffsetAndCreateSnapshot implementation syncs the offset and then creates a snapshot for that partition.
*/
private void addBlobTransferSnapshotCreationListener(
AbstractStorageEngine storageEngine,
int partitionId,
StoreIngestionTask storeIngestionTask) {
AbstractStoragePartition rocksDBPartition = storageEngine.getPartitionOrThrow(partitionId);
rocksDBPartition.addPartitionSnapshotListener(new BlobTransferUtils.BlobTransferSnapshotCreationListener() {
@Override
public void syncOffsetAndCreateSnapshot(String storeNameAndVersion, int partitionId) {
LOGGER.info(
"Beginning sync offset and snapshot creation process for store: {}, partition: {}",
storeName,
partitionId);
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(storeName), partitionId);

try {
CompletableFuture<Void> cmdFuture =
storeBufferService.execSyncOffsetCommandAsync(topicPartition, storeIngestionTask);

cmdFuture.thenRunAsync(() -> {
try {
LOGGER.info(
"Offset sync completed, start creating snapshot for store: {}, partition: {}",
storeName,
partitionId);

AbstractStorageEngine storageEngine = storageEngineRepository.getLocalStorageEngine(storeNameAndVersion);
if (storageEngine != null) {
AbstractStoragePartition partition = storageEngine.getPartitionOrThrow(partitionId);
String fullPathForPartitionDBSnapshot =
RocksDBUtils.composeSnapshotDir(serverConfig.getRocksDBPath(), storeNameAndVersion, partitionId);
partition.createSnapshot(fullPathForPartitionDBSnapshot);
}
} catch (Exception e) {
LOGGER.error(
"Failed to create snapshot after offset sync for store: {}, partition: {}",
storeName,
partitionId,
e);
}
});
} catch (Exception e) {
LOGGER.error(
"Failed to initiate offset sync for snapshot creation for store: {}, partition: {}",
storeName,
partitionId,
e);
}
}
});
}
}
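
The ordering is the point of this listener: the snapshot is cut only inside thenRunAsync, after the offset-sync command completes, so the snapshot a peer bootstraps from cannot disagree with the offset metadata shipped with it. Condensed, the chain is (identifiers as in the diff above; error handling elided):

// Sync the offset first; cut the snapshot only once that sync is durable.
storeBufferService.execSyncOffsetCommandAsync(topicPartition, storeIngestionTask)
    .thenRunAsync(() -> storageEngineRepository.getLocalStorageEngine(storeNameAndVersion)
        .getPartitionOrThrow(partitionId)
        .createSnapshot(
            RocksDBUtils.composeSnapshotDir(serverConfig.getRocksDBPath(), storeNameAndVersion, partitionId)));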
@@ -433,15 +433,6 @@ public synchronized void endBatchWrite(StoragePartitionConfig storagePartitionCo
}
}

/**
* Create snapshot for the given partition
* @param storagePartitionConfig
*/
public synchronized void createSnapshot(StoragePartitionConfig storagePartitionConfig) {
AbstractStoragePartition partition = getPartitionOrThrow(storagePartitionConfig.getPartitionId());
partition.createSnapshot();
}

private void executeWithSafeGuard(int partitionId, Runnable runnable) {
executeWithSafeGuard(partitionId, () -> {
runnable.run();
@@ -1,5 +1,6 @@
package com.linkedin.davinci.store;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.rocksdb.ReplicationMetadataRocksDBStoragePartition;
import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
@@ -103,10 +104,21 @@ public void reopen() {
*/
public abstract boolean verifyConfig(StoragePartitionConfig storagePartitionConfig);

/**
* Add the snapshot creation event listener to the storage partition.
* @param listener the snapshot creation listener to register
*/
public abstract void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener);

/**
* Creates a snapshot of the current state of the storage in the given directory, if the blob transfer feature is enabled via the store configuration
*/
public abstract void createSnapshot();
public abstract void createSnapshot(String fullPathForPartitionDBSnapshot);

/**
* Notifies the registered snapshot creation listener about the snapshot trigger event.
*/
public abstract void notifySnapshotCreationListener();

/**
* checks whether the current state of the database is valid
@@ -1,5 +1,6 @@
package com.linkedin.davinci.store.blackhole;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
@@ -83,7 +84,17 @@ public long getPartitionSizeInBytes() {
}

@Override
public void createSnapshot() {
public void notifySnapshotCreationListener() {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void createSnapshot(String fullPathForPartitionDBSnapshot) {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
throw new UnsupportedOperationException("Method not implemented!");
}
}
@@ -1,6 +1,7 @@
package com.linkedin.davinci.store.cache;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
@@ -132,7 +133,17 @@ public VeniceStoreCache getVeniceCache() {
}

@Override
public void createSnapshot() {
public void notifySnapshotCreationListener() {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void createSnapshot(String fullPathForPartitionDBSnapshot) {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
throw new UnsupportedOperationException("Method not implemented!");
}
}
@@ -1,5 +1,6 @@
package com.linkedin.davinci.store.memory;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
@@ -124,7 +125,17 @@ public long getPartitionSizeInBytes() {
}

@Override
public void createSnapshot() {
public void notifySnapshotCreationListener() {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void createSnapshot(String fullPathForPartitionDBSnapshot) {
throw new UnsupportedOperationException("Method not implemented!");
}

@Override
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
throw new UnsupportedOperationException("Method not implemented!");
}
}
@@ -2,6 +2,7 @@

import static com.linkedin.davinci.store.AbstractStorageEngine.METADATA_PARTITION_ID;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.stats.RocksDBMemoryStats;
@@ -155,6 +156,8 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
protected final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
private RocksDBSstFileWriter rocksDBSstFileWriter = null;

private BlobTransferUtils.BlobTransferSnapshotCreationListener blobTransferSnapshotCreationListener;

protected RocksDBStoragePartition(
StoragePartitionConfig storagePartitionConfig,
RocksDBStorageEngineFactory factory,
@@ -504,9 +507,20 @@ public synchronized void endBatchWrite() {
}

@Override
public synchronized void createSnapshot() {
if (blobTransferEnabled) {
createSnapshot(rocksDB, fullPathForPartitionDBSnapshot);
public void addPartitionSnapshotListener(BlobTransferUtils.BlobTransferSnapshotCreationListener listener) {
this.blobTransferSnapshotCreationListener = listener;
}

@Override
public synchronized void notifySnapshotCreationListener() {
if (blobTransferEnabled && blobTransferSnapshotCreationListener != null) {
try {
// Notify the listener about the snapshot creation event, so it can flush data
// to disk, sync the offset, and then create the snapshot.
blobTransferSnapshotCreationListener.syncOffsetAndCreateSnapshot(storeNameAndVersion, partitionId);
} catch (Exception e) {
throw new VeniceException("Failed to create snapshot for replica: " + replicaId, e);
}
}
}

@@ -1011,10 +1025,11 @@ public AbstractStorageIterator getIterator() {
}

/**
* util method to create a snapshot
* A method to create a snapshot
* It will check the snapshot directory and delete it if it exists, then generate a new snapshot
*/
public static void createSnapshot(RocksDB rocksDB, String fullPathForPartitionDBSnapshot) {
@Override
public void createSnapshot(String fullPathForPartitionDBSnapshot) {
LOGGER.info("Creating snapshot in directory: {}", fullPathForPartitionDBSnapshot);

// clean up the snapshot directory if it exists
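
The body of the new createSnapshot override is truncated above. For orientation, a minimal sketch of the check-then-recreate behavior its javadoc describes, using RocksDB's Checkpoint API; this is an assumption about the method's shape, not the PR's exact code:

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;

final class SnapshotSketch {
  // Sketch only: remove any stale snapshot directory, then cut a fresh
  // RocksDB checkpoint at the same path.
  static void createSnapshot(RocksDB rocksDB, String fullPathForPartitionDBSnapshot) throws Exception {
    File snapshotDir = new File(fullPathForPartitionDBSnapshot);
    if (snapshotDir.exists()) {
      // Clean up the previous snapshot so the checkpoint can be recreated.
      FileUtils.deleteDirectory(snapshotDir);
    }
    try (Checkpoint checkpoint = Checkpoint.create(rocksDB)) {
      // A checkpoint hard-links immutable SST files into the target directory,
      // yielding a consistent, openable copy of the DB at this point in time.
      checkpoint.createCheckpoint(fullPathForPartitionDBSnapshot);
    }
  }
}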