14
14
import io .tehuti .metrics .MetricsRepository ;
15
15
import java .util .Map ;
16
16
import java .util .Optional ;
17
+ import org .apache .avro .Schema ;
18
+ import org .apache .avro .specific .SpecificRecord ;
17
19
import org .apache .commons .lang .StringUtils ;
18
20
19
21
@@ -64,14 +66,17 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeNam
64
66
* Creates a VeniceChangelogConsumer with consumer id. This is used to create multiple consumers so that
65
67
* each consumer can only subscribe to certain partitions. Multiple such consumers can work in parallel.
66
68
*/
67
- public <K , V > VeniceChangelogConsumer <K , V > getChangelogConsumer (String storeName , String consumerId , Class clazz ) {
68
- return getChangelogConsumer (storeName , consumerId , clazz , globalChangelogClientConfig .getViewName ());
69
+ public <K , V > VeniceChangelogConsumer <K , V > getChangelogConsumer (
70
+ String storeName ,
71
+ String consumerId ,
72
+ Class <V > valueClass ) {
73
+ return getChangelogConsumer (storeName , consumerId , valueClass , globalChangelogClientConfig .getViewName ());
69
74
}
70
75
71
76
public <K , V > VeniceChangelogConsumer <K , V > getChangelogConsumer (
72
77
String storeName ,
73
78
String consumerId ,
74
- Class clazz ,
79
+ Class < V > valueClass ,
75
80
String viewNameOverride ) {
76
81
String adjustedConsumerId ;
77
82
if (!StringUtils .isEmpty (viewNameOverride )) {
@@ -85,7 +90,7 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(
85
90
}
86
91
return storeClientMap .computeIfAbsent (suffixConsumerIdToStore (storeName , adjustedConsumerId ), name -> {
87
92
ChangelogClientConfig newStoreChangelogClientConfig =
88
- getNewStoreChangelogClientConfig (storeName ).setSpecificValue (clazz );
93
+ getNewStoreChangelogClientConfig (storeName ).setSpecificValue (valueClass );
89
94
newStoreChangelogClientConfig .setConsumerName (name );
90
95
newStoreChangelogClientConfig .setViewName (viewNameOverride );
91
96
String viewClass = getViewClass (newStoreChangelogClientConfig , storeName );
@@ -114,16 +119,22 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
114
119
}
115
120
116
121
/**
117
- * Creates a BootstrappingVeniceChangelogConsumer with consumer id. This is used to create multiple
118
- * consumers so that each consumer can only subscribe to certain partitions.
122
+ * Use this if you're using the experimental client
123
+ * @param keyClass The {@link SpecificRecord} class for your key
124
+ * @param valueClass The {@link SpecificRecord} class for your value
125
+ * @param valueSchema The {@link Schema} for your values
119
126
*/
120
127
public <K , V > BootstrappingVeniceChangelogConsumer <K , V > getBootstrappingChangelogConsumer (
121
128
String storeName ,
122
129
String consumerId ,
123
- Class clazz ) {
130
+ Class <K > keyClass ,
131
+ Class <V > valueClass ,
132
+ Schema valueSchema ) {
124
133
return storeBootstrappingClientMap .computeIfAbsent (suffixConsumerIdToStore (storeName , consumerId ), name -> {
125
134
ChangelogClientConfig newStoreChangelogClientConfig =
126
- getNewStoreChangelogClientConfig (storeName ).setSpecificValue (clazz );
135
+ getNewStoreChangelogClientConfig (storeName ).setSpecificKey (keyClass )
136
+ .setSpecificValue (valueClass )
137
+ .setSpecificValueSchema (valueSchema );
127
138
String viewClass = getViewClass (newStoreChangelogClientConfig , storeName );
128
139
String consumerName = suffixConsumerIdToStore (storeName + "-" + viewClass .getClass ().getSimpleName (), consumerId );
129
140
@@ -141,6 +152,17 @@ public <K, V> BootstrappingVeniceChangelogConsumer<K, V> getBootstrappingChangel
141
152
});
142
153
}
143
154
155
+ /**
156
+ * Creates a BootstrappingVeniceChangelogConsumer with consumer id. This is used to create multiple
157
+ * consumers so that each consumer can only subscribe to certain partitions.
158
+ */
159
+ public <K , V > BootstrappingVeniceChangelogConsumer <K , V > getBootstrappingChangelogConsumer (
160
+ String storeName ,
161
+ String consumerId ,
162
+ Class <V > valueClass ) {
163
+ return getBootstrappingChangelogConsumer (storeName , consumerId , null , valueClass , null );
164
+ }
165
+
144
166
public <K , V > BootstrappingVeniceChangelogConsumer <K , V > getBootstrappingChangelogConsumer (
145
167
String storeName ,
146
168
String consumerId ) {
0 commit comments