Skip to content

Commit d871494

Browse files
authored
Merge pull request #447 from ekuvardin/365.Add-support-of-custom-codecs-in-topics
365.add support of custom codecs in topics
2 parents 943d5aa + a77588f commit d871494

26 files changed

+1229
-151
lines changed

topic/src/main/java/tech/ydb/topic/TopicClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import tech.ydb.core.Result;
99
import tech.ydb.core.Status;
1010
import tech.ydb.core.grpc.GrpcTransport;
11+
import tech.ydb.topic.description.Codec;
1112
import tech.ydb.topic.description.ConsumerDescription;
1213
import tech.ydb.topic.description.TopicDescription;
1314
import tech.ydb.topic.impl.GrpcTopicRpc;
@@ -164,6 +165,14 @@ default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String p
164165
@Override
165166
void close();
166167

168+
/**
169+
* Register custom codec implementation to TopicClient
170+
*
171+
* @param codec - custom implementation
172+
*/
173+
void registerCodec(Codec codec);
174+
175+
167176
/**
168177
* BUILDER
169178
*/
Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,72 @@
11
package tech.ydb.topic.description;
22

3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
37
/**
8+
9+
*
10+
* Interface for custom codec implementation.
11+
* <p>
12+
*
13+
* You can use custom codec as below
14+
* 1. Implement interface methods
15+
* Make getId return a value greater than 10000. This value identifies the codec among the others
16+
* 2. Use code below to write data
17+
* Codec codecImpl = ....
18+
* TopicClient client = TopicClient.newClient(ydbTransport).build();
19+
* <p>
20+
* client.registerCodec(codecImpl);
21+
* WriterSettings settings = WriterSettings.newBuilder()
22+
* .setTopicPath(topicName)
23+
* .setCodec(codecId)
24+
* .build();
25+
* <p>
26+
* SyncWriter writer = client.createSyncWriter(settings);
27+
* <p>
28+
* 3. Use code below to read data. Codec should be registered in {@link CodecRegistry}
29+
* Codec codecImpl = ....
30+
* TopicClient client = TopicClient.newClient(ydbTransport).build();
31+
* <p>
32+
* ReaderSettings readerSettings = ReaderSettings.newBuilder()
33+
* .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build())
34+
* .setConsumerName(TEST_CONSUMER1)
35+
* .build();
36+
* <p>
37+
* SyncReader reader = client.createSyncReader(readerSettings);
38+
*
439
* @author Nikolay Perfilov
540
*/
6-
public enum Codec {
7-
RAW,
8-
GZIP,
9-
LZOP,
10-
ZSTD,
11-
CUSTOM;
41+
public interface Codec {
    /** Identifier of the built-in RAW codec. */
    int RAW = 1;
    /** Identifier of the built-in GZIP codec. */
    int GZIP = 2;
    /** Identifier of the built-in LZOP codec. */
    int LZOP = 3;
    /** Identifier of the built-in ZSTD codec. */
    int ZSTD = 4;

    /**
     * Get codec identifier.
     * <p>
     * The identifier is the key under which the codec is looked up, so it must be unique
     * among the codecs used together. Custom implementations are expected to return
     * a value greater than 10000.
     *
     * @return codec identifier
     */
    int getId();

    /**
     * Decode data.
     *
     * @param byteArrayInputStream input stream with encoded data
     * @return input stream from which the decoded data can be read
     * @throws IOException when the decoding stream cannot be created
     */
    InputStream decode(InputStream byteArrayInputStream) throws IOException;

    /**
     * Encode data.
     *
     * @param byteArrayOutputStream output stream which receives the encoded data
     * @return output stream to write the data to be encoded into
     * @throws IOException when the encoding stream cannot be created
     */
    OutputStream encode(OutputStream byteArrayOutputStream) throws IOException;

}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package tech.ydb.topic.description;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import tech.ydb.topic.impl.GzipCodec;
10+
import tech.ydb.topic.impl.LzopCodec;
11+
import tech.ydb.topic.impl.RawCodec;
12+
import tech.ydb.topic.impl.ZstdCodec;
13+
14+
/**
15+
* Registry of topic codecs (built-in and custom). Local to TopicClient
16+
*
17+
* @author Evgeny Kuvardin
18+
**/
19+
public class CodecRegistry {
20+
21+
private static final Logger logger = LoggerFactory.getLogger(CodecRegistry.class);
22+
23+
final Map<Integer, Codec> customCodecMap;
24+
25+
public CodecRegistry() {
26+
customCodecMap = new HashMap<>();
27+
customCodecMap.put(Codec.RAW, RawCodec.getInstance());
28+
customCodecMap.put(Codec.GZIP, GzipCodec.getInstance());
29+
customCodecMap.put(Codec.LZOP, LzopCodec.getInstance());
30+
customCodecMap.put(Codec.ZSTD, ZstdCodec.getInstance());
31+
}
32+
33+
/**
34+
* Register codec implementation
35+
* @param codec codec implementation
36+
* @return previous implementation with associated codec
37+
*/
38+
public Codec registerCodec(Codec codec) {
39+
assert codec != null;
40+
int codecId = codec.getId();
41+
42+
Codec result = customCodecMap.put(codecId, codec);
43+
44+
if (result != null) {
45+
logger.info(
46+
"Replace codec which have already associated with this id. CodecId: {} Codec: {}",
47+
codecId,
48+
result);
49+
}
50+
51+
return result;
52+
}
53+
54+
/**
55+
* Get codec implementation by associated id
56+
* @param codecId codec identifier
57+
* @return codec implementation
58+
*/
59+
public Codec getCodec(int codecId) {
60+
return customCodecMap.get(codecId);
61+
}
62+
63+
}

topic/src/main/java/tech/ydb/topic/description/Consumer.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
8-
import java.util.stream.Collectors;
98

109
import javax.annotation.Nonnull;
1110
import javax.annotation.Nullable;
@@ -14,7 +13,6 @@
1413

1514
import tech.ydb.core.utils.ProtobufUtils;
1615
import tech.ydb.proto.topic.YdbTopic;
17-
import tech.ydb.topic.utils.ProtoUtils;
1816

1917
/**
2018
* @author Nikolay Perfilov
@@ -23,7 +21,7 @@ public class Consumer {
2321
private final String name;
2422
private final boolean important;
2523
private final Instant readFrom;
26-
private final List<Codec> supportedCodecs;
24+
private final List<Integer> supportedCodecs;
2725
private final Map<String, String> attributes;
2826
private final ConsumerStats stats;
2927

@@ -40,8 +38,7 @@ public Consumer(YdbTopic.Consumer consumer) {
4038
this.name = consumer.getName();
4139
this.important = consumer.getImportant();
4240
this.readFrom = ProtobufUtils.protoToInstant(consumer.getReadFrom());
43-
this.supportedCodecs = consumer.getSupportedCodecs().getCodecsList()
44-
.stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList());
41+
this.supportedCodecs = new ArrayList<>(consumer.getSupportedCodecs().getCodecsList());
4542
this.attributes = consumer.getAttributesMap();
4643
this.stats = new ConsumerStats(consumer.getConsumerStats());
4744
}
@@ -68,7 +65,7 @@ public SupportedCodecs getSupportedCodecs() {
6865
return new SupportedCodecs(supportedCodecs);
6966
}
7067

71-
public List<Codec> getSupportedCodecsList() {
68+
public List<Integer> getSupportedCodecsList() {
7269
return supportedCodecs;
7370
}
7471

@@ -88,7 +85,7 @@ public static class Builder {
8885
private String name;
8986
private boolean important = false;
9087
private Instant readFrom = null;
91-
private List<Codec> supportedCodecs = new ArrayList<>();
88+
private List<Integer> supportedCodecs = new ArrayList<>();
9289
private Map<String, String> attributes = new HashMap<>();
9390
private ConsumerStats stats = null;
9491

@@ -107,7 +104,7 @@ public Builder setReadFrom(Instant readFrom) {
107104
return this;
108105
}
109106

110-
public Builder addSupportedCodec(Codec codec) {
107+
public Builder addSupportedCodec(int codec) {
111108
this.supportedCodecs.add(codec);
112109
return this;
113110
}

topic/src/main/java/tech/ydb/topic/description/SupportedCodecs.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@
99
* @author Nikolay Perfilov
1010
*/
1111
public class SupportedCodecs {
12-
private final List<Codec> codecs;
12+
private final List<Integer> codecs;
1313

1414
public SupportedCodecs(Builder builder) {
1515
this.codecs = ImmutableList.copyOf(builder.codecs);
1616
}
1717

18-
public SupportedCodecs(List<Codec> codecs) {
18+
public SupportedCodecs(List<Integer> codecs) {
1919
this.codecs = codecs;
2020
}
2121

22-
public List<Codec> getCodecs() {
22+
public List<Integer> getCodecs() {
2323
return codecs;
2424
}
2525

@@ -31,14 +31,14 @@ public static Builder newBuilder() {
3131
* BUILDER
3232
*/
3333
public static class Builder {
34-
private List<Codec> codecs = new ArrayList<>();
34+
private List<Integer> codecs = new ArrayList<>();
3535

36-
public Builder addCodec(Codec codec) {
36+
public Builder addCodec(int codec) {
3737
codecs.add(codec);
3838
return this;
3939
}
4040

41-
public Builder setCodecs(List<Codec> supportedCodecs) {
41+
public Builder setCodecs(List<Integer> supportedCodecs) {
4242
this.codecs = supportedCodecs;
4343
return this;
4444
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import java.util.zip.GZIPInputStream;
7+
import java.util.zip.GZIPOutputStream;
8+
9+
import tech.ydb.topic.description.Codec;
10+
11+
/**
12+
* Compression codec which implements the GZIP algorithm
13+
*/
14+
public class GzipCodec implements Codec {
15+
16+
private static final GzipCodec INSTANCE = new GzipCodec();
17+
18+
private GzipCodec() {
19+
}
20+
21+
/**
22+
* Get single instance
23+
* @return single instance of RawCodec
24+
*/
25+
public static GzipCodec getInstance() {
26+
return INSTANCE;
27+
}
28+
29+
@Override
30+
public int getId() {
31+
return Codec.GZIP;
32+
}
33+
34+
@Override
35+
public InputStream decode(InputStream byteArrayInputStream) throws IOException {
36+
return new GZIPInputStream(byteArrayInputStream);
37+
}
38+
39+
@Override
40+
public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException {
41+
return new GZIPOutputStream(byteArrayOutputStream);
42+
}
43+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
7+
import org.anarres.lzo.LzoAlgorithm;
8+
import org.anarres.lzo.LzoCompressor;
9+
import org.anarres.lzo.LzoLibrary;
10+
import org.anarres.lzo.LzopInputStream;
11+
import org.anarres.lzo.LzopOutputStream;
12+
13+
import tech.ydb.topic.description.Codec;
14+
15+
/**
16+
* Compression codec which implements the LZO algorithm
17+
*/
18+
public class LzopCodec implements Codec {
19+
20+
private static final LzopCodec INSTANCE = new LzopCodec();
21+
22+
private LzopCodec() {
23+
}
24+
25+
/**
26+
* Get single instance
27+
* @return single instance of RawCodec
28+
*/
29+
public static LzopCodec getInstance() {
30+
return INSTANCE;
31+
}
32+
33+
@Override
34+
public int getId() {
35+
return Codec.LZOP;
36+
}
37+
38+
@Override
39+
public InputStream decode(InputStream byteArrayInputStream) throws IOException {
40+
return new LzopInputStream(byteArrayInputStream);
41+
}
42+
43+
@Override
44+
public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException {
45+
LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null);
46+
return new LzopOutputStream(byteArrayOutputStream, lzoCompressor);
47+
}
48+
}

0 commit comments

Comments
 (0)