Skip to content

Commit 5f6b97f

Browse files
authored
[cc] Add the ability to create a Specific DVRT CDC client from VeniceChangelogConsumerClientFactory (#1818)
Problem Statement Internally, our users don't have access to the ChangelogClientConfig, so they're unable to enable SpecificRecord support for DVRT CDC. Solution Create a new getter in VeniceChangelogConsumerClientFactory that allows DVRT CDC users to enable the use of SpecificRecord.
1 parent 615a5de commit 5f6b97f

File tree

3 files changed

+49
-27
lines changed

3 files changed

+49
-27
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import io.tehuti.metrics.MetricsRepository;
1515
import java.util.Map;
1616
import java.util.Optional;
17+
import org.apache.avro.Schema;
18+
import org.apache.avro.specific.SpecificRecord;
1719
import org.apache.commons.lang.StringUtils;
1820

1921

@@ -64,14 +66,17 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeNam
6466
* Creates a VeniceChangelogConsumer with consumer id. This is used to create multiple consumers so that
6567
* each consumer can only subscribe to certain partitions. Multiple such consumers can work in parallel.
6668
*/
67-
public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeName, String consumerId, Class clazz) {
68-
return getChangelogConsumer(storeName, consumerId, clazz, globalChangelogClientConfig.getViewName());
69+
public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(
70+
String storeName,
71+
String consumerId,
72+
Class<V> valueClass) {
73+
return getChangelogConsumer(storeName, consumerId, valueClass, globalChangelogClientConfig.getViewName());
6974
}
7075

7176
public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(
7277
String storeName,
7378
String consumerId,
74-
Class clazz,
79+
Class<V> valueClass,
7580
String viewNameOverride) {
7681
String adjustedConsumerId;
7782
if (!StringUtils.isEmpty(viewNameOverride)) {
@@ -85,7 +90,7 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(
8590
}
8691
return storeClientMap.computeIfAbsent(suffixConsumerIdToStore(storeName, adjustedConsumerId), name -> {
8792
ChangelogClientConfig newStoreChangelogClientConfig =
88-
getNewStoreChangelogClientConfig(storeName).setSpecificValue(clazz);
93+
getNewStoreChangelogClientConfig(storeName).setSpecificValue(valueClass);
8994
newStoreChangelogClientConfig.setConsumerName(name);
9095
newStoreChangelogClientConfig.setViewName(viewNameOverride);
9196
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
@@ -114,16 +119,22 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
114119
}
115120

116121
/**
117-
* Creates a BootstrappingVeniceChangelogConsumer with consumer id. This is used to create multiple
118-
* consumers so that each consumer can only subscribe to certain partitions.
122+
* Use this if you're using the experimental client
123+
* @param keyClass The {@link SpecificRecord} class for your key
124+
* @param valueClass The {@link SpecificRecord} class for your value
125+
* @param valueSchema The {@link Schema} for your values
119126
*/
120127
public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangelogConsumer(
121128
String storeName,
122129
String consumerId,
123-
Class clazz) {
130+
Class<K> keyClass,
131+
Class<V> valueClass,
132+
Schema valueSchema) {
124133
return storeBootstrappingClientMap.computeIfAbsent(suffixConsumerIdToStore(storeName, consumerId), name -> {
125134
ChangelogClientConfig newStoreChangelogClientConfig =
126-
getNewStoreChangelogClientConfig(storeName).setSpecificValue(clazz);
135+
getNewStoreChangelogClientConfig(storeName).setSpecificKey(keyClass)
136+
.setSpecificValue(valueClass)
137+
.setSpecificValueSchema(valueSchema);
127138
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
128139
String consumerName = suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId);
129140

@@ -141,6 +152,17 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
141152
});
142153
}
143154

155+
/**
156+
* Creates a BootstrappingVeniceChangelogConsumer with consumer id. This is used to create multiple
157+
* consumers so that each consumer can only subscribe to certain partitions.
158+
*/
159+
public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangelogConsumer(
160+
String storeName,
161+
String consumerId,
162+
Class<V> valueClass) {
163+
return getBootstrappingChangelogConsumer(storeName, consumerId, null, valueClass, null);
164+
}
165+
144166
public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangelogConsumer(
145167
String storeName,
146168
String consumerId) {

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ChangelogConsumerDaVinciRecordTransformerUserApp.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,19 +120,17 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
120120
.setD2Client(d2Client)
121121
.setIsExperimentalClientEnabled(true);
122122

123-
if (useSpecificRecord) {
124-
globalChangelogClientConfig.setSpecificKey(TestChangelogKey.class)
125-
.setSpecificValue(TestChangelogValue.class)
126-
.setSpecificValueSchema(TestChangelogValue.SCHEMA$);
127-
}
128-
129123
VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
130124
new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
131125
BootstrappingVeniceChangelogConsumer bootstrappingVeniceChangelogConsumer;
132126

133127
if (useSpecificRecord) {
134-
bootstrappingVeniceChangelogConsumer = veniceChangelogConsumerClientFactory
135-
.getBootstrappingChangelogConsumer(storeName, Integer.toString(0), TestChangelogValue.class);
128+
bootstrappingVeniceChangelogConsumer = veniceChangelogConsumerClientFactory.getBootstrappingChangelogConsumer(
129+
storeName,
130+
Integer.toString(0),
131+
TestChangelogKey.class,
132+
TestChangelogValue.class,
133+
TestChangelogValue.SCHEMA$);
136134
} else {
137135
bootstrappingVeniceChangelogConsumer =
138136
veniceChangelogConsumerClientFactory.getBootstrappingChangelogConsumer(storeName, Integer.toString(0));

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestBootstrappingChangelogConsumer.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -557,16 +557,17 @@ public void testSpecificRecordVeniceChangelogConsumerDaVinciRecordTransformerImp
557557
.setControllerRequestRetryCount(3)
558558
.setBootstrapFileSystemPath(inputDirPath)
559559
.setIsExperimentalClientEnabled(true)
560-
.setSpecificKey(TestChangelogKey.class)
561-
.setSpecificValue(TestChangelogValue.class)
562-
.setSpecificValueSchema(TestChangelogValue.SCHEMA$)
563560
.setD2Client(d2Client);
564561
VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
565562
new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
566563
List<BootstrappingVeniceChangelogConsumer<TestChangelogKey, TestChangelogValue>> bootstrappingVeniceChangelogConsumerList =
567564
Collections.singletonList(
568-
veniceChangelogConsumerClientFactory
569-
.getBootstrappingChangelogConsumer(storeName, Integer.toString(0), TestChangelogValue.class));
565+
veniceChangelogConsumerClientFactory.getBootstrappingChangelogConsumer(
566+
storeName,
567+
Integer.toString(0),
568+
TestChangelogKey.class,
569+
TestChangelogValue.class,
570+
TestChangelogValue.SCHEMA$));
570571

571572
try (VeniceSystemProducer veniceProducer =
572573
IntegrationTestPushUtils.getSamzaProducer(clusterWrapper, storeName, Version.PushType.STREAM)) {
@@ -667,17 +668,18 @@ public void testSpecificRecordBlobTransferVeniceChangelogConsumerDaVinciRecordTr
667668
.setControllerRequestRetryCount(3)
668669
.setBootstrapFileSystemPath(inputDirPath1)
669670
.setD2Client(d2Client)
670-
.setIsExperimentalClientEnabled(true)
671-
.setSpecificKey(TestChangelogKey.class)
672-
.setSpecificValue(TestChangelogValue.class)
673-
.setSpecificValueSchema(TestChangelogValue.SCHEMA$);
671+
.setIsExperimentalClientEnabled(true);
674672

675673
VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
676674
new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
677675
List<BootstrappingVeniceChangelogConsumer<TestChangelogKey, TestChangelogValue>> bootstrappingVeniceChangelogConsumerList =
678676
Collections.singletonList(
679-
veniceChangelogConsumerClientFactory
680-
.getBootstrappingChangelogConsumer(storeName, Integer.toString(0), TestChangelogValue.class));
677+
veniceChangelogConsumerClientFactory.getBootstrappingChangelogConsumer(
678+
storeName,
679+
Integer.toString(0),
680+
TestChangelogKey.class,
681+
TestChangelogValue.class,
682+
TestChangelogValue.SCHEMA$));
681683

682684
try (VeniceSystemProducer veniceProducer =
683685
IntegrationTestPushUtils.getSamzaProducer(clusterWrapper, storeName, Version.PushType.STREAM)) {

0 commit comments

Comments
 (0)