[cc][dvc][test] Add support for SpecificRecord deserialization in DVRT, and add isCaughtUp API for DVRT CDC #1790

Status: Merged — 28 commits, merged May 20, 2025 (diff below shows changes from 25 of 28 commits).

Commits
b3dc21e  initial commit (kvargha, May 5, 2025)
1da7e77  Add support for specific records in dvrt (kvargha, May 6, 2025)
54add59  Add more tests for specific record (kvargha, May 8, 2025)
acf2ceb  Add isCaughtUp support for dvrt cdc (kvargha, May 9, 2025)
b6e98a8  Cleanup (kvargha, May 9, 2025)
08cb1ca  test (kvargha, May 9, 2025)
2d2fea4  Merge branch 'main' into dvrt-cdc-fixes (kvargha, May 9, 2025)
0fc01b2  Fix tests (kvargha, May 10, 2025)
8e4d59e  revert (kvargha, May 10, 2025)
68b2c0a  merge main (kvargha, May 10, 2025)
2f8e0ad  remove assertions (kvargha, May 10, 2025)
748ee90  Update BootstrappingVeniceChangelogConsumer JavaDoc (kvargha, May 12, 2025)
bd1dd67  Add unit tests for isCaughtUp (kvargha, May 13, 2025)
2539842  Fix tests (kvargha, May 13, 2025)
db7e314  Make sure dvc subscription doesn't die silently (kvargha, May 13, 2025)
62ead2c  Add unit test for specific record in dvrt (kvargha, May 13, 2025)
d34b0a3  Add support for specificrecord keys (kvargha, May 14, 2025)
23a909a  cleanup (kvargha, May 14, 2025)
c419f15  Update javadoc (kvargha, May 15, 2025)
943722c  make sure to use fast avro for serializer (kvargha, May 15, 2025)
ca63375  Merge branch 'main' into dvrt-cdc-fixes (kvargha, May 15, 2025)
f37643e  Throw exception when user calls start multiple times (kvargha, May 15, 2025)
e970756  Update javadoc (kvargha, May 15, 2025)
49d48db  Update javadoc (kvargha, May 15, 2025)
5001bb7  Delete internalStart (kvargha, May 15, 2025)
a7c8598  Make start synchronized (kvargha, May 19, 2025)
6415d0a  make isStarted an atomicboolean (kvargha, May 19, 2025)
3554276  Fix unit tests (kvargha, May 20, 2025)
File: DaVinciRecordTransformerConfig.java
@@ -4,30 +4,39 @@
import com.linkedin.venice.utils.lazy.Lazy;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;


/**
* Configuration class for {@link DaVinciRecordTransformer}, which is passed into {@link DaVinciConfig}.
*/
public class DaVinciRecordTransformerConfig {
private final DaVinciRecordTransformerFunctionalInterface recordTransformerFunction;
private final Class keyClass;
private final Class outputValueClass;
private final Schema outputValueSchema;
private final boolean storeRecordsInDaVinci;
private final boolean alwaysBootstrapFromVersionTopic;
private final boolean skipCompatibilityChecks;
private final boolean useSpecificRecordKeyDeserializer;
private final boolean useSpecificRecordValueDeserializer;

public DaVinciRecordTransformerConfig(Builder builder) {
this.recordTransformerFunction = Optional.ofNullable(builder.recordTransformerFunction)
.orElseThrow(() -> new VeniceException("recordTransformerFunction cannot be null"));

this.keyClass = builder.keyClass;
this.outputValueClass = builder.outputValueClass;
this.outputValueSchema = builder.outputValueSchema;
if ((this.outputValueClass != null && this.outputValueSchema == null)
|| (this.outputValueClass == null && this.outputValueSchema != null)) {
throw new VeniceException("outputValueClass and outputValueSchema must be defined together");
}

this.useSpecificRecordKeyDeserializer = keyClass != null && SpecificRecord.class.isAssignableFrom(keyClass);
this.useSpecificRecordValueDeserializer =
outputValueClass != null && SpecificRecord.class.isAssignableFrom(outputValueClass);

this.storeRecordsInDaVinci = builder.storeRecordsInDaVinci;
this.alwaysBootstrapFromVersionTopic = builder.alwaysBootstrapFromVersionTopic;
this.skipCompatibilityChecks = builder.skipCompatibilityChecks;
@@ -40,13 +49,34 @@ public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction(
return recordTransformerFunction;
}

/**
* @return {@link #keyClass}
*/
public Class getKeyClass() {
return keyClass;
}

/**
* @return Whether the {@link SpecificRecord} deserializer should be used for keys
*/
public boolean useSpecificRecordKeyDeserializer() {
return useSpecificRecordKeyDeserializer;
}

/**
* @return {@link #outputValueClass}
*/
public Class getOutputValueClass() {
return outputValueClass;
}

/**
* @return Whether the {@link SpecificRecord} deserializer should be used for values
*/
public boolean useSpecificRecordValueDeserializer() {
return useSpecificRecordValueDeserializer;
}

/**
* @return {@link #outputValueSchema}
*/
@@ -77,6 +107,7 @@ public boolean shouldSkipCompatibilityChecks() {

public static class Builder {
private DaVinciRecordTransformerFunctionalInterface recordTransformerFunction;
private Class keyClass;
private Class outputValueClass;
private Schema outputValueSchema;
private Boolean storeRecordsInDaVinci = true;
@@ -92,7 +123,18 @@ public Builder setRecordTransformerFunction(DaVinciRecordTransformerFunctionalIn
}

/**
* Set this if you modify the schema during transformation. Must be used in conjunction with {@link #setOutputValueSchema(Schema)}
* Set this if you want to deserialize keys into {@link org.apache.avro.specific.SpecificRecord}.
* @param keyClass the class of the key
*/
public Builder setKeyClass(Class keyClass) {
this.keyClass = keyClass;
return this;
}

/**
* Set this if you modify the schema during transformation, or you want to deserialize values
* into {@link org.apache.avro.specific.SpecificRecord}.
* Must be used in conjunction with {@link #setOutputValueSchema(Schema)}
* @param outputValueClass the class of the output value
*/
public Builder setOutputValueClass(Class outputValueClass) {
@@ -101,7 +143,9 @@ public Builder setOutputValueClass(Class outputValueClass) {
}

/**
* Set this if you modify the schema during transformation. Must be used in conjunction with {@link #setOutputValueClass(Class)}
* Set this if you modify the schema during transformation, or you want to deserialize values
* into {@link org.apache.avro.specific.SpecificRecord}.
* Must be used in conjunction with {@link #setOutputValueClass(Class)}
* @param outputValueSchema the schema of the output value
*/
public Builder setOutputValueSchema(Schema outputValueSchema) {
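To make the new builder surface concrete, here is a minimal configuration sketch. MyKey, MyValue, and MyRecordTransformer are hypothetical names standing in for Avro-generated SpecificRecord classes and a user transformer; only the builder methods themselves come from this PR.

// MyKey and MyValue are assumed Avro-generated SpecificRecord classes, and
// MyRecordTransformer an implementation whose constructor matches
// DaVinciRecordTransformerFunctionalInterface -- all hypothetical names.
DaVinciRecordTransformerConfig config = new DaVinciRecordTransformerConfig.Builder()
    .setRecordTransformerFunction(MyRecordTransformer::new)
    // Because MyKey implements SpecificRecord, useSpecificRecordKeyDeserializer() reports true
    .setKeyClass(MyKey.class)
    // outputValueClass and outputValueSchema must be defined together, per the constructor check
    .setOutputValueClass(MyValue.class)
    .setOutputValueSchema(MyValue.SCHEMA$)
    .build();

Passing a class that does not implement SpecificRecord (or no class at all) keeps the generic-record path, since the constructor only flips the specific-deserializer flags via SpecificRecord.class.isAssignableFrom(...).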
File: DaVinciRecordTransformerUtility.java
@@ -6,8 +6,9 @@
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -27,9 +28,9 @@ public class DaVinciRecordTransformerUtility<K, O> {
private static final Logger LOGGER = LogManager.getLogger(DaVinciRecordTransformerUtility.class);
private final DaVinciRecordTransformer recordTransformer;
private final DaVinciRecordTransformerConfig recordTransformerConfig;
private final AvroGenericDeserializer<K> keyDeserializer;
private final AvroGenericDeserializer<O> outputValueDeserializer;
private final AvroSerializer<O> outputValueSerializer;
private final RecordDeserializer<K> keyDeserializer;
private final RecordDeserializer<O> outputValueDeserializer;
private final RecordSerializer<O> outputValueSerializer;

public DaVinciRecordTransformerUtility(
DaVinciRecordTransformer recordTransformer,
@@ -38,10 +39,23 @@ public DaVinciRecordTransformerUtility(
this.recordTransformerConfig = recordTransformerConfig;

Schema keySchema = recordTransformer.getKeySchema();
if (recordTransformerConfig.useSpecificRecordKeyDeserializer()) {
this.keyDeserializer = FastSerializerDeserializerFactory
.getFastAvroSpecificDeserializer(keySchema, recordTransformerConfig.getKeyClass());
} else {
this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
}

Schema outputValueSchema = recordTransformer.getOutputValueSchema();
this.keyDeserializer = new AvroGenericDeserializer<>(keySchema, keySchema);
this.outputValueDeserializer = new AvroGenericDeserializer<>(outputValueSchema, outputValueSchema);
this.outputValueSerializer = new AvroSerializer<>(outputValueSchema);
if (recordTransformerConfig.useSpecificRecordValueDeserializer()) {
this.outputValueDeserializer = FastSerializerDeserializerFactory
.getFastAvroSpecificDeserializer(outputValueSchema, recordTransformerConfig.getOutputValueClass());
} else {
this.outputValueDeserializer =
FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(outputValueSchema, outputValueSchema);
}

this.outputValueSerializer = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(outputValueSchema);
}

/**
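The wiring above reduces to one rule: use fast-avro's specific deserializer when the configured class implements SpecificRecord, otherwise fall back to the fast generic deserializer (and the value serializer is now fast-avro as well, replacing AvroSerializer). A standalone sketch of that rule, using only the factory methods visible in this diff:

import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;

// Picks a specific deserializer only when clazz is a SpecificRecord subclass;
// otherwise returns a generic deserializer. Illustration of the diff's logic.
static RecordDeserializer<?> pickDeserializer(Schema schema, Class<?> clazz) {
  if (clazz != null && SpecificRecord.class.isAssignableFrom(clazz)) {
    // asSubclass is safe here because of the preceding isAssignableFrom check
    return FastSerializerDeserializerFactory
        .getFastAvroSpecificDeserializer(schema, clazz.asSubclass(SpecificRecord.class));
  }
  // writer and reader schema are identical, mirroring the generic path above
  return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(schema, schema);
}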
File: BootstrappingVeniceChangelogConsumer.java
@@ -29,6 +29,13 @@ public interface BootstrappingVeniceChangelogConsumer<K, V> {
* NOTE: This future may take some time to complete depending on how much data needs to be ingested in order to catch
* up with the time that this client started.
*
* NOTE: In the experimental client, the future completes when there is at least one message available to poll.
* We don't wait for all partitions to catch up, since buffering every message would risk an
* OutOfMemoryError. Instead, use {@link #isCaughtUp()} to determine whether all subscribed partitions have
* caught up.
*
* NOTE: In the experimental client, passing in an empty set subscribes to all partitions for the store.
*
* @param partitions which partition IDs to catch up with
* @return a future that completes once catch-up is complete for all passed-in partitions.
*/
@@ -41,9 +48,21 @@ public interface BootstrappingVeniceChangelogConsumer<K, V> {
/**
* Polls for the next batch of change events. The first records returned after calling {@code start()} will be from the bootstrap state.
* Once this state is consumed, subsequent calls to poll will be based on recent updates to the Venice store.
*
* In the experimental client, records are returned in batches capped at MAX_BUFFER_SIZE, so the initial
* calls to poll return records from the bootstrap state until the partitions have caught up.
* Additionally, if the buffer reaches MAX_BUFFER_SIZE before the timeout expires, poll returns immediately.
*
* @param timeoutInMs the maximum time to block waiting for records, in milliseconds
* @return a collection of change events drained from the internal buffer
*/
Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll(long timeoutInMs);

/**
* In the experimental client, once this returns true it stays true, even if the consumer starts to lag
* after the bootstrapping phase.
* @return True if all subscribed partitions have caught up.
*/
boolean isCaughtUp();

}
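A usage sketch of the experimental lifecycle this interface describes. The start/poll/isCaughtUp calls are the methods above; the consumer construction, the handle callback, and the import package paths are assumptions for illustration.

import java.util.Collections;
import org.apache.avro.generic.GenericRecord;
// Package paths below are assumed from Venice's source layout:
import com.linkedin.davinci.consumer.BootstrappingVeniceChangelogConsumer;
import com.linkedin.davinci.consumer.ChangeEvent;
import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
import com.linkedin.venice.pubsub.api.PubSubMessage;

final class BootstrapDrainExample {
  // Drains bootstrap data, then exits once all subscribed partitions are caught up.
  static void run(BootstrappingVeniceChangelogConsumer<GenericRecord, GenericRecord> consumer)
      throws Exception {
    // An empty set subscribes to every partition in the experimental client.
    consumer.start(Collections.emptySet()).get();

    // The start future completes once a single message is buffered, not when
    // bootstrap finishes, so isCaughtUp() is the real end-of-bootstrap signal.
    while (!consumer.isCaughtUp()) {
      for (PubSubMessage<GenericRecord, ChangeEvent<GenericRecord>, VeniceChangeCoordinate> msg
          : consumer.poll(1000L)) {
        handle(msg);
      }
    }
  }

  // Hypothetical application callback.
  static void handle(PubSubMessage<GenericRecord, ChangeEvent<GenericRecord>, VeniceChangeCoordinate> msg) {
    // application-specific processing goes here
  }
}

After isCaughtUp() returns true, the same poll loop can simply continue; subsequent batches contain live updates rather than bootstrap state.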
File: BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl.java
@@ -32,6 +32,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -46,6 +47,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.Schema;
@@ -65,7 +67,7 @@ public class BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl<K,
// A buffer of messages that will be returned to the user
private final BlockingQueue<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessages;
// Determines what version per partition is currently serving
private final ConcurrentHashMap<Integer, Integer> partitionToVersionToServe;
private final Map<Integer, Integer> partitionToVersionToServe;
private final DaVinciRecordTransformerConfig recordTransformerConfig;
// CachingDaVinciClientFactory used instead of DaVinciClientFactory, so we have the ability to close down the client
private final CachingDaVinciClientFactory daVinciClientFactory;
@@ -80,6 +82,7 @@ public class BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl<K,
private final ApacheKafkaOffsetPosition placeHolderOffset = ApacheKafkaOffsetPosition.of(0);
private final ReentrantLock bufferLock = new ReentrantLock();
private final Condition bufferIsFullCondition = bufferLock.newCondition();
private final AtomicBoolean isCaughtUp = new AtomicBoolean(false);

public BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl(ChangelogClientConfig changelogClientConfig) {
this.changelogClientConfig = changelogClientConfig;
@@ -96,6 +99,9 @@ public BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl(Changelo
* DVRT implementation. This is to prevent the local state from being wiped every time a change is deployed
*/
.setSkipCompatibilityChecks(true)
.setKeyClass(innerClientConfig.getSpecificKeyClass())
.setOutputValueClass(innerClientConfig.getSpecificValueClass())
.setOutputValueSchema(innerClientConfig.getSpecificValueSchema())
.build();
daVinciConfig.setRecordTransformerConfig(recordTransformerConfig);

@@ -115,24 +121,23 @@

@Override
public CompletableFuture<Void> start(Set<Integer> partitions) {
internalStart();
subscribedPartitions.addAll(partitions);
if (isStarted) {
throw new VeniceException("BootstrappingVeniceChangelogConsumer is already started!");
}

/*
* Avoid waiting on the CompletableFuture to prevent a circular dependency.
* When subscribe is called, DVRT scans the entire storage engine and fills pubSubMessages.
* Because pubSubMessages has limited capacity, blocking on the CompletableFuture
* prevents the user from calling poll to drain pubSubMessages, so the threads populating pubSubMessages
* will wait forever for capacity to become available. This leads to a deadlock.
*/
daVinciClient.subscribe(partitions).whenComplete((result, error) -> {
if (error != null) {
LOGGER.error("Failed to subscribe to partitions: {} for store: {}", partitions, storeName, error);
throw new VeniceException(error);
daVinciClient.start();
isStarted = true;

// If the user passes in an empty partition set, we subscribe to all partitions
if (partitions.isEmpty()) {
for (int i = 0; i < daVinciClient.getPartitionCount(); i++) {
subscribedPartitions.add(i);
}
});
} else {
subscribedPartitions.addAll(partitions);
}

return CompletableFuture.supplyAsync(() -> {
CompletableFuture<Void> startFuture = CompletableFuture.supplyAsync(() -> {
try {
/*
* When this latch gets released, this means there's at least one message in pubSubMessages. So when the user
@@ -147,18 +152,34 @@
}
return null;
}, completableFutureThreadPool);

/*
* Avoid waiting on the CompletableFuture to prevent a circular dependency.
* When subscribe is called, DVRT scans the entire storage engine and fills pubSubMessages.
* Because pubSubMessages has limited capacity, blocking on the CompletableFuture
* prevents the user from calling poll to drain pubSubMessages, so the threads populating pubSubMessages
* will wait forever for capacity to become available. This leads to a deadlock.
*/
daVinciClient.subscribe(subscribedPartitions).whenComplete((result, error) -> {
if (error != null) {
LOGGER.error("Failed to subscribe to partitions: {} for store: {}", subscribedPartitions, storeName, error);
startFuture.completeExceptionally(new VeniceException(error));
return;
}

isCaughtUp.set(true);
LOGGER.info(
"BootstrappingVeniceChangelogConsumer is caught up for store: {} for partitions: {}",
storeName,
subscribedPartitions);
});

return startFuture;
}

@Override
public CompletableFuture<Void> start() {
internalStart();

Set<Integer> allPartitions = new HashSet<>();
for (int i = 0; i < daVinciClient.getPartitionCount(); i++) {
allPartitions.add(i);
}

return this.start(allPartitions);
return this.start(Collections.emptySet());
}

@Override
@@ -204,13 +225,9 @@ public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll
return drainedPubSubMessages;
}

private void internalStart() {
if (isStarted) {
return;
}

daVinciClient.start();
isStarted = true;
@Override
public boolean isCaughtUp() {
return isCaughtUp.get();
}

private VeniceProperties buildVeniceConfig() {
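The guard shown in this diff is a plain isStarted check, but the diff reflects only 25 of the 28 commits; the follow-up commits "Make start synchronized" and "make isStarted an atomicboolean" suggest the merged version hardens it. A hedged sketch of what such a race-free single-start guard can look like (illustrative only, not the PR's final code):

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleStartGuard {
  private final AtomicBoolean isStarted = new AtomicBoolean(false);

  // synchronized plus compareAndSet: the body runs at most once, and a second
  // caller fails fast instead of silently re-starting the client.
  synchronized CompletableFuture<Void> start(Set<Integer> partitions) {
    if (!isStarted.compareAndSet(false, true)) {
      // The PR throws VeniceException here; IllegalStateException keeps this sketch self-contained.
      throw new IllegalStateException("BootstrappingVeniceChangelogConsumer is already started!");
    }
    // The real implementation starts the DaVinci client and wires up the start future here.
    return CompletableFuture.completedFuture(null);
  }
}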