Skip to content

Commit c4cea58

Browse files
committed
Add ability rewrite predefined codec.
Add tests for rewrite predefined codec. Remove ProtoUtils class
1 parent 2086142 commit c4cea58

File tree

9 files changed

+115
-115
lines changed

9 files changed

+115
-115
lines changed

topic/src/main/java/tech/ydb/topic/description/Codec.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,6 @@ public interface Codec {
4343
int GZIP = 2;
4444
int LZOP = 3;
4545
int ZSTD = 4;
46-
int CUSTOM = 10000;
47-
48-
/**
49-
* Check is codec is reserved
50-
*
51-
* @param codec codec id
52-
* @return true - codec id is reserved; false - elsewhere
53-
*/
54-
default boolean isReserved(int codec) {
55-
return codec <= CUSTOM;
56-
}
5746

5847
/**
5948
* Get codec identifier

topic/src/main/java/tech/ydb/topic/description/CodecRegistry.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ public Codec registerCodec(Codec codec) {
3939
assert codec != null;
4040
int codecId = codec.getId();
4141

42-
if (codec.isReserved(codecId)) {
43-
throw new RuntimeException(
44-
"Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000");
45-
}
4642
Codec result = customCodecMap.put(codecId, codec);
4743

4844
if (result != null) {

topic/src/main/java/tech/ydb/topic/description/Consumer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
8-
import java.util.stream.Collectors;
98

109
import javax.annotation.Nonnull;
1110
import javax.annotation.Nullable;
@@ -14,7 +13,6 @@
1413

1514
import tech.ydb.core.utils.ProtobufUtils;
1615
import tech.ydb.proto.topic.YdbTopic;
17-
import tech.ydb.topic.utils.ProtoUtils;
1816

1917
/**
2018
* @author Nikolay Perfilov
@@ -40,8 +38,7 @@ public Consumer(YdbTopic.Consumer consumer) {
4038
this.name = consumer.getName();
4139
this.important = consumer.getImportant();
4240
this.readFrom = ProtobufUtils.protoToInstant(consumer.getReadFrom());
43-
this.supportedCodecs = consumer.getSupportedCodecs().getCodecsList()
44-
.stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList());
41+
this.supportedCodecs = new ArrayList<>(consumer.getSupportedCodecs().getCodecsList());
4542
this.attributes = consumer.getAttributesMap();
4643
this.stats = new ConsumerStats(consumer.getConsumerStats());
4744
}

topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import tech.ydb.topic.settings.ReadEventHandlersSettings;
4949
import tech.ydb.topic.settings.ReaderSettings;
5050
import tech.ydb.topic.settings.WriterSettings;
51-
import tech.ydb.topic.utils.ProtoUtils;
5251
import tech.ydb.topic.write.AsyncWriter;
5352
import tech.ydb.topic.write.SyncWriter;
5453
import tech.ydb.topic.write.impl.AsyncWriterImpl;
@@ -296,7 +295,7 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
296295

297296
SupportedCodecs.Builder supportedCodecsBuilder = SupportedCodecs.newBuilder();
298297
for (int codec : result.getSupportedCodecs().getCodecsList()) {
299-
supportedCodecsBuilder.addCodec(ProtoUtils.codecFromProto(codec));
298+
supportedCodecsBuilder.addCodec(codec);
300299
}
301300
description.setSupportedCodecs(supportedCodecsBuilder.build());
302301

@@ -383,7 +382,7 @@ private static YdbTopic.Consumer toProto(Consumer consumer) {
383382
List<Integer> supportedCodecs = consumer.getSupportedCodecsList();
384383
if (!supportedCodecs.isEmpty()) {
385384
YdbTopic.SupportedCodecs.Builder codecBuilder = YdbTopic.SupportedCodecs.newBuilder();
386-
supportedCodecs.forEach(codec -> codecBuilder.addCodecs(ProtoUtils.toProto(codec)));
385+
supportedCodecs.forEach(codecBuilder::addCodecs);
387386
consumerBuilder.setSupportedCodecs(codecBuilder.build());
388387
}
389388

@@ -394,7 +393,7 @@ private static YdbTopic.SupportedCodecs toProto(SupportedCodecs supportedCodecs)
394393
List<Integer> supportedCodecsList = supportedCodecs.getCodecs();
395394
YdbTopic.SupportedCodecs.Builder codecsBuilder = YdbTopic.SupportedCodecs.newBuilder();
396395
for (Integer codec : supportedCodecsList) {
397-
codecsBuilder.addCodecs(tech.ydb.topic.utils.ProtoUtils.toProto(codec));
396+
codecsBuilder.addCodecs(codec);
398397
}
399398
return codecsBuilder.build();
400399
}

topic/src/main/java/tech/ydb/topic/read/impl/BatchMeta.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class BatchMeta {
1818
public BatchMeta(YdbTopic.StreamReadMessage.ReadResponse.Batch batch) {
1919
this.producerId = batch.getProducerId();
2020
this.writeSessionMeta = batch.getWriteSessionMetaMap();
21-
this.codec = tech.ydb.topic.utils.ProtoUtils.codecFromProto(batch.getCodec());
21+
this.codec = batch.getCodec();
2222
this.writtenAt = ProtobufUtils.protoToInstant(batch.getWrittenAt());
2323
}
2424

topic/src/main/java/tech/ydb/topic/utils/ProtoUtils.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import tech.ydb.proto.topic.YdbTopic;
1616
import tech.ydb.topic.description.MetadataItem;
1717
import tech.ydb.topic.settings.WriterSettings;
18-
import tech.ydb.topic.utils.ProtoUtils;
1918

2019
/**
2120
* Utility class that splits messages into several requests so that every request would be less than grpc size limit
@@ -80,7 +79,7 @@ public void setSession(WriteSession session) {
8079

8180
private void reset() {
8281
writeRequestBuilder = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder()
83-
.setCodec(ProtoUtils.toProto(settings.getCodec()));
82+
.setCodec(settings.getCodec());
8483
messageCount = 0;
8584
totalMessageDataProtoSize = 0;
8685
}
@@ -124,7 +123,7 @@ public void sendWriteRequest() {
124123
messages.subList(firstHalfMessagesCount, messages.size())
125124
)) {
126125
writeRequestBuilder = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder()
127-
.setCodec(ProtoUtils.toProto(settings.getCodec()));
126+
.setCodec(settings.getCodec());
128127
writeRequestBuilder.addAllMessages(sublist);
129128
YdbTopic.StreamWriteMessage.FromClient subRequest = YdbTopic.StreamWriteMessage.FromClient
130129
.newBuilder()

topic/src/test/java/tech/ydb/topic/impl/YdbTopicsCustomCodecImplTest.java renamed to topic/src/test/java/tech/ydb/topic/impl/CodecRegistryTest.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*
1717
* @author Evgeny Kuvardin
1818
*/
19-
public class YdbTopicsCustomCodecImplTest {
19+
public class CodecRegistryTest {
2020
CodecRegistry registry;
2121

2222
private static final int codecId = 10113;
@@ -46,25 +46,18 @@ public void registerCustomCodecShouldNotAcceptNull() {
4646
}
4747

4848
@Test
49-
public void registerCustomCodecShouldFailedWhenRegisterReservedCode() {
49+
public void registerCustomCodecShouldRegisterAndOverrideAnyCodec() {
5050
CodecTopic codec1 = new CodecTopic();
51-
expectErrorRegister(-1, codec1);
52-
expectErrorRegister(-100, codec1);
53-
expectErrorRegister(0, codec1);
54-
expectErrorRegister(1, codec1);
55-
expectErrorRegister(2, codec1);
56-
expectErrorRegister(3, codec1);
57-
expectErrorRegister(4, codec1);
58-
expectErrorRegister(10000, codec1);
51+
expectRegisterCodec(1, codec1, RawCodec.getInstance());
52+
expectRegisterCodec(2, codec1, GzipCodec.getInstance());
53+
expectRegisterCodec(3, codec1, LzopCodec.getInstance());
54+
expectRegisterCodec(4, codec1, ZstdCodec.getInstance());
5955
}
6056

61-
void expectErrorRegister(int codecId, CodecTopic codec) {
62-
codec.setCodecId(codecId);
63-
Exception e = Assert.assertThrows(
64-
RuntimeException.class,
65-
() -> registry.registerCodec(codec));
66-
67-
Assert.assertEquals("Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000", e.getMessage());
57+
void expectRegisterCodec(int codecId, CodecTopic newCodec, Codec oldCodec) {
58+
newCodec.setCodecId(codecId);
59+
Codec codecOldPredefined = registry.registerCodec(newCodec);
60+
Assert.assertSame(codecOldPredefined, oldCodec);
6861
}
6962

7063
static class CodecTopic implements Codec {

topic/src/test/java/tech/ydb/topic/impl/YdbTopicsCodecIntegrationTest.java

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,9 @@ public void writeWithReservedNotExistedCodec() {
305305
createTopic(client1, TEST_TOPIC1);
306306

307307
Exception e = Assert.assertThrows(RuntimeException.class, () -> writeData(7, TEST_TOPIC1, client1));
308-
Assert.assertEquals("Cannot convert codec to proto. Unknown codec value: " + 7, e.getMessage());
308+
Assert.assertTrue(e.getMessage().contains("Unsupported codec: " + 7));
309309
}
310310

311-
312311
/**
313312
* Create one more defect. Test failed for unknown reason. Seems RuntimeException produce some weird behaviour
314313
*/
@@ -340,6 +339,33 @@ public void readWriteRawCodec() throws ExecutionException, InterruptedException,
340339
readData(TEST_TOPIC1, client1);
341340
}
342341

342+
/**
343+
* The test checks that we can rewrite the predefined RAW codec.
344+
* Please note that modifying a RAW codec is highly unusual and potentially risky.
345+
* You take full responsibility for any consequences that may result.
346+
* The SDK includes mechanisms in some parts of the codec that attempt to optimize the code
347+
* and detect write or read operations to RAW codecs.
348+
* <p>
349+
* 1. Create client1
350+
* 2. Create topic TEST_TOPIC1 in client1
351+
* 3. Create custom codec
352+
* 4, Register codec
353+
* 5. Try to write
354+
* 6. Read data
355+
*/
356+
@Test
357+
public void userCanRewriteRawCodec() throws ExecutionException, InterruptedException, TimeoutException {
358+
client1 = createClient();
359+
createTopic(client1, TEST_TOPIC1);
360+
361+
Codec codec = new CustomCodec(0, Codec.RAW);
362+
client1.registerCodec(codec);
363+
364+
writeData(codec.getId(), TEST_TOPIC1, client1);
365+
366+
readData(TEST_TOPIC1, client1);
367+
}
368+
343369
/**
344370
* Test checks that we can write and read using GZIP Codec
345371
* <p>
@@ -358,6 +384,29 @@ public void readWriteGzipCodec() throws ExecutionException, InterruptedException
358384
readData(TEST_TOPIC1, client1);
359385
}
360386

387+
/**
388+
* The test checks that we can rewrite the predefined Gzip codec.
389+
* <p>
390+
* 1. Create client1
391+
* 2. Create topic TEST_TOPIC1 in client1
392+
* 3. Create custom codec
393+
* 4, Register codec
394+
* 5. Try to write
395+
* 6. Read data
396+
*/
397+
@Test
398+
public void userCanRewriteGzipCodec() throws ExecutionException, InterruptedException, TimeoutException {
399+
client1 = createClient();
400+
createTopic(client1, TEST_TOPIC1);
401+
402+
Codec codec = new CustomCodec(2, Codec.GZIP);
403+
client1.registerCodec(codec);
404+
405+
writeData(codec.getId(), TEST_TOPIC1, client1);
406+
407+
readData(TEST_TOPIC1, client1);
408+
}
409+
361410
/**
362411
* Test checks that we can write and read using Lzop Codec
363412
* <p>
@@ -376,6 +425,29 @@ public void readWriteLzopCodec() throws ExecutionException, InterruptedException
376425
readData(TEST_TOPIC1, client1);
377426
}
378427

428+
/**
429+
* The test checks that we can rewrite the predefined Lzop codec.
430+
* <p>
431+
* 1. Create client1
432+
* 2. Create topic TEST_TOPIC1 in client1
433+
* 3. Create custom codec
434+
* 4, Register codec
435+
* 5. Try to write
436+
* 6. Read data
437+
*/
438+
@Test
439+
public void userCanRewriteLzopCodec() throws ExecutionException, InterruptedException, TimeoutException {
440+
client1 = createClient();
441+
createTopic(client1, TEST_TOPIC1);
442+
443+
Codec codec = new CustomCodec(3, Codec.LZOP);
444+
client1.registerCodec(codec);
445+
446+
writeData(codec.getId(), TEST_TOPIC1, client1);
447+
448+
readData(TEST_TOPIC1, client1);
449+
}
450+
379451
/**
380452
* Test checks that we can write and read using Zstd Codec
381453
* <p>
@@ -394,6 +466,29 @@ public void readWriteZstdCodec() throws ExecutionException, InterruptedException
394466
readData(TEST_TOPIC1, client1);
395467
}
396468

469+
/**
470+
* The test checks that we can rewrite the predefined Lzop codec.
471+
* <p>
472+
* 1. Create client1
473+
* 2. Create topic TEST_TOPIC1 in client1
474+
* 3. Create custom codec
475+
* 4, Register codec
476+
* 5. Try to write
477+
* 6. Read data
478+
*/
479+
@Test
480+
public void userCanRewriteZstdCodec() throws ExecutionException, InterruptedException, TimeoutException {
481+
client1 = createClient();
482+
createTopic(client1, TEST_TOPIC1);
483+
484+
Codec codec = new CustomCodec(4, Codec.ZSTD);
485+
client1.registerCodec(codec);
486+
487+
writeData(codec.getId(), TEST_TOPIC1, client1);
488+
489+
readData(TEST_TOPIC1, client1);
490+
}
491+
397492
private TopicClient createClient() {
398493
TopicClient topicClient = TopicClient.newClient(ydbTransport).build();
399494
clientToClose.add(topicClient);
@@ -468,7 +563,7 @@ private void readData(String topicName, TopicClient client) throws InterruptedEx
468563

469564
Assert.assertNotNull(testMessages);
470565
for (byte[] bytes : testMessages) {
471-
tech.ydb.topic.read.Message msg = reader.receive(1, TimeUnit.SECONDS);
566+
tech.ydb.topic.read.Message msg = reader.receive(10, TimeUnit.SECONDS);
472567
Assert.assertNotNull(msg);
473568
Assert.assertArrayEquals(bytes, msg.getData());
474569
}

0 commit comments

Comments
 (0)