Skip to content

Commit 9c86b25

Browse files
authored
[cc][dvc][test] Add support for SpecificRecord deserialization in DVRT, and add isCaughtUp API for DVRT CDC (#1790)
Problem Statement 1. Currently, DVRT only supports deserializing keys and values into Avro GenericRecords. This is a problem as for complex schemas this can lead to poor deserialization performance. Additionally, regular CDC client supports deserializing values to SpecificRecords, but DVRT CDC doesn't. 2. In DVRT CDC, there is no way for the user to tell whether the client has caught up. 3. During the start phase of DVRT CDC, subscription to the DVC can die silently. 4. We allow users to call start multiple times in DVRT CDC. 5. The javadoc for BootstrappingVeniceChangelogConsumer hasn't been updated since the introduction of DVRT CDC. 6. DaVinciClientRecordTransformerTest::testRecordTransformer has been failing consistently in the CI. Solution 1. Add support for SpecificRecord deserialization for keys and values in DVRT to improve performance, and so DVRT CDC can benefit from it. Additionally, I've moved deserialization and serialization to FastAvro as it can perform 90% better when deserializing complex schemas. Please note that the regular CDC client doesn't support SpecificRecord for keys, but we are adding it for DVRT CDC since a user is requesting it. 2. To provide context to the user on whether they're caught up in DVRT CDC, the isCaughtUp API is added to the BootstrappingVeniceChangelogConsumer interface. Since the original BootstrappingVeniceChangelogConsumer implementation is extended from VeniceAfterImageConsumerImpl, it already supports isCaughtUp. 3. To prevent subscription to DVC in DVRT CDC from dying silently, I re-organized the futures and if subscription fails to DVC we will complete the future returned to the user exceptionally. 4. If a user calls start multiple times on DVRT CDC, we now throw an exception. Additionally, if a user passes in an empty set to start we will subscribe to all partitions. I've also added sychronized to start. 5. Updated the javadoc for BootstrappingVeniceChangelogConsumer, explaining how it behaves differently compared to the regular CDC client. 6. To make DaVinciClientRecordTransformerTest::testRecordTransformer pass consistently, we need to ensure that this test runs first.
1 parent 1223ed3 commit 9c86b25

20 files changed

+1202
-160
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerConfig.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,39 @@
44
import com.linkedin.venice.utils.lazy.Lazy;
55
import java.util.Optional;
66
import org.apache.avro.Schema;
7+
import org.apache.avro.specific.SpecificRecord;
78

89

910
/**
1011
* Configuration class for {@link DaVinciRecordTransformer}, which is passed into {@link DaVinciConfig}.
1112
*/
1213
public class DaVinciRecordTransformerConfig {
1314
private final DaVinciRecordTransformerFunctionalInterface recordTransformerFunction;
15+
private final Class keyClass;
1416
private final Class outputValueClass;
1517
private final Schema outputValueSchema;
1618
private final boolean storeRecordsInDaVinci;
1719
private final boolean alwaysBootstrapFromVersionTopic;
1820
private final boolean skipCompatibilityChecks;
21+
private final boolean useSpecificRecordKeyDeserializer;
22+
private final boolean useSpecificRecordValueDeserializer;
1923

2024
public DaVinciRecordTransformerConfig(Builder builder) {
2125
this.recordTransformerFunction = Optional.ofNullable(builder.recordTransformerFunction)
2226
.orElseThrow(() -> new VeniceException("recordTransformerFunction cannot be null"));
2327

28+
this.keyClass = builder.keyClass;
2429
this.outputValueClass = builder.outputValueClass;
2530
this.outputValueSchema = builder.outputValueSchema;
2631
if ((this.outputValueClass != null && this.outputValueSchema == null)
2732
|| (this.outputValueClass == null && this.outputValueSchema != null)) {
2833
throw new VeniceException("outputValueClass and outputValueSchema must be defined together");
2934
}
3035

36+
this.useSpecificRecordKeyDeserializer = keyClass != null && SpecificRecord.class.isAssignableFrom(keyClass);
37+
this.useSpecificRecordValueDeserializer =
38+
outputValueClass != null && SpecificRecord.class.isAssignableFrom(outputValueClass);
39+
3140
this.storeRecordsInDaVinci = builder.storeRecordsInDaVinci;
3241
this.alwaysBootstrapFromVersionTopic = builder.alwaysBootstrapFromVersionTopic;
3342
this.skipCompatibilityChecks = builder.skipCompatibilityChecks;
@@ -40,13 +49,34 @@ public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction(
4049
return recordTransformerFunction;
4150
}
4251

52+
/**
53+
* @return {@link #keyClass}
54+
*/
55+
public Class getKeyClass() {
56+
return keyClass;
57+
}
58+
59+
/**
60+
* @return Whether the {@link SpecificRecord} deserializer should be used for keys
61+
*/
62+
public boolean useSpecificRecordKeyDeserializer() {
63+
return useSpecificRecordKeyDeserializer;
64+
}
65+
4366
/**
4467
* @return {@link #outputValueClass}
4568
*/
4669
public Class getOutputValueClass() {
4770
return outputValueClass;
4871
}
4972

73+
/**
74+
* @return Whether the {@link SpecificRecord} deserializer should be used for values
75+
*/
76+
public boolean useSpecificRecordValueDeserializer() {
77+
return useSpecificRecordValueDeserializer;
78+
}
79+
5080
/**
5181
* @return {@link #outputValueSchema}
5282
*/
@@ -77,6 +107,7 @@ public boolean shouldSkipCompatibilityChecks() {
77107

78108
public static class Builder {
79109
private DaVinciRecordTransformerFunctionalInterface recordTransformerFunction;
110+
private Class keyClass;
80111
private Class outputValueClass;
81112
private Schema outputValueSchema;
82113
private Boolean storeRecordsInDaVinci = true;
@@ -92,7 +123,18 @@ public Builder setRecordTransformerFunction(DaVinciRecordTransformerFunctionalIn
92123
}
93124

94125
/**
95-
* Set this if you modify the schema during transformation. Must be used in conjunction with {@link #setOutputValueSchema(Schema)}
126+
* Set this if you want to deserialize keys into {@link org.apache.avro.specific.SpecificRecord}.
127+
* @param keyClass the class of the key
128+
*/
129+
public Builder setKeyClass(Class keyClass) {
130+
this.keyClass = keyClass;
131+
return this;
132+
}
133+
134+
/**
135+
* Set this if you modify the schema during transformation, or you want to deserialize values
136+
* into {@link org.apache.avro.specific.SpecificRecord}.
137+
* Must be used in conjunction with {@link #setOutputValueSchema(Schema)}
96138
* @param outputValueClass the class of the output value
97139
*/
98140
public Builder setOutputValueClass(Class outputValueClass) {
@@ -101,7 +143,9 @@ public Builder setOutputValueClass(Class outputValueClass) {
101143
}
102144

103145
/**
104-
* Set this if you modify the schema during transformation. Must be used in conjunction with {@link #setOutputValueClass(Class)}
146+
* Set this if you modify the schema during transformation, or you want to deserialize values
147+
* into {@link org.apache.avro.specific.SpecificRecord}.
148+
* Must be used in conjunction with {@link #setOutputValueClass(Class)}
105149
* @param outputValueSchema the schema of the output value
106150
*/
107151
public Builder setOutputValueSchema(Schema outputValueSchema) {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerUtility.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
import com.linkedin.venice.kafka.protocol.state.PartitionState;
77
import com.linkedin.venice.offsets.OffsetRecord;
88
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
9-
import com.linkedin.venice.serializer.AvroGenericDeserializer;
10-
import com.linkedin.venice.serializer.AvroSerializer;
9+
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
10+
import com.linkedin.venice.serializer.RecordDeserializer;
11+
import com.linkedin.venice.serializer.RecordSerializer;
1112
import com.linkedin.venice.utils.lazy.Lazy;
1213
import java.io.IOException;
1314
import java.nio.ByteBuffer;
@@ -27,9 +28,9 @@ public class DaVinciRecordTransformerUtility<K, O> {
2728
private static final Logger LOGGER = LogManager.getLogger(DaVinciRecordTransformerUtility.class);
2829
private final DaVinciRecordTransformer recordTransformer;
2930
private final DaVinciRecordTransformerConfig recordTransformerConfig;
30-
private final AvroGenericDeserializer<K> keyDeserializer;
31-
private final AvroGenericDeserializer<O> outputValueDeserializer;
32-
private final AvroSerializer<O> outputValueSerializer;
31+
private final RecordDeserializer<K> keyDeserializer;
32+
private final RecordDeserializer<O> outputValueDeserializer;
33+
private final RecordSerializer<O> outputValueSerializer;
3334

3435
public DaVinciRecordTransformerUtility(
3536
DaVinciRecordTransformer recordTransformer,
@@ -38,10 +39,23 @@ public DaVinciRecordTransformerUtility(
3839
this.recordTransformerConfig = recordTransformerConfig;
3940

4041
Schema keySchema = recordTransformer.getKeySchema();
42+
if (recordTransformerConfig.useSpecificRecordKeyDeserializer()) {
43+
this.keyDeserializer = FastSerializerDeserializerFactory
44+
.getFastAvroSpecificDeserializer(keySchema, recordTransformerConfig.getKeyClass());
45+
} else {
46+
this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
47+
}
48+
4149
Schema outputValueSchema = recordTransformer.getOutputValueSchema();
42-
this.keyDeserializer = new AvroGenericDeserializer<>(keySchema, keySchema);
43-
this.outputValueDeserializer = new AvroGenericDeserializer<>(outputValueSchema, outputValueSchema);
44-
this.outputValueSerializer = new AvroSerializer<>(outputValueSchema);
50+
if (recordTransformerConfig.useSpecificRecordValueDeserializer()) {
51+
this.outputValueDeserializer = FastSerializerDeserializerFactory
52+
.getFastAvroSpecificDeserializer(outputValueSchema, recordTransformerConfig.getOutputValueClass());
53+
} else {
54+
this.outputValueDeserializer =
55+
FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(outputValueSchema, outputValueSchema);
56+
}
57+
58+
this.outputValueSerializer = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(outputValueSchema);
4559
}
4660

4761
/**

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/BootstrappingVeniceChangelogConsumer.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ public interface BootstrappingVeniceChangelogConsumer<K, V> {
2929
* NOTE: This future may take some time to complete depending on how much data needs to be ingested in order to catch
3030
* up with the time that this client started.
3131
*
32+
* NOTE: In the experimental client, the future will complete when there is at least one message to be polled.
33+
* We don't wait for all partitions to catch up, as loading every message into a buffer will result in an
34+
* Out Of Memory error. Instead, use the {@link #isCaughtUp()} method to determine once all subscribed partitions have
35+
* caught up.
36+
*
37+
* NOTE: In the experimental client, if you pass in an empty set, it will subscribe to all partitions for the store
38+
*
3239
* @param partitions which partition id's to catch up with
3340
* @return a future that completes once catch up is complete for all passed in partitions.
3441
*/
@@ -41,9 +48,21 @@ public interface BootstrappingVeniceChangelogConsumer<K, V> {
4148
/**
4249
* polls for the next batch of change events. The first records returned following calling 'start()' will be from the bootstrap state.
4350
* Once this state is consumed, subsequent calls to poll will be based off of recent updates to the Venice store.
51+
*
52+
* In the experimental client, records will be returned in batches configured to the MAX_BUFFER_SIZE. So the initial
53+
* calls to poll will be from records from the bootstrap state, until the partitions have caught up.
54+
* Additionally, if the buffer hits the MAX_BUFFER_SIZE before the timeout is hit, poll will return immediately.
55+
*
4456
* @param timeoutInMs
4557
* @return
4658
*/
4759
Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll(long timeoutInMs);
4860

61+
/**
62+
* In the experimental client, once this becomes true it will stay true even if we start to lag after the
63+
* bootstrapping phase.
64+
* @return True if all subscribed partitions have caught up.
65+
*/
66+
boolean isCaughtUp();
67+
4968
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl.java

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.IOException;
3333
import java.util.ArrayList;
3434
import java.util.Collection;
35+
import java.util.Collections;
3536
import java.util.HashMap;
3637
import java.util.HashSet;
3738
import java.util.LinkedHashMap;
@@ -46,6 +47,7 @@
4647
import java.util.concurrent.ExecutorService;
4748
import java.util.concurrent.Executors;
4849
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicBoolean;
4951
import java.util.concurrent.locks.Condition;
5052
import java.util.concurrent.locks.ReentrantLock;
5153
import org.apache.avro.Schema;
@@ -65,12 +67,12 @@ public class BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl<K,
6567
// A buffer of messages that will be returned to the user
6668
private final BlockingQueue<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessages;
6769
// Determines what version per partition is currently serving
68-
private final ConcurrentHashMap<Integer, Integer> partitionToVersionToServe;
70+
private final Map<Integer, Integer> partitionToVersionToServe;
6971
private final DaVinciRecordTransformerConfig recordTransformerConfig;
7072
// CachingDaVinciClientFactory used instead of DaVinciClientFactory, so we have the ability to close down the client
7173
private final CachingDaVinciClientFactory daVinciClientFactory;
7274
private final DaVinciClient<Object, Object> daVinciClient;
73-
private boolean isStarted = false;
75+
private AtomicBoolean isStarted = new AtomicBoolean(false);
7476
private final CountDownLatch startLatch = new CountDownLatch(1);
7577
// Using a dedicated thread pool for CompletableFutures created by this class to avoid potential thread starvation
7678
// issues in the default ForkJoinPool
@@ -80,6 +82,7 @@ public class BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl<K,
8082
private final ApacheKafkaOffsetPosition placeHolderOffset = ApacheKafkaOffsetPosition.of(0);
8183
private final ReentrantLock bufferLock = new ReentrantLock();
8284
private final Condition bufferIsFullCondition = bufferLock.newCondition();
85+
private final AtomicBoolean isCaughtUp = new AtomicBoolean(false);
8386

8487
public BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl(ChangelogClientConfig changelogClientConfig) {
8588
this.changelogClientConfig = changelogClientConfig;
@@ -96,6 +99,9 @@ public BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl(Changelo
9699
* DVRT implmentation. This is to prevent the local state from being wiped everytime a change is deployed
97100
*/
98101
.setSkipCompatibilityChecks(true)
102+
.setKeyClass(innerClientConfig.getSpecificKeyClass())
103+
.setOutputValueClass(innerClientConfig.getSpecificValueClass())
104+
.setOutputValueSchema(innerClientConfig.getSpecificValueSchema())
99105
.build();
100106
daVinciConfig.setRecordTransformerConfig(recordTransformerConfig);
101107

@@ -114,25 +120,24 @@ public BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl(Changelo
114120
}
115121

116122
@Override
117-
public CompletableFuture<Void> start(Set<Integer> partitions) {
118-
internalStart();
119-
subscribedPartitions.addAll(partitions);
123+
public synchronized CompletableFuture<Void> start(Set<Integer> partitions) {
124+
if (isStarted.get()) {
125+
throw new VeniceException("BootstrappingVeniceChangelogConsumer is already started!");
126+
}
120127

121-
/*
122-
* Avoid waiting on the CompletableFuture to prevent a circular dependency.
123-
* When subscribe is called, DVRT scans the entire storage engine and fills pubSubMessages.
124-
* Because pubSubMessages has limited capacity, blocking on the CompletableFuture
125-
* prevents the user from calling poll to drain pubSubMessages, so the threads populating pubSubMessages
126-
* will wait forever for capacity to become available. This leads to a deadlock.
127-
*/
128-
daVinciClient.subscribe(partitions).whenComplete((result, error) -> {
129-
if (error != null) {
130-
LOGGER.error("Failed to subscribe to partitions: {} for store: {}", partitions, storeName, error);
131-
throw new VeniceException(error);
128+
daVinciClient.start();
129+
isStarted.set(true);
130+
131+
// If a user passes in empty partitions set, we subscribe to all partitions
132+
if (partitions.isEmpty()) {
133+
for (int i = 0; i < daVinciClient.getPartitionCount(); i++) {
134+
subscribedPartitions.add(i);
132135
}
133-
});
136+
} else {
137+
subscribedPartitions.addAll(partitions);
138+
}
134139

135-
return CompletableFuture.supplyAsync(() -> {
140+
CompletableFuture<Void> startFuture = CompletableFuture.supplyAsync(() -> {
136141
try {
137142
/*
138143
* When this latch gets released, this means there's at least one message in pubSubMessages. So when the user
@@ -147,24 +152,40 @@ public CompletableFuture<Void> start(Set<Integer> partitions) {
147152
}
148153
return null;
149154
}, completableFutureThreadPool);
155+
156+
/*
157+
* Avoid waiting on the CompletableFuture to prevent a circular dependency.
158+
* When subscribe is called, DVRT scans the entire storage engine and fills pubSubMessages.
159+
* Because pubSubMessages has limited capacity, blocking on the CompletableFuture
160+
* prevents the user from calling poll to drain pubSubMessages, so the threads populating pubSubMessages
161+
* will wait forever for capacity to become available. This leads to a deadlock.
162+
*/
163+
daVinciClient.subscribe(subscribedPartitions).whenComplete((result, error) -> {
164+
if (error != null) {
165+
LOGGER.error("Failed to subscribe to partitions: {} for store: {}", subscribedPartitions, storeName, error);
166+
startFuture.completeExceptionally(new VeniceException(error));
167+
return;
168+
}
169+
170+
isCaughtUp.set(true);
171+
LOGGER.info(
172+
"BootstrappingVeniceChangelogConsumer is caught up for store: {} for partitions: {}",
173+
storeName,
174+
subscribedPartitions);
175+
});
176+
177+
return startFuture;
150178
}
151179

152180
@Override
153181
public CompletableFuture<Void> start() {
154-
internalStart();
155-
156-
Set<Integer> allPartitions = new HashSet<>();
157-
for (int i = 0; i < daVinciClient.getPartitionCount(); i++) {
158-
allPartitions.add(i);
159-
}
160-
161-
return this.start(allPartitions);
182+
return this.start(Collections.emptySet());
162183
}
163184

164185
@Override
165186
public void stop() throws Exception {
166187
daVinciClientFactory.close();
167-
isStarted = false;
188+
isStarted.set(false);
168189
}
169190

170191
@Override
@@ -204,13 +225,9 @@ public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll
204225
return drainedPubSubMessages;
205226
}
206227

207-
private void internalStart() {
208-
if (isStarted) {
209-
return;
210-
}
211-
212-
daVinciClient.start();
213-
isStarted = true;
228+
@Override
229+
public boolean isCaughtUp() {
230+
return isCaughtUp.get();
214231
}
215232

216233
private VeniceProperties buildVeniceConfig() {

0 commit comments

Comments
 (0)