Skip to content

Commit a1fb075

Browse files
committed
BE: Closes #71 Messages: Show headers duplicates
1 parent 9ea1a4e commit a1fb075

File tree

15 files changed

+169
-46
lines changed

15 files changed

+169
-46
lines changed

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import dev.cel.common.CelValidationResult;
1515
import dev.cel.common.types.CelType;
1616
import dev.cel.common.types.CelTypeProvider;
17+
import dev.cel.common.types.ListType;
1718
import dev.cel.common.types.MapType;
1819
import dev.cel.common.types.SimpleType;
1920
import dev.cel.common.types.StructType;
@@ -67,9 +68,14 @@ private static boolean headersContains(TopicMessageDTO msg, String searchString)
6768
}
6869

6970
for (final var entry : headers.entrySet()) {
70-
if (StringUtils.contains(entry.getKey(), searchString) || StringUtils.contains(entry.getValue(), searchString)) {
71+
if (StringUtils.contains(entry.getKey(), searchString)) {
7172
return true;
7273
}
74+
for (final var value : entry.getValue()) {
75+
if (StringUtils.contains(value, searchString)) {
76+
return true;
77+
}
78+
}
7379
}
7480

7581
return false;
@@ -143,7 +149,7 @@ private static CelCompiler createCompiler() {
143149
"timestampMs", SimpleType.INT,
144150
"keyAsText", SimpleType.STRING,
145151
"valueAsText", SimpleType.STRING,
146-
"headers", MapType.create(SimpleType.STRING, SimpleType.STRING),
152+
"headers", MapType.create(SimpleType.STRING, ListType.create(SimpleType.STRING)),
147153
"key", SimpleType.DYN,
148154
"value", SimpleType.DYN
149155
);

api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import java.time.Instant;
77
import java.time.OffsetDateTime;
88
import java.time.ZoneId;
9+
import java.util.ArrayList;
910
import java.util.Arrays;
1011
import java.util.HashMap;
12+
import java.util.List;
1113
import java.util.Map;
1214
import java.util.function.UnaryOperator;
1315
import lombok.RequiredArgsConstructor;
@@ -63,13 +65,13 @@ private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType)
6365
}
6466

6567
private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
66-
Map<String, String> headers = new HashMap<>();
68+
Map<String, List<String>> headers = new HashMap<>();
6769
rec.headers().iterator()
68-
.forEachRemaining(header ->
69-
headers.put(
70-
header.key(),
71-
header.value() != null ? new String(header.value()) : null
72-
));
70+
.forEachRemaining(header -> {
71+
String key = header.key();
72+
String value = header.value() != null ? new String(header.value()) : null;
73+
headers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
74+
});
7375
message.setHeaders(headers);
7476
}
7577

api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kafbat.ui.serdes;
22

33
import io.kafbat.ui.serde.api.Serde;
4+
import java.util.List;
45
import java.util.Map;
56
import javax.annotation.Nullable;
67
import lombok.RequiredArgsConstructor;
@@ -19,7 +20,7 @@ public ProducerRecord<byte[], byte[]> create(String topic,
1920
@Nullable Integer partition,
2021
@Nullable String key,
2122
@Nullable String value,
22-
@Nullable Map<String, String> headers) {
23+
@Nullable Map<String, List<String>> headers) {
2324
return new ProducerRecord<>(
2425
topic,
2526
partition,
@@ -29,10 +30,14 @@ public ProducerRecord<byte[], byte[]> create(String topic,
2930
);
3031
}
3132

32-
private Iterable<Header> createHeaders(Map<String, String> clientHeaders) {
33+
private Iterable<Header> createHeaders(Map<String, List<String>> clientHeaders) {
3334
RecordHeaders headers = new RecordHeaders();
34-
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes())));
35+
clientHeaders.forEach((k, values) -> values.forEach(v -> headers.add(createRecord(k, v))));
3536
return headers;
3637
}
3738

39+
private RecordHeader createRecord(String key, String value) {
40+
return new RecordHeader(key, value == null ? null : value.getBytes());
41+
}
42+
3843
}

api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,17 @@ void returnsTrueWhenStringContainedInKeyOrContentOrHeadersOrInAllThree() {
4242
);
4343

4444
assertTrue(
45-
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("abC", "value")))
45+
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("abC", List.of("value"))))
4646
);
4747

4848
assertTrue(
49-
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("x1", "some abC")))
49+
filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("x1", List.of("some abC"))))
50+
);
51+
52+
assertTrue(
53+
filter.test(msg().key("dfg")
54+
.content("does-not-contain")
55+
.headers(Map.of("x1", List.of("does-not-contain", "some abC"))))
5056
);
5157
}
5258

@@ -65,7 +71,7 @@ void returnsFalseOtherwise() {
6571
);
6672

6773
assertFalse(
68-
filter.test(msg().key("aBc").content("AbC").headers(Map.of("abc", "value")))
74+
filter.test(msg().key("aBc").content("AbC").headers(Map.of("abc", List.of("value"))))
6975
);
7076

7177
}
@@ -97,12 +103,12 @@ void canCheckOffset() {
97103

98104
@Test
99105
void canCheckHeaders() {
100-
var f = celScriptFilter("record.headers.size() == 2 && record.headers['k1'] == 'v1'");
101-
assertTrue(f.test(msg().headers(Map.of("k1", "v1", "k2", "v2"))));
102-
assertFalse(f.test(msg().headers(Map.of("k1", "unexpected", "k2", "v2"))));
106+
var f = celScriptFilter("record.headers.size() == 2 && record.headers['k1'] == ['v1']");
107+
assertTrue(f.test(msg().headers(Map.of("k1", List.of("v1"), "k2", List.of("v2")))));
108+
assertFalse(f.test(msg().headers(Map.of("k1", List.of("unexpected"), "k2", List.of("v2")))));
103109

104-
f = celScriptFilter("record.headers.size() == 1 && !has(record.headers.k1) && record.headers['k2'] == 'v2'");
105-
assertTrue(f.test(msg().headers(Map.of("k2", "v2"))));
110+
f = celScriptFilter("record.headers.size() == 1 && !has(record.headers.k1) && record.headers['k2'] == ['v2']");
111+
assertTrue(f.test(msg().headers(Map.of("k2", List.of("v2")))));
106112

107113
f = celScriptFilter("!has(record.headers) || record.headers.size() == 0");
108114
assertTrue(f.test(msg().headers(Map.of())));

api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package io.kafbat.ui.serdes;
22

33
import static io.kafbat.ui.serde.api.DeserializeResult.Type.STRING;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
45
import static org.mockito.Mockito.any;
56
import static org.mockito.Mockito.mock;
67
import static org.mockito.Mockito.verify;
8+
import static org.mockito.Mockito.when;
79

810
import io.kafbat.ui.model.TopicMessageDTO;
911
import io.kafbat.ui.serde.api.DeserializeResult;
1012
import io.kafbat.ui.serde.api.Serde;
13+
import java.util.List;
1114
import java.util.Map;
1215
import java.util.function.UnaryOperator;
1316
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,9 +25,50 @@ void dataMaskingAppliedOnDeserializedMessage() {
2225
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
2326

2427
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
25-
recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())));
28+
recordDeser.deserialize(record());
2629

2730
verify(maskerMock).apply(any(TopicMessageDTO.class));
2831
}
2932

33+
@Test
34+
void deserializeWithMultipleHeaderValues() {
35+
UnaryOperator<TopicMessageDTO> maskerMock = mock();
36+
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
37+
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
38+
39+
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
40+
ConsumerRecord<Bytes, Bytes> record = record();
41+
record.headers().add("headerKey", "headerValue1".getBytes());
42+
record.headers().add("headerKey", "headerValue2".getBytes());
43+
TopicMessageDTO message = recordDeser.deserialize(record);
44+
45+
Map<String, List<String>> headers = message.getHeaders();
46+
assertEquals(1, headers.size());
47+
assertEquals(List.of("headerValue1", "headerValue2"), headers.get("headerKey"));
48+
}
49+
50+
@Test
51+
void deserializeWithMixedSingleAndMultipleHeaderValues() {
52+
UnaryOperator<TopicMessageDTO> maskerMock = mock();
53+
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
54+
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
55+
56+
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
57+
ConsumerRecord<Bytes, Bytes> record = record();
58+
record.headers().add("headerKey1", "singleValue".getBytes());
59+
record.headers().add("headerKey2", "multiValue1".getBytes());
60+
record.headers().add("headerKey2", "multiValue2".getBytes());
61+
TopicMessageDTO message = recordDeser.deserialize(record);
62+
63+
Map<String, List<String>> headers = message.getHeaders();
64+
assertEquals(1, headers.get("headerKey1").size());
65+
assertEquals(List.of("singleValue"), headers.get("headerKey1"));
66+
assertEquals(2, headers.get("headerKey2").size());
67+
assertEquals(List.of("multiValue1", "multiValue2"), headers.get("headerKey2"));
68+
}
69+
70+
private ConsumerRecord<Bytes, Bytes> record() {
71+
return new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes()));
72+
}
73+
3074
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.kafbat.ui.serdes;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.mockito.Mockito.mock;
7+
8+
import io.kafbat.ui.serde.api.Serde;
9+
import java.util.List;
10+
import java.util.Map;
11+
import org.apache.kafka.clients.producer.ProducerRecord;
12+
import org.apache.kafka.common.header.internals.RecordHeader;
13+
import org.junit.jupiter.api.Test;
14+
15+
class ProducerRecordCreatorTest {
16+
17+
@Test
18+
void createWithHeaders() {
19+
Serde.Serializer keySerializer = mock(Serde.Serializer.class);
20+
Serde.Serializer valueSerializer = mock(Serde.Serializer.class);
21+
22+
ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer);
23+
Map<String, List<String>> headers = Map.of(
24+
"headerKey1", List.of("headerValue1"),
25+
"headerKey2", List.of("headerValue2", "headerValue3")
26+
);
27+
ProducerRecord<byte[], byte[]> record = recordCreator.create("topic", 1, "key", "value", headers);
28+
29+
assertNotNull(record.headers());
30+
assertEquals(3, record.headers().toArray().length);
31+
assertThat(record.headers()).containsExactlyInAnyOrder(
32+
new RecordHeader("headerKey1", "headerValue1".getBytes()),
33+
new RecordHeader("headerKey2", "headerValue2".getBytes()),
34+
new RecordHeader("headerKey2", "headerValue3".getBytes())
35+
);
36+
}
37+
}

api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ void execSmartFilterTestReturnsExecutionResult() {
170170
+ "&& has(record.timestampMs) && has(record.offset)")
171171
.key("1234")
172172
.value("{ \"some\" : \"value\" } ")
173-
.headers(Map.of("h1", "hv1"))
173+
.headers(Map.of("h1", List.of("hv1")))
174174
.offset(12345L)
175175
.timestampMs(System.currentTimeMillis())
176176
.partition(1);

api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ void topicMessageMetadataJson() {
412412
.keySerde(SchemaRegistrySerde.name())
413413
.content(JSON_SCHEMA_RECORD)
414414
.valueSerde(SchemaRegistrySerde.name())
415-
.headers(Map.of("header1", "value1"))
415+
.headers(Map.of("header1", List.of("value1")))
416416
)
417417
.doAssert(polled -> {
418418
assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
@@ -438,9 +438,9 @@ void headerValueNullPresentTest() {
438438
.keySerde(SchemaRegistrySerde.name())
439439
.content(JSON_SCHEMA_RECORD)
440440
.valueSerde(SchemaRegistrySerde.name())
441-
.headers(Collections.singletonMap("header123", null))
441+
.headers(Collections.singletonMap("header123", Collections.singletonList(null)))
442442
)
443-
.doAssert(polled -> assertThat(polled.getHeaders().get("header123")).isNull());
443+
.doAssert(polled -> assertThat(polled.getHeaders().get("header123")).containsExactly((String) null));
444444
}
445445

446446

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3004,7 +3004,9 @@ components:
30043004
headers:
30053005
type: object
30063006
additionalProperties:
3007-
type: string
3007+
type: array
3008+
items:
3009+
type: string
30083010
partition:
30093011
type: integer
30103012
offset:
@@ -3033,7 +3035,9 @@ components:
30333035
headers:
30343036
type: object
30353037
additionalProperties:
3036-
type: string
3038+
type: array
3039+
items:
3040+
type: string
30373041
content:
30383042
type: string
30393043
nullable: true
@@ -3123,7 +3127,9 @@ components:
31233127
headers:
31243128
type: object
31253129
additionalProperties:
3126-
type: string
3130+
type: array
3131+
items:
3132+
type: string
31273133
content:
31283134
type: string
31293135
keyFormat:

frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const InfoModal: React.FC<InfoModalProps> = ({ toggleIsOpen }) => {
1919
<S.ListItem>value (json if possible)</S.ListItem>
2020
<S.ListItem>keyAsText</S.ListItem>
2121
<S.ListItem>valueAsText</S.ListItem>
22-
<S.ListItem>header</S.ListItem>
22+
<S.ListItem>headers</S.ListItem>
2323
<S.ListItem>partition</S.ListItem>
2424
<S.ListItem>timestampMs</S.ListItem>
2525
</ol>
@@ -51,7 +51,7 @@ const InfoModal: React.FC<InfoModalProps> = ({ toggleIsOpen }) => {
5151
<S.ListItem>
5252
<code>
5353
record.headers.size() == 1 && !has(record.headers.k1) &&
54-
record.headers[&apos;k2&apos;] == &apos;v2&apos;
54+
&apos;v2&apos; in record.headers[&apos;k2&apos;]
5555
</code>
5656
</S.ListItem>
5757
</ol>

0 commit comments

Comments
 (0)