
Commit f4a3bf0

[release-1.0] Properly handle events without the data field (#1500)
* Properly handle events without data field

  A valid CloudEvent in the CE binary protocol binding of Kafka might be
  composed of only headers: KafkaConsumer doesn't call the deserializer if
  the value is null, so we get a record with a null value even though the
  record is a valid CloudEvent. This patch handles events without the data
  field properly by creating the CloudEvent object from the record headers,
  if the above conditions apply.

  Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

* Make code simpler, handle exceptions, change method name

  Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

Co-authored-by: Pierangelo Di Pilato <pdipilat@redhat.com>
1 parent 2c5e702 commit f4a3bf0
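For context, a minimal sketch of the scenario the commit message describes, not part of the commit itself: in the binary content mode of the CloudEvents Kafka protocol binding, every event attribute travels in a ce_-prefixed record header, so an event without data is a record with CE headers and a null value. The sketch uses the upstream io.cloudevents:cloudevents-kafka CloudEventDeserializer rather than this module's wrapper, and assumes it tolerates a null payload in binary mode; topic and header values are made up.

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

public class HeadersOnlyEventSketch {

  public static void main(String[] args) {
    // Binary-mode CloudEvent attributes carried entirely in headers.
    final var headers = new RecordHeaders()
      .add("ce_specversion", "1.0".getBytes(StandardCharsets.UTF_8))
      .add("ce_id", "my-id".getBytes(StandardCharsets.UTF_8))
      .add("ce_source", "/my/source".getBytes(StandardCharsets.UTF_8))
      .add("ce_type", "my.type".getBytes(StandardCharsets.UTF_8));

    // KafkaConsumer never invokes the deserializer for a null value, so the
    // dispatcher has to make this call itself to recover the event.
    final CloudEvent event = new CloudEventDeserializer()
      .deserialize("my-topic", headers, null);

    System.out.println(event); // event with attributes but no data field
  }
}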

File tree: 4 files changed, +130 -35 lines changed

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java

Lines changed: 41 additions & 8 deletions
@@ -21,20 +21,22 @@
 import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
 import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
 import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
 import io.cloudevents.CloudEvent;
 import io.vertx.core.Context;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.kafka.client.common.tracing.ConsumerTracer;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.Objects;
 import java.util.function.Function;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

 /**
@@ -47,6 +49,8 @@ public class RecordDispatcherImpl implements RecordDispatcher {

   private static final Logger logger = LoggerFactory.getLogger(RecordDispatcherImpl.class);

+  private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();
+
   private final Filter filter;
   private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> subscriberSender;
   private final Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> dlsSender;
@@ -92,8 +96,6 @@ public RecordDispatcherImpl(
    */
   @Override
   public Future<Void> dispatch(KafkaConsumerRecord<Object, CloudEvent> record) {
-    Promise<Void> promise = Promise.promise();
-
     /*
       That's pretty much what happens here:

@@ -116,9 +118,23 @@ public Future<Void> dispatch(KafkaConsumerRecord<Object, CloudEvent> record) {
         +->end<--+
     */

-    onRecordReceived(record, promise);
-
-    return promise.future();
+    try {
+      Promise<Void> promise = Promise.promise();
+      onRecordReceived(maybeDeserializeValueFromHeaders(record), promise);
+      return promise.future();
+    } catch (final Exception ex) {
+      // This is a fatal exception that shouldn't happen in normal cases.
+      //
+      // It might happen if folks send bad records to a topic that is
+      // managed by our system.
+      //
+      // So discard record if we can't deal with the record, so that we can
+      // make progress in the partition.
+      logError("Exception occurred, discarding the record", record, ex);
+      recordDispatcherListener.recordReceived(record);
+      recordDispatcherListener.recordDiscarded(record);
+      return Future.failedFuture(ex);
+    }
   }

   private void onRecordReceived(final KafkaConsumerRecord<Object, CloudEvent> record, Promise<Void> finalProm) {
@@ -192,6 +208,23 @@ private void onDeadLetterSinkFailure(final KafkaConsumerRecord<Object, CloudEven
     finalProm.complete();
   }

+  private static KafkaConsumerRecord<Object, CloudEvent> maybeDeserializeValueFromHeaders(KafkaConsumerRecord<Object, CloudEvent> record) {
+    if (record.value() != null) {
+      return record;
+    }
+    // A valid CloudEvent in the CE binary protocol binding of Kafka
+    // might be composed by only Headers.
+    //
+    // KafkaConsumer doesn't call the deserializer if the value
+    // is null.
+    //
+    // That means that we get a record with a null value and some CE
+    // headers even though the record is a valid CloudEvent.
+    logDebug("Value is null", record);
+    final var value = cloudEventDeserializer.deserialize(record.record().topic(), record.record().headers(), null);
+    return new KafkaConsumerRecordImpl<>(KafkaConsumerRecordUtils.copyRecordAssigningValue(record.record(), value));
+  }
+
   private static Function<KafkaConsumerRecord<Object, CloudEvent>, Future<Void>> composeSenderAndSinkHandler(
       CloudEventSender sender, ResponseHandler sinkHandler, String senderType) {
     return rec -> sender.send(rec.value())
data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptor.java

Lines changed: 2 additions & 14 deletions
@@ -40,6 +40,7 @@
 import java.util.stream.Stream;

 import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;
+import static dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils.copyRecordAssigningValue;
 import static io.cloudevents.kafka.PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION;

 /**
@@ -174,20 +175,7 @@ private ConsumerRecord<Object, CloudEvent> validRecord(final ConsumerRecord<Obje
     }

     // Copy consumer record and set value to a valid CloudEvent.
-    return new ConsumerRecord<>(
-      record.topic(),
-      record.partition(),
-      record.offset(),
-      record.timestamp(),
-      record.timestampType(),
-      record.checksum(),
-      record.serializedKeySize(),
-      record.serializedValueSize(),
-      record.key(),
-      value.build(),
-      record.headers(),
-      record.leaderEpoch()
-    );
+    return copyRecordAssigningValue(record, value.build());
   }

   private static void setKey(CloudEventBuilder value, final Object key) {
data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KafkaConsumerRecordUtils.java

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;
+
+import io.cloudevents.CloudEvent;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public final class KafkaConsumerRecordUtils {
+
+  private KafkaConsumerRecordUtils() {
+  }
+
+  public static <T> ConsumerRecord<T, CloudEvent> copyRecordAssigningValue(final ConsumerRecord<T, CloudEvent> record,
+                                                                           final CloudEvent value) {
+    return new ConsumerRecord<>(
+      record.topic(),
+      record.partition(),
+      record.offset(),
+      record.timestamp(),
+      record.timestampType(),
+      record.checksum(),
+      record.serializedKeySize(),
+      record.serializedValueSize(),
+      record.key(),
+      value,
+      record.headers(),
+      record.leaderEpoch()
+    );
+  }
+}
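A sketch of a unit test one might add for this utility (not part of the commit), with made-up values and the JUnit 5 / AssertJ style used elsewhere in the data plane:

import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

import java.net.URI;

import static org.assertj.core.api.Assertions.assertThat;

public class KafkaConsumerRecordUtilsTest {

  @Test
  public void shouldCopyMetadataAndAssignValue() {
    // A record with a null value, as produced for a headers-only CloudEvent.
    final ConsumerRecord<Object, CloudEvent> record =
      new ConsumerRecord<>("my-topic", 1, 7L, "my-key", null);

    final CloudEvent value = CloudEventBuilder.v1()
      .withId("my-id")
      .withSource(URI.create("/my/source"))
      .withType("my.type")
      .build();

    final var copy = KafkaConsumerRecordUtils.copyRecordAssigningValue(record, value);

    // Metadata is preserved; only the value is replaced.
    assertThat(copy.topic()).isEqualTo(record.topic());
    assertThat(copy.partition()).isEqualTo(record.partition());
    assertThat(copy.offset()).isEqualTo(record.offset());
    assertThat(copy.key()).isEqualTo(record.key());
    assertThat(copy.value()).isEqualTo(value);
  }
}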

data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java

Lines changed: 44 additions & 13 deletions
@@ -57,6 +57,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -117,35 +118,40 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) thro

   /*
   1: event sent by the source to the Broker
-  2: event sent by the service in the response
+  2: event sent by the trigger 1 in the response
+  3: event sent by the trigger 2 in the response
   2
   +----------------------+
   | |
   | +-----+-----+
   | 1 | |
   | +---------->+ Trigger 1 |
-  v | | |
+  v | 3 | |
   +------------+ +-------------+ +-------+----+----+ +-----------+
   | | 1 | | 2 | |
   | HTTPClient +--------->+ Receiver | +--------+ Dispatcher |
   | | | | | | |
   +------------+ +------+------+ | +--------+---+----+ +-----------+
-  | | ^ | | |
+  | | ^ | 3 | |
   | v | +---------->+ Trigger 2 |
   1 | +--------+--------+ | 2 | |
   | | | 1 | +-----------+
   +----->+ Kafka +--------+
   | | 2 +-----------+
-  +-----------------+ | |
+  +-----------------+ 3 | |
   | Trigger 3 |
   | |
   +-----------+
+
+
+
   */
   @Test
   @Timeout(timeUnit = TimeUnit.MINUTES, value = 1)
-  public void execute(final Vertx vertx, final VertxTestContext context) {
+  public void execute(final Vertx vertx, final VertxTestContext context) throws InterruptedException {

-    final var checkpoints = context.checkpoint(3);
+    final var checkpoints = context.checkpoint(4);

     // event sent by the source to the Broker (see 1 in diagram)
     final var expectedRequestEvent = CloudEventBuilder.v1()
@@ -158,7 +164,7 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
       .build();

     // event sent in the response by the Callable service (see 2 in diagram)
-    final var expectedResponseEvent = CloudEventBuilder.v03()
+    final var expectedResponseEventService2 = CloudEventBuilder.v03()
       .withId(UUID.randomUUID().toString())
       .withDataSchema(URI.create("/api/data-schema-ce-2"))
       .withSubject("subject-ce-2")
@@ -167,6 +173,20 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
       .withType(TYPE_CE_2)
       .build();

+    // event sent in the response by the Callable service 2 (see 3 in diagram)
+    final var expectedResponseEventService1 = CloudEventBuilder.v1()
+      .withId(UUID.randomUUID().toString())
+      .withDataSchema(URI.create("/api/data-schema-ce-3"))
+      .withSource(URI.create("/api/rossi"))
+      .withSubject("subject-ce-3")
+      .withType(TYPE_CE_1)
+      .build();
+
+    final var service1ExpectedEventsIterator = List.of(
+      expectedRequestEvent,
+      expectedResponseEventService1
+    ).iterator();
+
     final var resource = DataPlaneContract.Resource.newBuilder()
       .addTopics(TOPIC)
       .setIngress(DataPlaneContract.Ingress.newBuilder().setPath(format("/%s/%s", BROKER_NAMESPACE, BROKER_NAME)))
@@ -207,9 +227,13 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
     new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS)
       .accept(DataPlaneContract.Contract.newBuilder().addResources(resource).build());

-    await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertThat(vertx.deploymentIDs())
+    await()
+      .atMost(10, TimeUnit.SECONDS)
+      .untilAsserted(() -> assertThat(vertx.deploymentIDs())
       .hasSize(resource.getEgressesCount() + NUM_RESOURCES + NUM_SYSTEM_VERTICLES));

+    Thread.sleep(2000); // Give consumers time to start
+
     // start service
     vertx.createHttpServer()
       .exceptionHandler(context::failNow)
@@ -221,22 +245,29 @@ public void execute(final Vertx vertx, final VertxTestContext context) {

       // service 1 receives event sent by the HTTPClient
       if (request.path().equals(PATH_SERVICE_1)) {
+        final var expectedEvent = service1ExpectedEventsIterator.next();
         context.verify(() -> {
-          assertThat(event).isEqualTo(expectedRequestEvent);
+          assertThat(event).isEqualTo(expectedEvent);
           checkpoints.flag(); // 2
         });

-        // write event to the response, the event will be handled by service 2
-        VertxMessageFactory.createWriter(request.response())
-          .writeBinary(expectedResponseEvent);
+        if (service1ExpectedEventsIterator.hasNext()) {
+          // write event to the response, the event will be handled by service 2
+          VertxMessageFactory.createWriter(request.response())
+            .writeBinary(expectedResponseEventService2);
+        }
       }

       // service 2 receives event in the response
       if (request.path().equals(PATH_SERVICE_2)) {
         context.verify(() -> {
-          assertThat(event).isEqualTo(expectedResponseEvent);
+          assertThat(event).isEqualTo(expectedResponseEventService2);
           checkpoints.flag(); // 3
         });
+
+        // write event to the response, the event will be handled by service 1
+        VertxMessageFactory.createWriter(request.response())
+          .writeBinary(expectedResponseEventService1);
       }

       if (request.path().equals(PATH_SERVICE_3)) {
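Distilled from the test above, a sketch of the iterator trick that sequences service 1's assertions (plain strings stand in for the CloudEvents; names are made up): each delivery consumes one expected event, and a reply is written only while more deliveries are still expected, so the last hop ends the chain.

import java.util.List;
import java.util.function.Consumer;

public class ExpectedEventsIteratorSketch {

  public static void main(String[] args) {
    // Service 1 expects the original request (1) and the routed-back response (3).
    final var expected = List.of("event-1", "event-3").iterator();

    final Consumer<String> service1 = event -> {
      if (!event.equals(expected.next())) {
        throw new AssertionError("unexpected event: " + event);
      }
      if (expected.hasNext()) {
        // Only the first delivery triggers a reply (event 2 in the diagram).
        System.out.println("replying with event-2");
      }
    };

    service1.accept("event-1"); // first delivery: assert and reply
    service1.accept("event-3"); // response routed back: assert, no reply
  }
}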
