Skip to content

Commit d5c976e

Browse files
DimaVildaHaarolean
andauthored
BE: Serde: Fix HTTP 500 on protobuf Any type (#696)
Co-authored-by: Roman Zabaluev <gpg@haarolean.dev>
1 parent 4bb3632 commit d5c976e

File tree

5 files changed

+68
-3
lines changed

5 files changed

+68
-3
lines changed

api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.google.protobuf.StructProto;
1616
import com.google.protobuf.TimestampProto;
1717
import com.google.protobuf.TypeProto;
18+
import com.google.protobuf.TypeRegistry;
1819
import com.google.protobuf.WrappersProto;
1920
import com.google.protobuf.util.JsonFormat;
2021
import com.google.type.ColorProto;
@@ -147,12 +148,18 @@ public boolean canSerialize(String topic, Serde.Target type) {
147148
@Override
148149
public Serde.Serializer serializer(String topic, Serde.Target type) {
149150
var descriptor = descriptorFor(topic, type).orElseThrow();
151+
TypeRegistry typeRegistry = TypeRegistry.newBuilder()
152+
.add(descriptorPaths.keySet())
153+
.build();
154+
150155
return new Serde.Serializer() {
151156
@SneakyThrows
152157
@Override
153158
public byte[] serialize(String input) {
154159
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
155-
JsonFormat.parser().merge(input, builder);
160+
JsonFormat.parser()
161+
.usingTypeRegistry(typeRegistry)
162+
.merge(input, builder);
156163
return builder.build().toByteArray();
157164
}
158165
};

api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.kafbat.ui.container.KafkaConnectContainer;
66
import io.kafbat.ui.container.KsqlDbContainer;
77
import io.kafbat.ui.container.SchemaRegistryContainer;
8+
import java.io.FileNotFoundException;
89
import java.nio.file.Path;
910
import java.util.List;
1011
import java.util.Properties;
@@ -22,6 +23,7 @@
2223
import org.springframework.test.context.ActiveProfiles;
2324
import org.springframework.test.context.ContextConfiguration;
2425
import org.springframework.test.util.TestSocketUtils;
26+
import org.springframework.util.ResourceUtils;
2527
import org.testcontainers.containers.KafkaContainer;
2628
import org.testcontainers.containers.Network;
2729
import org.testcontainers.utility.DockerImageName;
@@ -75,6 +77,18 @@ public static class Initializer
7577
public void initialize(@NotNull ConfigurableApplicationContext context) {
7678
System.setProperty("kafka.clusters.0.name", LOCAL);
7779
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
80+
81+
// Add ProtobufFileSerde configuration
82+
System.setProperty("kafka.clusters.0.serde.0.name", "ProtobufFile");
83+
System.setProperty("kafka.clusters.0.serde.0.topicValuesPattern", "masking-test-.*");
84+
try {
85+
System.setProperty("kafka.clusters.0.serde.0.properties.protobufFilesDir",
86+
ResourceUtils.getFile("classpath:protobuf-serde").getAbsolutePath());
87+
} catch (FileNotFoundException e) {
88+
throw new RuntimeException(e);
89+
}
90+
System.setProperty("kafka.clusters.0.serde.0.properties.protobufMessageName", "test.MessageWithAny");
91+
7892
// List unavailable hosts to verify failover
7993
System.setProperty("kafka.clusters.0.schemaRegistry",
8094
String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",

api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerdeTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,15 @@ void setUp() throws Exception {
8080
void loadsAllProtoFiledFromTargetDirectory() throws Exception {
8181
var protoDir = ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
8282
List<ProtoFile> files = new ProtobufFileSerde.ProtoSchemaLoader(protoDir).load();
83-
assertThat(files).hasSize(4);
83+
assertThat(files).hasSize(5);
8484
assertThat(files)
8585
.map(f -> f.getLocation().getPath())
8686
.containsExactlyInAnyOrder(
8787
"language/language.proto",
8888
"sensor.proto",
8989
"address-book.proto",
90-
"lang-description.proto"
90+
"lang-description.proto",
91+
"messagewithany.proto"
9192
);
9293
}
9394

api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.kafbat.ui.model.TopicMessageDTO;
1414
import io.kafbat.ui.model.TopicMessageEventDTO;
1515
import io.kafbat.ui.producer.KafkaTestProducer;
16+
import io.kafbat.ui.serdes.builtin.ProtobufFileSerde;
1617
import io.kafbat.ui.serdes.builtin.StringSerde;
1718
import java.util.HashSet;
1819
import java.util.List;
@@ -214,4 +215,33 @@ void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
214215
assertThat(result.getError()).containsIgnoringCase("Compilation error");
215216
}
216217

218+
@Test
219+
void sendMessageWithProtobufAnyType() {
220+
String jsonContent = """
221+
{
222+
"name": "testName",
223+
"payload": {
224+
"@type": "type.googleapis.com/test.PayloadMessage",
225+
"id": "123"
226+
}
227+
}
228+
""";
229+
230+
CreateTopicMessageDTO testMessage = new CreateTopicMessageDTO()
231+
.key(null)
232+
.partition(0)
233+
.keySerde(StringSerde.name())
234+
.content(jsonContent)
235+
.valueSerde(ProtobufFileSerde.name());
236+
237+
String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
238+
createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));
239+
240+
StepVerifier.create(messagesService.sendMessage(cluster, testTopic, testMessage))
241+
.expectNextMatches(metadata -> metadata.topic().equals(testTopic)
242+
&& metadata.partition() == 0
243+
&& metadata.offset() >= 0)
244+
.verifyComplete();
245+
}
246+
217247
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
syntax = "proto3";
2+
package test;
3+
4+
import "google/protobuf/any.proto";
5+
6+
message MessageWithAny {
7+
string name = 1;
8+
google.protobuf.Any payload = 2;
9+
}
10+
11+
message PayloadMessage {
12+
string id = 1;
13+
}

0 commit comments

Comments
 (0)