Skip to content

Commit c3b0779

Browse files
authored
Commit offset on consumer close (#1463) (#1478)
* Commit offset on consumer close. When a consumer is closed, we want to commit the offsets for all owned partitions. Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> * Comments. Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
1 parent 897f426 commit c3b0779

File tree

7 files changed

+53
-13
lines changed

7 files changed

+53
-13
lines changed

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import io.vertx.core.Future;
2222
import io.vertx.core.Promise;
2323
import io.vertx.core.Vertx;
24+
2425
import java.util.Arrays;
26+
import java.util.Objects;
2527
import java.util.stream.Collectors;
2628

2729
/**
@@ -53,15 +55,20 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) {
5355
}
5456

5557
/**
56-
* Compose several {@link AsyncCloseable} into a single {@link AsyncCloseable}. One close failure will cause the whole close to fail.
58+
* Compose several {@link AsyncCloseable}s into a single {@link AsyncCloseable}.
59+
* One close failure will cause the whole close to fail.
60+
* <p>
61+
* It filters null futures returned by individual {@link AsyncCloseable} on close.
5762
*
5863
* @param closeables the closeables to compose
5964
* @return the composed closeables
6065
*/
6166
static AsyncCloseable compose(AsyncCloseable... closeables) {
6267
return () -> CompositeFuture.all(
6368
Arrays.stream(closeables)
69+
.filter(Objects::nonNull)
6470
.map(AsyncCloseable::close)
71+
.filter(Objects::nonNull)
6572
.collect(Collectors.toList())
6673
).mapEmpty();
6774
}

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
*/
1616
package dev.knative.eventing.kafka.broker.dispatcher;
1717

18-
import io.vertx.core.Future;
18+
import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
1919
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
2020

2121
/**
2222
* This class contains hooks for listening events through the {@link dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher} lifecycle.
2323
*/
24-
public interface RecordDispatcherListener {
24+
public interface RecordDispatcherListener extends AsyncCloseable {
2525

2626
/**
2727
* The given record has been received.

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public RecordDispatcherImpl(
8181
this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber");
8282
this.dlsSender = composeSenderAndSinkHandler(deadLetterSinkSender, responseHandler, "dead letter sink");
8383
this.recordDispatcherListener = recordDispatcherListener;
84-
this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender);
84+
this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender, recordDispatcherListener);
8585
this.consumerTracer = consumerTracer;
8686
}
8787

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,15 @@ public void start(Promise<Void> startPromise) {
6767
public void stop(Promise<Void> stopPromise) {
6868
logger.info("Stopping consumer");
6969

70-
AsyncCloseable.compose(
71-
this.consumer::close,
72-
this.recordDispatcher,
73-
this.closeable
74-
).close(stopPromise);
70+
final Promise<Void> dependenciesClosedPromise = Promise.promise();
71+
72+
// Close consumer after other objects have been closed.
73+
dependenciesClosedPromise.future()
74+
.onComplete(r -> this.consumer.close().onComplete(stopPromise));
75+
76+
AsyncCloseable
77+
.compose(this.recordDispatcher, this.closeable)
78+
.close(dependenciesClosedPromise);
7579
}
7680

7781
public void setConsumer(KafkaConsumer<String, CloudEvent> consumer) {

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;
1717

1818
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
19+
import io.vertx.core.CompositeFuture;
20+
import io.vertx.core.Future;
1921
import io.vertx.core.Vertx;
2022
import io.vertx.kafka.client.common.TopicPartition;
2123
import io.vertx.kafka.client.consumer.KafkaConsumer;
@@ -30,6 +32,7 @@
3032
import java.util.Map;
3133
import java.util.Objects;
3234
import java.util.function.Consumer;
35+
import java.util.stream.Collectors;
3336

3437
/**
3538
* This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered.
@@ -60,7 +63,7 @@ public OffsetManager(final Vertx vertx,
6063
this.offsetTrackers = new HashMap<>();
6164
this.onCommit = onCommit;
6265

63-
vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit));
66+
vertx.setPeriodic(commitIntervalMs, l -> commitAll());
6467
}
6568

6669
/**
@@ -115,7 +118,7 @@ private void commit(final KafkaConsumerRecord<?, ?> record) {
115118
.recordNewOffset(record.offset());
116119
}
117120

118-
private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
121+
private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
119122
long newOffset = tracker.offsetToCommit();
120123
if (newOffset > tracker.getCommitted()) {
121124
// Reset the state
@@ -124,7 +127,7 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
124127
logger.debug("Committing offset for {} offset {}", topicPartition, newOffset);
125128

126129
// Execute the actual commit
127-
consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
130+
return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
128131
.onSuccess(ignored -> {
129132
if (onCommit != null) {
130133
onCommit.accept((int) newOffset);
@@ -133,6 +136,27 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
133136
.onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause))
134137
.mapEmpty();
135138
}
139+
return null;
140+
}
141+
142+
/**
143+
* Commit all tracked offsets by colling commit on every offsetTracker entry.
144+
*
145+
* @return succeeded or failed future.
146+
*/
147+
private Future<Void> commitAll() {
148+
return CompositeFuture.all(
149+
this.offsetTrackers.entrySet()
150+
.stream()
151+
.map(e -> commit(e.getKey(), e.getValue()))
152+
.filter(Objects::nonNull)
153+
.collect(Collectors.toList())
154+
).mapEmpty();
155+
}
156+
157+
@Override
158+
public Future<Void> close() {
159+
return commitAll();
136160
}
137161

138162
/**

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ public void stop(Promise<Void> stopPromise) {
111111
}
112112

113113
void recordsHandler(KafkaConsumerRecords<String, CloudEvent> records) {
114+
if (records == null) {
115+
return;
116+
}
114117
// Put records in queues
115118
// I assume the records are ordered per topic-partition
116119
for (int i = 0; i < records.size(); i++) {

data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,15 @@ protected MapAssert<TopicPartition, Long> assertThatOffsetCommittedWithFailures(
100100
.when(vertxConsumer)
101101
.commit(any(Map.class));
102102

103-
testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag);
103+
final var offsetManager = createOffsetManager(Vertx.vertx(), vertxConsumer);
104+
testExecutor.accept(offsetManager, failureFlag);
104105

105106
try {
106107
Thread.sleep(1000);
107108
} catch (final InterruptedException e) {
108109
throw new RuntimeException(e);
109110
}
111+
assertThat(offsetManager.close().succeeded()).isTrue();
110112

111113
final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed));
112114
return assertThat(

0 commit comments

Comments
 (0)