Skip to content

Commit e13f14d

Browse files
committed
Add a dedicated module for record headers access when consuming
1 parent a157758 commit e13f14d

File tree

41 files changed

+615
-183
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+615
-183
lines changed

README.md

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,25 @@ to simplify the development of applications.
2323
|---------------------------------------------------------------------------|---------------|---------------------------------------------------------------|
2424
| [Avro Generic](/kafka-producer-quickstarts/kafka-producer-avro-generic) | Kafka Clients | Produce generic Avro records |
2525
| [Avro Specific](/kafka-producer-quickstarts/kafka-producer-avro-specific) | Kafka Clients | Produce specific Avro records |
26-
| [Headers](/kafka-producer-quickstarts/kafka-producer-headers) | Kafka Clients | Add record headers and produce records |
26+
| [Headers](/kafka-producer-quickstarts/kafka-producer-headers) | Kafka Clients | Producer record with headers |
2727
| [Simple](/kafka-producer-quickstarts/kafka-producer-simple) | Kafka Clients | Produce String records |
2828
| [Transaction](/kafka-producer-quickstarts/kafka-producer-transaction) | Kafka Clients | Guarantee atomicity between multiple topics with transactions |
2929

3030
Each producer module provides unit tests with the MockProducer API.
3131

3232
### Consumers
3333

34-
| Module | Library | Main Concept | Additional Content |
35-
|:------------------------------------------------------------------------------------------|---------------|---------------------------------------------------------|------------------------|
36-
| [Avro Generic](/kafka-consumer-quickstarts/kafka-consumer-avro-generic) | Kafka Clients | Generic Avro consumption | Mock consumer |
37-
| [Avro Specific](/kafka-consumer-quickstarts/kafka-consumer-avro-specific) | Kafka Clients | Specific Avro consumption | Mock consumer |
38-
| [Circuit breaker](/kafka-consumer-quickstarts/kafka-consumer-circuit-breaker) | Kafka Clients | Deserialization/poison pill error handling | Mock consumer |
39-
| [Retry external system](/kafka-consumer-quickstarts/kafka-consumer-retry-external-system) | Kafka Clients | External system interfacing with strong retry mechanism | Mock consumer |
40-
| [Simple](/kafka-consumer-quickstarts/kafka-consumer-simple) | Kafka Clients | String consumption | Headers, Mock consumer |
41-
| [Transaction](/kafka-consumer-quickstarts/kafka-consumer-transaction) | Kafka Clients | Isolation level | |
34+
| Module | Library | Main Concept |
35+
|:------------------------------------------------------------------------------------------|---------------|-------------------------------------------------------|
36+
| [Avro Generic](/kafka-consumer-quickstarts/kafka-consumer-avro-generic) | Kafka Clients | Consume generic Avro records |
37+
| [Avro Specific](/kafka-consumer-quickstarts/kafka-consumer-avro-specific) | Kafka Clients | Consume specific Avro records |
38+
| [Circuit breaker](/kafka-consumer-quickstarts/kafka-consumer-circuit-breaker) | Kafka Clients | Handle poison pills |
39+
| [Headers](/kafka-consumer-quickstarts/kafka-consumer-headers) | Kafka Clients | Read records with headers |
40+
| [Retry external system](/kafka-consumer-quickstarts/kafka-consumer-retry-external-system) | Kafka Clients | Retry mechanism on external processing system failure |
41+
| [Simple](/kafka-consumer-quickstarts/kafka-consumer-simple) | Kafka Clients | Consume String records |
42+
| [Transaction](/kafka-consumer-quickstarts/kafka-consumer-transaction) | Kafka Clients | Consume records from committed transactions |
43+
44+
Each consumer module provides unit tests with the MockConsumer API.
4245

4346
### Kafka Streams
4447

kafka-consumer-quickstarts/kafka-consumer-avro-generic/README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This module demonstrates the following:
44

5-
- The usage of the Kafka Clients consumer DSL.
5+
- The usage of the Kafka Clients consumer API.
66
- The usage of Apache Avro and generic records.
77
- The usage of unit tests with a Mock consumer.
88

@@ -20,11 +20,8 @@ To compile and run this demo, you will need the following:
2020

2121
To run the application manually, please follow the steps below:
2222

23-
- Start
24-
a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp)
25-
in a Docker environment.
26-
- Produce records of type `<String, GenericRecord>` to a topic named `PERSON_TOPIC`. You can use
27-
the [producer Avro Generic](../../kafka-producer-quickstarts/kafka-producer-avro-generic) to do this.
23+
- Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
24+
- Produce records of type `<String, GenericRecord>` to a topic named `PERSON_TOPIC`. You can use the [producer Avro Generic](../../kafka-producer-quickstarts/kafka-producer-avro-generic) to do this.
2825
- Start the consumer.
2926

3027
To run the application in Docker, please use the following command:

kafka-consumer-quickstarts/kafka-consumer-avro-generic/src/main/java/io/github/loicgreffier/consumer/avro/generic/app/ConsumerRunner.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
@Slf4j
2525
@Component
2626
public class ConsumerRunner {
27-
2827
@Autowired
2928
private Consumer<String, GenericRecord> consumer;
3029

@@ -41,8 +40,7 @@ public void run() {
4140
try {
4241
log.info("Subscribing to {} topic", PERSON_TOPIC);
4342

44-
consumer.subscribe(Collections.singleton(PERSON_TOPIC),
45-
new CustomConsumerRebalanceListener());
43+
consumer.subscribe(Collections.singleton(PERSON_TOPIC), new CustomConsumerRebalanceListener());
4644

4745
while (true) {
4846
ConsumerRecords<String, GenericRecord> messages =

kafka-consumer-quickstarts/kafka-consumer-avro-generic/src/test/java/io/github/loicgreffier/consumer/avro/generic/KafkaConsumerAvroGenericApplicationTests.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.github.loicgreffier.consumer.avro.generic;
22

33
import static io.github.loicgreffier.consumer.avro.generic.constant.Topic.PERSON_TOPIC;
4-
import static org.assertj.core.api.Assertions.assertThat;
54
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
66
import static org.mockito.ArgumentMatchers.any;
77
import static org.mockito.Mockito.times;
88
import static org.mockito.Mockito.verify;
@@ -36,8 +36,7 @@
3636
@ExtendWith(MockitoExtension.class)
3737
class KafkaConsumerAvroGenericApplicationTests {
3838
@Spy
39-
private MockConsumer<String, GenericRecord> mockConsumer =
40-
new MockConsumer<>(OffsetResetStrategy.EARLIEST);
39+
private MockConsumer<String, GenericRecord> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
4140

4241
@InjectMocks
4342
private ConsumerRunner consumerRunner;
@@ -73,8 +72,8 @@ void shouldConsumeSuccessfully() throws IOException {
7372

7473
consumerRunner.run();
7574

76-
assertThat(mockConsumer.closed()).isTrue();
77-
verify(mockConsumer, times(1)).commitSync();
75+
assertTrue(mockConsumer.closed());
76+
verify(mockConsumer).commitSync();
7877
}
7978

8079
@Test
@@ -101,8 +100,8 @@ void shouldFailOnPoisonPill() throws IOException {
101100

102101
assertThrows(RecordDeserializationException.class, () -> consumerRunner.run());
103102

104-
assertThat(mockConsumer.closed()).isTrue();
103+
assertTrue(mockConsumer.closed());
105104
verify(mockConsumer, times(3)).poll(any());
106-
verify(mockConsumer, times(1)).commitSync();
105+
verify(mockConsumer).commitSync();
107106
}
108107
}

kafka-consumer-quickstarts/kafka-consumer-avro-specific/README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This module demonstrates the following:
44

5-
- The usage of the Kafka Clients consumer DSL.
5+
- The usage of the Kafka Clients consumer API.
66
- The usage of Apache Avro and specific records.
77
- The usage of unit tests with a Mock consumer.
88

@@ -20,11 +20,8 @@ To compile and run this demo, you will need the following:
2020

2121
To run the application manually, please follow the steps below:
2222

23-
- Start
24-
a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp)
25-
in a Docker environment.
26-
- Produce records of type `<String, KafkaPerson>` to a topic named `PERSON_TOPIC`. You can use
27-
the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) to do this.
23+
- Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
24+
- Produce records of type `<String, KafkaPerson>` to a topic named `PERSON_TOPIC`. You can use the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) to do this.
2825
- Start the consumer.
2926

3027
To run the application in Docker, please use the following command:

kafka-consumer-quickstarts/kafka-consumer-avro-specific/src/main/java/io/github/loicgreffier/consumer/avro/specific/app/ConsumerRunner.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
@Slf4j
2525
@Component
2626
public class ConsumerRunner {
27-
2827
@Autowired
2928
private Consumer<String, KafkaPerson> consumer;
3029

@@ -40,8 +39,7 @@ public void run() {
4039
try {
4140
log.info("Subscribing to {} topic", PERSON_TOPIC);
4241

43-
consumer.subscribe(Collections.singleton(PERSON_TOPIC),
44-
new CustomConsumerRebalanceListener());
42+
consumer.subscribe(Collections.singleton(PERSON_TOPIC), new CustomConsumerRebalanceListener());
4543

4644
while (true) {
4745
ConsumerRecords<String, KafkaPerson> messages =

kafka-consumer-quickstarts/kafka-consumer-avro-specific/src/test/java/io/github/loicgreffier/consumer/avro/specific/KafkaConsumerAvroSpecificApplicationTests.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.github.loicgreffier.consumer.avro.specific;
22

33
import static io.github.loicgreffier.consumer.avro.specific.constant.Topic.PERSON_TOPIC;
4-
import static org.assertj.core.api.Assertions.assertThat;
54
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
66
import static org.mockito.Mockito.any;
77
import static org.mockito.Mockito.times;
88
import static org.mockito.Mockito.verify;
@@ -30,8 +30,7 @@
3030
@ExtendWith(MockitoExtension.class)
3131
class KafkaConsumerAvroSpecificApplicationTests {
3232
@Spy
33-
private MockConsumer<String, KafkaPerson> mockConsumer =
34-
new MockConsumer<>(OffsetResetStrategy.EARLIEST);
33+
private MockConsumer<String, KafkaPerson> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
3534

3635
@InjectMocks
3736
private ConsumerRunner consumerRunner;
@@ -62,8 +61,8 @@ void shouldConsumeSuccessfully() {
6261

6362
consumerRunner.run();
6463

65-
assertThat(mockConsumer.closed()).isTrue();
66-
verify(mockConsumer, times(1)).commitSync();
64+
assertTrue(mockConsumer.closed());
65+
verify(mockConsumer).commitSync();
6766
}
6867

6968
@Test
@@ -85,8 +84,8 @@ void shouldFailOnPoisonPill() {
8584

8685
assertThrows(RecordDeserializationException.class, () -> consumerRunner.run());
8786

88-
assertThat(mockConsumer.closed()).isTrue();
87+
assertTrue(mockConsumer.closed());
8988
verify(mockConsumer, times(3)).poll(any());
90-
verify(mockConsumer, times(1)).commitSync();
89+
verify(mockConsumer).commitSync();
9190
}
9291
}

kafka-consumer-quickstarts/kafka-consumer-circuit-breaker/README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This module demonstrates the following:
44

5-
- The usage of the Kafka Clients consumer DSL.
5+
- The usage of the Kafka Clients consumer API.
66
- The usage of Apache Avro
77
- The usage of unit tests with a Mock consumer.
88

@@ -26,11 +26,8 @@ To compile and run this demo, you will need the following:
2626

2727
To run the application manually, please follow the steps below:
2828

29-
- Start
30-
a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp)
31-
in a Docker environment.
32-
- Produce records of type `<String, KafkaPerson>` to a topic named `PERSON_TOPIC`. You can use
33-
the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) to do this.
29+
- Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
30+
- Produce records of type `<String, KafkaPerson>` to a topic named `PERSON_TOPIC`. You can use the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) to do this.
3431
- Make sure a deserialization exception will occur by either:
3532
- Deleting the Avro schema from the schema registry.
3633
- Producing a String message in the middle of Avro messages.

kafka-consumer-quickstarts/kafka-consumer-circuit-breaker/src/main/java/io/github/loicgreffier/consumer/circuit/breaker/app/ConsumerRunner.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
@Slf4j
2626
@Component
2727
public class ConsumerRunner {
28-
2928
@Autowired
3029
private Consumer<String, KafkaPerson> consumer;
3130

@@ -43,8 +42,7 @@ public void run() {
4342
try {
4443
log.info("Subscribing to {} topic", PERSON_TOPIC);
4544

46-
consumer.subscribe(Collections.singleton(PERSON_TOPIC),
47-
new CustomConsumerRebalanceListener());
45+
consumer.subscribe(Collections.singleton(PERSON_TOPIC), new CustomConsumerRebalanceListener());
4846

4947
while (true) {
5048
try {

kafka-consumer-quickstarts/kafka-consumer-circuit-breaker/src/test/java/io/github/loicgreffier/consumer/circuit/breaker/KafkaConsumerCircuitBreakerApplicationTests.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.github.loicgreffier.consumer.circuit.breaker;
22

33
import static io.github.loicgreffier.consumer.circuit.breaker.constant.Topic.PERSON_TOPIC;
4-
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
55
import static org.mockito.Mockito.any;
66
import static org.mockito.Mockito.times;
77
import static org.mockito.Mockito.verify;
@@ -29,8 +29,7 @@
2929
@ExtendWith(MockitoExtension.class)
3030
class KafkaConsumerCircuitBreakerApplicationTests {
3131
@Spy
32-
private MockConsumer<String, KafkaPerson> mockConsumer =
33-
new MockConsumer<>(OffsetResetStrategy.EARLIEST);
32+
private MockConsumer<String, KafkaPerson> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
3433

3534
@InjectMocks
3635
private ConsumerRunner consumerRunner;
@@ -61,8 +60,8 @@ void shouldConsumeSuccessfully() {
6160

6261
consumerRunner.run();
6362

64-
assertThat(mockConsumer.closed()).isTrue();
65-
verify(mockConsumer, times(1)).commitSync();
63+
assertTrue(mockConsumer.closed());
64+
verify(mockConsumer).commitSync();
6665
}
6766

6867
@Test
@@ -96,9 +95,9 @@ void shouldBreakCircuitOnPoisonPill() {
9695

9796
consumerRunner.run();
9897

99-
assertThat(mockConsumer.closed()).isTrue();
98+
assertTrue(mockConsumer.closed());
10099
verify(mockConsumer, times(5)).poll(any());
101100
verify(mockConsumer, times(2)).commitSync();
102-
verify(mockConsumer, times(1)).seek(topicPartition, 2);
101+
verify(mockConsumer).seek(topicPartition, 2);
103102
}
104103
}

0 commit comments

Comments
 (0)