Skip to content

Commit 6aaa1a3

Browse files
EisternDaniil Zulin
andauthored
#119: Support predefined consumers for CDC topics
Adding an option to specify a list of consumers that will be created alongside the CDC topic during the initial bootstrap of the application. A minor consideration is that TopicService/AlterTopic is not idempotent and will throw an exception if we specify already existing consumers in the addConsumer block. To address this behavior, we will perform a describe operation each time a table is attempted to be created. These changes have been tested against local YDB (ydbplatform/local-ydb:latest version). --------- Co-authored-by: Daniil Zulin <daniil-zulin@yandex-team.ru>
1 parent 3e9fdde commit 6aaa1a3

File tree

8 files changed

+185
-3
lines changed

8 files changed

+185
-3
lines changed

databind/src/main/java/tech/ydb/yoj/databind/schema/Changefeed.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@
4141
*/
4242
boolean initialScan() default false;
4343

44+
/**
45+
* Initial consumers of the changefeed
46+
*/
47+
Consumer[] consumers() default {};
48+
4449
enum Mode {
4550
/**
4651
* Only the key component of the modified row
@@ -71,4 +76,22 @@ enum Mode {
7176
enum Format {
7277
JSON
7378
}
79+
80+
@interface Consumer {
81+
String name();
82+
83+
Codec[] codecs() default {};
84+
85+
String readFrom() default "1970-01-01T00:00:00Z";
86+
87+
boolean important() default false;
88+
89+
enum Codec {
90+
RAW,
91+
GZIP,
92+
LZOP,
93+
ZSTD,
94+
CUSTOM
95+
}
96+
}
7497
}

databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.lang.reflect.Constructor;
2424
import java.lang.reflect.Type;
2525
import java.time.Duration;
26+
import java.time.Instant;
2627
import java.util.ArrayList;
28+
import java.util.Arrays;
2729
import java.util.Collection;
2830
import java.util.HashSet;
2931
import java.util.LinkedHashMap;
@@ -245,13 +247,23 @@ private Changefeed changefeedFromAnnotation(@NonNull tech.ydb.yoj.databind.schem
245247
var retentionPeriod = Duration.parse(changefeed.retentionPeriod());
246248
Preconditions.checkArgument(!(retentionPeriod.isNegative() || retentionPeriod.isZero()),
247249
"RetentionPeriod value defined for %s must be positive", getType());
250+
List<Changefeed.Consumer> consumers = Arrays.stream(changefeed.consumers())
251+
.map(consumer -> new Changefeed.Consumer(
252+
consumer.name(),
253+
List.of(consumer.codecs()),
254+
Instant.parse(consumer.readFrom()),
255+
consumer.important()
256+
))
257+
.toList();
258+
248259
return new Changefeed(
249260
changefeed.name(),
250261
changefeed.mode(),
251262
changefeed.format(),
252263
changefeed.virtualTimestamps(),
253264
retentionPeriod,
254-
changefeed.initialScan()
265+
changefeed.initialScan(),
266+
consumers
255267
);
256268
}
257269

@@ -813,5 +825,22 @@ public static class Changefeed {
813825
Duration retentionPeriod;
814826

815827
boolean initialScan;
828+
829+
@NonNull
830+
List<Consumer> consumers;
831+
832+
@Value
833+
public static class Consumer {
834+
@NonNull
835+
String name;
836+
837+
@NonNull
838+
List<tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec> codecs;
839+
840+
@NonNull
841+
Instant readFrom;
842+
843+
boolean important;
844+
}
816845
}
817846
}

databind/src/test/java/tech/ydb/yoj/databind/schema/ChangefeedSchemaTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import org.junit.Test;
55

66
import java.time.Duration;
7+
import java.time.Instant;
8+
import java.util.List;
79

810
import static org.assertj.core.api.Assertions.assertThat;
911
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -25,6 +27,7 @@ public void testChangefeedDefaultsEntity() {
2527
assertThat(entitySchema.getChangefeeds().get(0).getRetentionPeriod()).isEqualTo(Duration.ofHours(24));
2628
assertThat(entitySchema.getChangefeeds().get(0).isVirtualTimestamps()).isFalse();
2729
assertThat(entitySchema.getChangefeeds().get(0).isInitialScan()).isFalse();
30+
assertThat(entitySchema.getChangefeeds().get(0).getConsumers()).isEmpty();
2831
}
2932

3033
@Test
@@ -37,6 +40,38 @@ public void testConflictingChangefeedNameEntity() {
3740
assertThatThrownBy(() -> schemaOf(ConflictingChangefeedNameEntity.class));
3841
}
3942

43+
@Test
44+
public void testPredefinedConsumersChangefeedEntity() {
45+
var entitySchema = schemaOf(PredefinedConsumersChangefeedEntity.class);
46+
47+
Schema.Changefeed expectedChangefeed = new Schema.Changefeed(
48+
"feed1",
49+
Changefeed.Mode.NEW_IMAGE,
50+
Changefeed.Format.JSON,
51+
false,
52+
Duration.ofHours(24),
53+
false,
54+
List.of(
55+
new Schema.Changefeed.Consumer(
56+
"consumer1",
57+
List.of(),
58+
Instant.EPOCH,
59+
false
60+
),
61+
new Schema.Changefeed.Consumer(
62+
"consumer2",
63+
List.of(Changefeed.Consumer.Codec.RAW),
64+
Instant.parse("2020-01-01T00:00:00Z"),
65+
true
66+
)
67+
)
68+
);
69+
70+
assertThat(entitySchema.getChangefeeds())
71+
.singleElement()
72+
.isEqualTo(expectedChangefeed);
73+
}
74+
4075
private static <T> Schema<T> schemaOf(Class<T> entityType) {
4176
return new TestSchema<>(entityType);
4277
}
@@ -74,4 +109,19 @@ private static class ConflictingChangefeedNameEntity {
74109
int field1;
75110
int field2;
76111
}
112+
113+
@Value
114+
@Changefeed(name = "feed1", consumers = {
115+
@Changefeed.Consumer(name = "consumer1"),
116+
@Changefeed.Consumer(
117+
name = "consumer2",
118+
readFrom = "2020-01-01T00:00:00Z",
119+
codecs = {Changefeed.Consumer.Codec.RAW},
120+
important = true
121+
)
122+
})
123+
private static class PredefinedConsumersChangefeedEntity {
124+
int field1;
125+
int field2;
126+
}
77127
}

repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/ChangefeedEntity.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@
1616
format = JSON,
1717
virtualTimestamps = true,
1818
retentionPeriod = "PT1H",
19-
initialScan = false /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */
19+
initialScan = false, /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */
20+
consumers = {
21+
@Changefeed.Consumer(name = "test-consumer1"),
22+
@Changefeed.Consumer(
23+
name = "test-consumer2",
24+
readFrom = "2025-01-21T08:01:25+00:00",
25+
codecs = {Changefeed.Consumer.Codec.RAW},
26+
important = true
27+
)
28+
}
2029
)
2130
@Changefeed(name = "test-changefeed2")
2231
public class ChangefeedEntity implements Entity<ChangefeedEntity> {

repository-ydb-v2/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ java_library(
2525
"@java_contribs_stable//:tech_ydb_ydb_sdk_core",
2626
"@java_contribs_stable//:tech_ydb_ydb_sdk_scheme",
2727
"@java_contribs_stable//:tech_ydb_ydb_sdk_table",
28+
"@java_contribs_stable//:tech_ydb_ydb_sdk_topic",
2829
],
2930
)
3031

@@ -65,5 +66,6 @@ java_test_suite(
6566
"@java_contribs_stable//:tech_ydb_ydb_sdk_core",
6667
"@java_contribs_stable//:tech_ydb_ydb_sdk_scheme",
6768
"@java_contribs_stable//:tech_ydb_ydb_sdk_table",
69+
"@java_contribs_stable//:tech_ydb_ydb_sdk_topic",
6870
],
6971
)

repository-ydb-v2/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@
3838
<groupId>tech.ydb</groupId>
3939
<artifactId>ydb-sdk-table</artifactId>
4040
</dependency>
41+
<dependency>
42+
<groupId>tech.ydb</groupId>
43+
<artifactId>ydb-sdk-topic</artifactId>
44+
<exclusions>
45+
<exclusion>
46+
<groupId>com.google.code.findbugs</groupId>
47+
<artifactId>annotations</artifactId>
48+
</exclusion>
49+
</exclusions>
50+
</dependency>
4151
<dependency>
4252
<groupId>tech.ydb</groupId>
4353
<artifactId>ydb-sdk-scheme</artifactId>

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSchemaOperations.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.yoj.repository.ydb.client;
22

3+
import com.google.common.collect.Sets;
34
import lombok.Getter;
45
import lombok.NonNull;
56
import lombok.RequiredArgsConstructor;
@@ -26,6 +27,11 @@
2627
import tech.ydb.table.settings.PartitioningSettings;
2728
import tech.ydb.table.settings.TtlSettings;
2829
import tech.ydb.table.values.Type;
30+
import tech.ydb.topic.TopicClient;
31+
import tech.ydb.topic.description.Consumer;
32+
import tech.ydb.topic.description.TopicDescription;
33+
import tech.ydb.topic.settings.AlterTopicSettings;
34+
import tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec;
2935
import tech.ydb.yoj.databind.schema.Schema;
3036
import tech.ydb.yoj.repository.db.EntitySchema;
3137
import tech.ydb.yoj.repository.db.exception.CreateTableException;
@@ -38,11 +44,14 @@
3844

3945
import java.util.ArrayList;
4046
import java.util.List;
47+
import java.util.Map;
4148
import java.util.Set;
49+
import java.util.function.Function;
4250
import java.util.stream.Stream;
4351

4452
import static com.google.common.base.Strings.isNullOrEmpty;
4553
import static java.util.stream.Collectors.toList;
54+
import static java.util.stream.Collectors.toMap;
4655
import static java.util.stream.Collectors.toSet;
4756
import static lombok.AccessLevel.PRIVATE;
4857
import static tech.ydb.core.StatusCode.SCHEME_ERROR;
@@ -53,12 +62,14 @@ public class YdbSchemaOperations {
5362

5463
private final SessionManager sessionManager;
5564
private final SchemeClient schemeClient;
65+
private final TopicClient topicClient;
5666
private String tablespace;
5767

5868
public YdbSchemaOperations(String tablespace, @NonNull SessionManager sessionManager, GrpcTransport transport) {
5969
this.tablespace = YdbPaths.canonicalTablespace(tablespace);
6070
this.sessionManager = sessionManager;
6171
this.schemeClient = SchemeClient.newClient(transport).build();
72+
this.topicClient = TopicClient.newClient(transport).build();
6273
}
6374

6475
public void setTablespace(String tablespace) {
@@ -81,7 +92,7 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
8192
columns.forEach(c -> {
8293
ValueProtos.Type.PrimitiveTypeId yqlType = YqlPrimitiveType.of(c).getYqlType();
8394
int yqlTypeNumber = yqlType.getNumber();
84-
ValueProtos.Type.PrimitiveTypeId primitiveTypeId = Stream.of(ValueProtos.Type.PrimitiveTypeId.values())
95+
Stream.of(ValueProtos.Type.PrimitiveTypeId.values())
8596
.filter(id -> id.getNumber() == yqlTypeNumber)
8697
.findFirst()
8798
.orElseThrow(() -> new CreateTableException(String.format("Can't create table '%s'%n"
@@ -149,6 +160,46 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
149160
if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) {
150161
throw new CreateTableException(String.format("Can't alter table %s: %s", name, status));
151162
}
163+
164+
if (changefeed.getConsumers().isEmpty()) {
165+
continue;
166+
}
167+
168+
String changeFeedTopicPath = YdbPaths.join(tablespace + name, changefeed.getName());
169+
Result<TopicDescription> result = topicClient.describeTopic(changeFeedTopicPath).join();
170+
if (result.getStatus().getCode() != tech.ydb.core.StatusCode.SUCCESS) {
171+
throw new CreateTableException(String.format("Can't describe CDC topic %s: %s", changeFeedTopicPath, result.getStatus()));
172+
}
173+
174+
Set<String> existingConsumerNames = result.getValue().getConsumers().stream()
175+
.map(Consumer::getName)
176+
.collect(toSet());
177+
178+
Map<String, Schema.Changefeed.Consumer> specifiedConsumers = changefeed.getConsumers().stream()
179+
.collect(toMap(Schema.Changefeed.Consumer::getName, Function.identity()));
180+
181+
Set<String> addedConsumers = Sets.difference(specifiedConsumers.keySet(), existingConsumerNames);
182+
183+
AlterTopicSettings.Builder addConsumersRequest = AlterTopicSettings.newBuilder();
184+
for (String addedConsumer : addedConsumers) {
185+
Schema.Changefeed.Consumer consumer = specifiedConsumers.get(addedConsumer);
186+
Consumer.Builder consumerConfiguration = Consumer.newBuilder()
187+
.setName(consumer.getName())
188+
.setImportant(consumer.isImportant())
189+
.setReadFrom(consumer.getReadFrom());
190+
191+
for (Codec consumerCodec : consumer.getCodecs()) {
192+
consumerConfiguration.addSupportedCodec(
193+
tech.ydb.topic.description.Codec.valueOf(consumerCodec.name())
194+
);
195+
}
196+
197+
addConsumersRequest.addAddConsumer(consumerConfiguration.build());
198+
}
199+
status = topicClient.alterTopic(changeFeedTopicPath, addConsumersRequest.build()).join();
200+
if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) {
201+
throw new CreateTableException(String.format("Can't alter CDC topic %s: %s", changeFeedTopicPath, status));
202+
}
152203
}
153204
}
154205
} finally {

repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc;
2929
import tech.ydb.proto.scheme.v1.SchemeServiceGrpc;
3030
import tech.ydb.proto.table.v1.TableServiceGrpc;
31+
import tech.ydb.proto.topic.v1.TopicServiceGrpc;
3132
import tech.ydb.table.Session;
3233
import tech.ydb.table.SessionPoolStats;
3334
import tech.ydb.table.TableClient;
@@ -151,6 +152,7 @@ private YdbConfig getProxyServerConfig() {
151152
.addService(new ProxyYdbTableService(channel))
152153
.addService(proxyDiscoveryService)
153154
.addService(new DelegateSchemeServiceImplBase(SchemeServiceGrpc.newStub(channel)))
155+
.addService(new DelegateTopicServiceImplBase(TopicServiceGrpc.newStub(channel)))
154156
.build();
155157
proxyServer.start();
156158
Runtime.getRuntime().addShutdownHook(new Thread(proxyServer::shutdown));
@@ -1023,6 +1025,12 @@ private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.Sch
10231025
final SchemeServiceGrpc.SchemeServiceStub schemeServiceStub;
10241026
}
10251027

1028+
@AllArgsConstructor
1029+
private static class DelegateTopicServiceImplBase extends TopicServiceGrpc.TopicServiceImplBase {
1030+
@Delegate
1031+
final TopicServiceGrpc.TopicServiceStub topicServiceStub;
1032+
}
1033+
10261034
private static class ProxyDiscoveryService extends DiscoveryServiceGrpc.DiscoveryServiceImplBase {
10271035
@Delegate(excludes = ProxyDiscoveryService.OverriddenMethod.class)
10281036
DiscoveryServiceGrpc.DiscoveryServiceStub stub;

0 commit comments

Comments
 (0)