|
20 | 20 | import org.junit.ClassRule;
|
21 | 21 | import org.junit.Test;
|
22 | 22 | import tech.ydb.common.transaction.TxMode;
|
| 23 | +import tech.ydb.common.transaction.YdbTransaction; |
23 | 24 | import tech.ydb.core.grpc.YdbHeaders;
|
24 | 25 | import tech.ydb.core.utils.Version;
|
25 | 26 | import tech.ydb.proto.OperationProtos;
|
|
30 | 31 | import tech.ydb.proto.table.v1.TableServiceGrpc;
|
31 | 32 | import tech.ydb.proto.topic.v1.TopicServiceGrpc;
|
32 | 33 | import tech.ydb.table.Session;
|
| 34 | +import tech.ydb.topic.TopicClient; |
| 35 | +import tech.ydb.topic.description.Consumer; |
| 36 | +import tech.ydb.topic.settings.AutoPartitioningStrategy; |
| 37 | +import tech.ydb.topic.settings.CreateTopicSettings; |
| 38 | +import tech.ydb.topic.settings.PartitioningSettings; |
| 39 | +import tech.ydb.topic.settings.ReaderSettings; |
| 40 | +import tech.ydb.topic.settings.ReceiveSettings; |
| 41 | +import tech.ydb.topic.settings.SendSettings; |
| 42 | +import tech.ydb.topic.settings.TopicReadSettings; |
| 43 | +import tech.ydb.topic.settings.WriterSettings; |
33 | 44 | import tech.ydb.yoj.databind.schema.Column;
|
34 | 45 | import tech.ydb.yoj.databind.schema.ObjectSchema;
|
35 | 46 | import tech.ydb.yoj.repository.db.EntitySchema;
|
|
41 | 52 | import tech.ydb.yoj.repository.db.TableDescriptor;
|
42 | 53 | import tech.ydb.yoj.repository.db.Tx;
|
43 | 54 | import tech.ydb.yoj.repository.db.bulk.BulkParams;
|
| 55 | +import tech.ydb.yoj.repository.db.common.CommonConverters; |
44 | 56 | import tech.ydb.yoj.repository.db.exception.ConversionException;
|
45 | 57 | import tech.ydb.yoj.repository.db.exception.RetryableException;
|
46 | 58 | import tech.ydb.yoj.repository.db.exception.UnavailableException;
|
|
63 | 75 | import tech.ydb.yoj.repository.ydb.client.SessionManager;
|
64 | 76 | import tech.ydb.yoj.repository.ydb.compatibility.YdbSchemaCompatibilityChecker;
|
65 | 77 | import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException;
|
| 78 | +import tech.ydb.yoj.repository.ydb.exception.YdbOverloadedException; |
66 | 79 | import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
|
67 | 80 | import tech.ydb.yoj.repository.ydb.model.EntityChangeTtl;
|
68 | 81 | import tech.ydb.yoj.repository.ydb.model.EntityDropTtl;
|
|
88 | 101 | import java.util.Map;
|
89 | 102 | import java.util.Objects;
|
90 | 103 | import java.util.Set;
|
| 104 | +import java.util.UUID; |
| 105 | +import java.util.concurrent.TimeUnit; |
| 106 | +import java.util.concurrent.atomic.AtomicBoolean; |
91 | 107 | import java.util.stream.Collectors;
|
92 | 108 | import java.util.stream.IntStream;
|
93 | 109 | import java.util.stream.Stream;
|
94 | 110 |
|
| 111 | +import static java.nio.charset.StandardCharsets.UTF_8; |
95 | 112 | import static org.assertj.core.api.Assertions.assertThat;
|
96 | 113 | import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
97 | 114 | import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
|
@@ -1043,6 +1060,205 @@ public void queryStatsCollectionMode() {
|
1043 | 1060 | assertThat(found).hasSize(4);
|
1044 | 1061 | }
|
1045 | 1062 |
|
| 1063 | + @Test |
| 1064 | + public void transactionalTopicWrites() { |
| 1065 | + var uuid = UUID.randomUUID(); |
| 1066 | + var topic = "transactionalTopic-" + uuid; |
| 1067 | + var producer = "topicProducer-" + uuid; |
| 1068 | + var consumer = "topicConsumer-" + uuid; |
| 1069 | + var reader = "topicReader-" + uuid; |
| 1070 | + |
| 1071 | + var grpcTransport = ydbEnvAndTransport.getGrpcTransport(); |
| 1072 | + var topicPath = grpcTransport.getDatabase() + "/" + topic; |
| 1073 | + |
| 1074 | + try (var topicClient = TopicClient.newClient(grpcTransport).build()) { |
| 1075 | + topicClient.createTopic(topic, CreateTopicSettings.newBuilder() |
| 1076 | + .addConsumer(Consumer.newBuilder().setName(consumer).build()) |
| 1077 | + .setPartitioningSettings(PartitioningSettings.newBuilder() |
| 1078 | + .setAutoPartitioningStrategy(AutoPartitioningStrategy.DISABLED) |
| 1079 | + .setMinActivePartitions(1) |
| 1080 | + .setPartitionCountLimit(1) |
| 1081 | + .build()) |
| 1082 | + .build() |
| 1083 | + ).join().expectSuccess("can't create a new topic"); |
| 1084 | + |
| 1085 | + var project = new Project(new Project.Id("topic-project"), "topic-project"); |
| 1086 | + var data = CommonConverters.serializeOpaqueObjectValue(Project.class, project).getBytes(UTF_8); |
| 1087 | + |
| 1088 | + try { |
| 1089 | + db.immediateWrites().tx(() -> { |
| 1090 | + db.projects().save(project); |
| 1091 | + |
| 1092 | + var transaction = (YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction(); |
| 1093 | + var sdkTransaction = transaction.toSdkTransaction(); |
| 1094 | + write(topicClient, topicPath, producer, data, 5, sdkTransaction); |
| 1095 | + }); |
| 1096 | + |
| 1097 | + var messages = readAll(topicClient, topicPath, consumer, reader); |
| 1098 | + assertThat(messages) |
| 1099 | + .hasSize(5) |
| 1100 | + .allSatisfy(msg -> assertThat(msg.getData()).isEqualTo(data)); |
| 1101 | + } finally { |
| 1102 | + topicClient.dropTopic(topic).join(); |
| 1103 | + } |
| 1104 | + } |
| 1105 | + } |
| 1106 | + |
| 1107 | + @Test |
| 1108 | + public void transactionalTopicWritesRetryThenCommit() { |
| 1109 | + var uuid = UUID.randomUUID(); |
| 1110 | + var topic = "transactionalTopic-" + uuid; |
| 1111 | + var producer = "topicProducer-" + uuid; |
| 1112 | + var consumer = "topicConsumer-" + uuid; |
| 1113 | + var reader = "topicReader-" + uuid; |
| 1114 | + |
| 1115 | + var grpcTransport = ydbEnvAndTransport.getGrpcTransport(); |
| 1116 | + var topicPath = grpcTransport.getDatabase() + "/" + topic; |
| 1117 | + |
| 1118 | + try (var topicClient = TopicClient.newClient(grpcTransport).build()) { |
| 1119 | + topicClient.createTopic(topic, CreateTopicSettings.newBuilder() |
| 1120 | + .addConsumer(Consumer.newBuilder().setName(consumer).build()) |
| 1121 | + .setPartitioningSettings(PartitioningSettings.newBuilder() |
| 1122 | + .setAutoPartitioningStrategy(AutoPartitioningStrategy.DISABLED) |
| 1123 | + .setMinActivePartitions(1) |
| 1124 | + .setPartitionCountLimit(1) |
| 1125 | + .build()) |
| 1126 | + .build() |
| 1127 | + ).join().expectSuccess("can't create a new topic"); |
| 1128 | + |
| 1129 | + var project = new Project(new Project.Id("topic-project"), "topic-project"); |
| 1130 | + var data = CommonConverters.serializeOpaqueObjectValue(Project.class, project).getBytes(UTF_8); |
| 1131 | + |
| 1132 | + var retried = new AtomicBoolean(); |
| 1133 | + try { |
| 1134 | + db.immediateWrites().tx(() -> { |
| 1135 | + db.projects().save(project); |
| 1136 | + |
| 1137 | + var transaction = (YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction(); |
| 1138 | + var sdkTransaction = transaction.toSdkTransaction(); |
| 1139 | + write(topicClient, topicPath, producer, data, 1, sdkTransaction); |
| 1140 | + |
| 1141 | + if (!retried.getAndSet(true)) { |
| 1142 | + throw new YdbOverloadedException("xxx", "yyy"); |
| 1143 | + } |
| 1144 | + }); |
| 1145 | + |
| 1146 | + var messages = readAll(topicClient, topicPath, consumer, reader); |
| 1147 | + assertThat(messages) |
| 1148 | + .singleElement() |
| 1149 | + .satisfies(msg -> assertThat(msg.getData()).isEqualTo(data)); |
| 1150 | + } finally { |
| 1151 | + topicClient.dropTopic(topic).join(); |
| 1152 | + } |
| 1153 | + } |
| 1154 | + } |
| 1155 | + |
| 1156 | + @Test |
| 1157 | + public void transactionalTopicWritesRollbackReadNothing() { |
| 1158 | + var uuid = UUID.randomUUID(); |
| 1159 | + var topic = "transactionalTopic-" + uuid; |
| 1160 | + var producer = "topicProducer-" + uuid; |
| 1161 | + var consumer = "topicConsumer-" + uuid; |
| 1162 | + var reader = "topicReader-" + uuid; |
| 1163 | + |
| 1164 | + var grpcTransport = ydbEnvAndTransport.getGrpcTransport(); |
| 1165 | + var topicPath = grpcTransport.getDatabase() + "/" + topic; |
| 1166 | + |
| 1167 | + try (var topicClient = TopicClient.newClient(grpcTransport).build()) { |
| 1168 | + topicClient.createTopic(topic, CreateTopicSettings.newBuilder() |
| 1169 | + .addConsumer(Consumer.newBuilder().setName(consumer).build()) |
| 1170 | + .setPartitioningSettings(PartitioningSettings.newBuilder() |
| 1171 | + .setAutoPartitioningStrategy(AutoPartitioningStrategy.DISABLED) |
| 1172 | + .setMinActivePartitions(1) |
| 1173 | + .setPartitionCountLimit(1) |
| 1174 | + .build()) |
| 1175 | + .build() |
| 1176 | + ).join().expectSuccess("can't create a new topic"); |
| 1177 | + |
| 1178 | + var project = new Project(new Project.Id("topic-project"), "topic-project"); |
| 1179 | + var data = CommonConverters.serializeOpaqueObjectValue(Project.class, project).getBytes(UTF_8); |
| 1180 | + |
| 1181 | + try { |
| 1182 | + assertThatIllegalStateException().isThrownBy(() -> db.immediateWrites().tx(() -> { |
| 1183 | + db.projects().save(project); |
| 1184 | + |
| 1185 | + var transaction = (YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction(); |
| 1186 | + var sdkTransaction = transaction.toSdkTransaction(); |
| 1187 | + write(topicClient, topicPath, producer, data, 1, sdkTransaction); |
| 1188 | + |
| 1189 | + throw new IllegalStateException("I don't feel like it"); |
| 1190 | + })); |
| 1191 | + |
| 1192 | + var messages = readAll(topicClient, topicPath, consumer, reader); |
| 1193 | + assertThat(messages).isEmpty(); |
| 1194 | + } finally { |
| 1195 | + topicClient.dropTopic(topic).join(); |
| 1196 | + } |
| 1197 | + } |
| 1198 | + } |
| 1199 | + |
| 1200 | + private List<tech.ydb.topic.read.Message> readAll(TopicClient topicClient, String topicPath, String consumer, String reader) { |
| 1201 | + var syncReader = topicClient.createSyncReader(ReaderSettings.newBuilder() |
| 1202 | + .setTopics( |
| 1203 | + List.of(TopicReadSettings.newBuilder() |
| 1204 | + .setPath(topicPath) |
| 1205 | + .setPartitionIds(List.of(0L)) |
| 1206 | + .build()) |
| 1207 | + ) |
| 1208 | + .setConsumerName(consumer) |
| 1209 | + .setReaderName(reader) |
| 1210 | + .build()); |
| 1211 | + try { |
| 1212 | + syncReader.initAndWait(); |
| 1213 | + |
| 1214 | + List<tech.ydb.topic.read.Message> messages = new ArrayList<>(); |
| 1215 | + while (true) { |
| 1216 | + var msg = syncReader.receive(ReceiveSettings.newBuilder() |
| 1217 | + .setTimeout(250, TimeUnit.MILLISECONDS) |
| 1218 | + .build()); |
| 1219 | + if (msg == null) { |
| 1220 | + break; |
| 1221 | + } |
| 1222 | + messages.add(msg); |
| 1223 | + msg.commit().join(); |
| 1224 | + } |
| 1225 | + |
| 1226 | + return messages; |
| 1227 | + } catch (InterruptedException e) { |
| 1228 | + throw new RuntimeException(e); |
| 1229 | + } finally { |
| 1230 | + syncReader.shutdown(); |
| 1231 | + } |
| 1232 | + } |
| 1233 | + |
| 1234 | + private void write(TopicClient topicClient, String topicPath, String producer, |
| 1235 | + byte[] data, int nCopies, YdbTransaction transaction) { |
| 1236 | + var syncWriter = topicClient.createSyncWriter( |
| 1237 | + WriterSettings.newBuilder() |
| 1238 | + .setTopicPath(topicPath) |
| 1239 | + .setProducerId(producer) |
| 1240 | + .setPartitionId(0L) |
| 1241 | + .build() |
| 1242 | + ); |
| 1243 | + |
| 1244 | + try { |
| 1245 | + syncWriter.initAndWait(); |
| 1246 | + for (int i = 0; i < nCopies; i++) { |
| 1247 | + syncWriter.send( |
| 1248 | + tech.ydb.topic.write.Message.of(data), |
| 1249 | + SendSettings.newBuilder().setTransaction(transaction).build() |
| 1250 | + ); |
| 1251 | + } |
| 1252 | + syncWriter.flush(); |
| 1253 | + } finally { |
| 1254 | + try { |
| 1255 | + syncWriter.shutdown(30_000, TimeUnit.MILLISECONDS); |
| 1256 | + } catch (Exception e) { |
| 1257 | + throw new RuntimeException(e); |
| 1258 | + } |
| 1259 | + } |
| 1260 | + } |
| 1261 | + |
1046 | 1262 | @AllArgsConstructor
|
1047 | 1263 | private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase {
|
1048 | 1264 | @Delegate
|
|
0 commit comments