
Commit a3a568a

Update consumer and producer docs
1 parent 82a4bd6 commit a3a568a

24 files changed: +150, -182 lines changed

Lines changed: 10 additions & 12 deletions
@@ -1,16 +1,14 @@
 # Consumer Avro Generic
 
-This module demonstrates the following:
-
-- The use of the Kafka Clients consumer API.
-- The use of Apache Avro and generic records.
-- Unit testing using a Mock consumer.
-
 This module consumes records of type `<String, GenericRecord>` from a topic named `USER_TOPIC`.
+It demonstrates the following:
+- Use of the Kafka Clients consumer API
+- Use of Apache Avro and generic records
+- Unit testing with a mock consumer
 
 ## Prerequisites
 
-To compile and run this demo, you will need the following:
+To compile and run this demo, you’ll need:
 
 - Java 21
 - Maven
@@ -21,7 +19,7 @@ To compile and run this demo, you will need the following:
 To run the application manually:
 
 - Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
-- Produce records of type `<String, GenericRecord>` to a topic named `USER_TOPIC`. You can use the [producer Avro Generic](../../kafka-producer-quickstarts/kafka-producer-avro-generic) to do this.
+- Produce records of type `<String, GenericRecord>` to the `USER_TOPIC`. You can use the [producer Avro Generic](../../kafka-producer-quickstarts/kafka-producer-avro-generic) for this.
 - Start the consumer.
 
 To run the application in Docker, use the following command:
@@ -30,10 +28,10 @@ To run the application in Docker, use the following command:
 docker-compose up -d
 ```
 
-This command will start the following services in Docker:
+This command starts the following services in Docker:
 
 - 1 Kafka broker (KRaft mode)
-- 1 Schema registry
+- 1 Schema Registry
 - 1 Control Center
-- 1 producer Avro Generic
-- 1 consumer Avro Generic
+- 1 Producer Avro Generic
+- 1 Consumer Avro Generic

kafka-consumer-quickstarts/kafka-consumer-avro-generic/src/main/java/io/github/loicgreffier/consumer/avro/generic/app/ConsumerRunner.java

Lines changed: 4 additions & 3 deletions
@@ -50,9 +50,10 @@ public ConsumerRunner(Consumer<String, GenericRecord> consumer) {
     }
 
     /**
-     * Asynchronously starts the Kafka consumer when the application is ready. The asynchronous annotation is used to
-     * run the consumer in a separate thread and not block the main thread. The Kafka consumer processes generic Avro
-     * records from the USER_TOPIC topic, so it does not require to know the schema of the records.
+     * Asynchronously starts the Kafka consumer once the application is ready. The {@code @Async} annotation ensures
+     * that the consumer runs in a separate thread, preventing the main thread from being blocked. The Kafka consumer
+     * processes generic Avro records from the USER_TOPIC topic, so it does not need to know the schema of the
+     * records.
      */
     @Async
     @EventListener(ApplicationReadyEvent.class)
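
For context on the Javadoc above, a minimal sketch of what such an `@Async` polling loop for generic Avro records can look like is shown below. The class name, the `firstName` field, and the poll/commit details are illustrative assumptions rather than the repository's actual `ConsumerRunner`, and the application is assumed to have `@EnableAsync` configured:

```java
import java.time.Duration;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class GenericAvroConsumerSketch {
    private final Consumer<String, GenericRecord> consumer;

    public GenericAvroConsumerSketch(Consumer<String, GenericRecord> consumer) {
        this.consumer = consumer;
    }

    @Async
    @EventListener(ApplicationReadyEvent.class)
    public void run() {
        consumer.subscribe(List.of("USER_TOPIC"));
        while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
            // GenericRecord exposes fields by name, so no schema-generated class is required;
            // "firstName" is an assumed field of the records.
            records.forEach(record -> System.out.printf(
                    "key=%s, firstName=%s%n", record.key(), record.value().get("firstName")));
            consumer.commitSync();
        }
    }
}
```
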
Lines changed: 10 additions & 12 deletions
@@ -1,16 +1,14 @@
 # Consumer Avro Specific
 
-This module demonstrates the following:
-
-- The use of the Kafka Clients consumer API.
-- The use of Apache Avro and specific records.
-- Unit testing using a Mock consumer.
-
 This module consumes records of type `<String, KafkaUser>` from a topic named `USER_TOPIC`.
+It demonstrates the following:
+- Use of the Kafka Clients consumer API
+- Use of Apache Avro and specific records
+- Unit testing with a mock consumer
 
 ## Prerequisites
 
-To compile and run this demo, you will need the following:
+To compile and run this demo, you’ll need:
 
 - Java 21
 - Maven
@@ -21,7 +19,7 @@ To compile and run this demo, you will need the following:
 To run the application manually:
 
 - Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
-- Produce records of type `<String, KafkaUser>` to a topic named `USER_TOPIC`. You can use the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) to do this.
+- Produce records of type `<String, KafkaUser>` to the `USER_TOPIC`. You can use the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) for this.
 - Start the consumer.
 
 To run the application in Docker, use the following command:
@@ -30,10 +28,10 @@ To run the application in Docker, use the following command:
 docker-compose up -d
 ```
 
-This command will start the following services in Docker:
+This command starts the following services in Docker:
 
 - 1 Kafka broker (KRaft mode)
-- 1 Schema registry
+- 1 Schema Registry
 - 1 Control Center
-- 1 producer Avro Specific
-- 1 consumer Avro Specific
+- 1 Producer Avro Specific
+- 1 Consumer Avro Specific

kafka-consumer-quickstarts/kafka-consumer-avro-specific/src/main/java/io/github/loicgreffier/consumer/avro/specific/app/ConsumerRunner.java

Lines changed: 3 additions & 3 deletions
@@ -50,9 +50,9 @@ public ConsumerRunner(Consumer<String, KafkaUser> consumer) {
     }
 
     /**
-     * Asynchronously starts the Kafka consumer when the application is ready. The asynchronous annotation is used to
-     * run the consumer in a separate thread and not block the main thread. The Kafka consumer processes specific Avro
-     * records from the USER_TOPIC topic.
+     * Asynchronously starts the Kafka consumer once the application is ready. The {@code @Async} annotation ensures
+     * that the consumer runs in a separate thread, preventing the main thread from being blocked. The Kafka consumer
+     * processes specific Avro records from the USER_TOPIC topic.
      */
     @Async
     @EventListener(ApplicationReadyEvent.class)
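
Compared with the generic variant, the specific consumer receives the Avro-generated `KafkaUser` class, so fields are read through typed accessors. A minimal sketch, assuming `KafkaUser` is the generated class on the classpath and exposes a `getFirstName()` getter (its package and accessors depend on the Avro schema, so the import is omitted here):

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// import of the generated KafkaUser class omitted; its package is set by the Avro schema namespace

public class SpecificAvroConsumerSketch {
    public static void pollOnce(Consumer<String, KafkaUser> consumer) {
        consumer.subscribe(List.of("USER_TOPIC"));
        ConsumerRecords<String, KafkaUser> records = consumer.poll(Duration.ofMillis(1000));
        // The schema is known at compile time through the generated class, unlike GenericRecord.
        records.forEach(record -> System.out.printf(
                "key=%s, firstName=%s%n", record.key(), record.value().getFirstName()));
        consumer.commitSync();
    }
}
```
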
Lines changed: 15 additions & 22 deletions
@@ -1,22 +1,15 @@
 # Consumer Circuit Breaker
 
-This module demonstrates the following:
-
-- The use of the Kafka Clients consumer API.
-- The use of Apache Avro
-- Unit testing using a Mock consumer.
-
-This module performs the following tasks:
-
-- Consume records of type `<String, KafkaUser>` from a topic named `USER_TOPIC`.
-- Handle deserialization exceptions and seek the poison pill.
-- If a poison pill is found in the middle of a batch of good records, the `poll()` method will return the good records
-  in the first loop and then throw the deserialization exception in the second loop.
-- To ensure no good records are lost, the module seeks to the next offset.
+This module consumes records of type `<String, KafkaUser>` from a topic named `USER_TOPIC`, handling deserialization exceptions.
+It demonstrates the following:
+- Use of the Kafka Clients consumer API
+- How to handle poison pills in a Kafka consumer
+- Use of Apache Avro
+- Unit testing with a mock consumer
 
 ## Prerequisites
 
-To compile and run this demo, you will need the following:
+To compile and run this demo, you’ll need:
 
 - Java 21
 - Maven
@@ -27,10 +20,10 @@ To compile and run this demo, you will need the following:
 To run the application manually:
 
 - Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
-- Produce records of type `<String, KafkaUser>` to a topic named `USER_TOPIC`. You can use the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) to do this.
-- Make sure a deserialization exception will occur by either:
-  - Deleting the Avro schema from the schema registry.
-  - Producing a String message in the middle of Avro messages.
+- Produce records of type `<String, KafkaUser>` to the `USER_TOPIC`. You can use the [producer Avro Specific](../../kafka-producer-quickstarts/kafka-producer-avro-specific) for this.
+- Ensure a deserialization exception will occur by either:
+  - Deleting the Avro schema from the schema registry
+  - Producing a String message in the middle of Avro messages
 - Start the consumer.
 
 To run the application in Docker, use the following command:
@@ -39,10 +32,10 @@ To run the application in Docker, use the following command:
 docker-compose up -d
 ```
 
-This command will start the following services in Docker:
+This command starts the following services in Docker:
 
 - 1 Kafka broker (KRaft mode)
-- 1 Schema registry
+- 1 Schema Registry
 - 1 Control Center
-- 1 producer Avro Specific
-- 1 Circuit Breaker consumer
+- 1 Producer Avro Specific
+- 1 Circuit Breaker Consumer

kafka-consumer-quickstarts/kafka-consumer-circuit-breaker/src/main/java/io/github/loicgreffier/consumer/circuit/breaker/app/ConsumerRunner.java

Lines changed: 6 additions & 4 deletions
@@ -51,10 +51,12 @@ public ConsumerRunner(Consumer<String, KafkaUser> consumer) {
     }
 
     /**
-     * Asynchronously starts the Kafka consumer when the application is ready. The asynchronous annotation is used to
-     * run the consumer in a separate thread and not block the main thread. The Kafka consumer processes messages from
-     * the USER_TOPIC topic and handles deserialization errors. When a deserialization error occurs, the consumer seeks
-     * to the next offset in order to skip the record that caused the error.
+     * Asynchronously starts the Kafka consumer once the application is ready. The {@code @Async} annotation ensures
+     * that the consumer runs in a separate thread, preventing the main thread from being blocked. The Kafka consumer
+     * processes messages from the USER_TOPIC topic and handles deserialization errors. When a deserialization error
+     * occurs, the consumer skips the problematic record by seeking to the next offset. Note that if a deserialization
+     * error occurs in the middle of a batch, the poll operation will still return the valid records, throw the
+     * deserialization exception, and continue with the rest of the batch.
      */
     @Async
     @EventListener(ApplicationReadyEvent.class)
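
A minimal sketch of the poison-pill handling this Javadoc describes is shown below: `poll()` surfaces the failure as a `RecordDeserializationException`, which carries the partition and offset of the record that could not be deserialized. The loop structure and class name are illustrative, not the module's actual code:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;

public class PoisonPillHandlingSketch {
    public static <K, V> void pollLoop(Consumer<K, V> consumer) {
        while (true) {
            try {
                ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> System.out.println(record.value()));
                consumer.commitSync();
            } catch (RecordDeserializationException e) {
                // Skip the record that cannot be deserialized and resume from the next offset,
                // so the valid records around the poison pill are not lost.
                consumer.seek(e.topicPartition(), e.offset() + 1);
            }
        }
    }
}
```

The `+ 1` in the seek is what prevents the consumer from re-polling the same poison pill indefinitely.
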
Lines changed: 9 additions & 11 deletions
@@ -1,16 +1,14 @@
 # Consumer Headers
 
-This module demonstrates the following:
-
-- The use of the Kafka Clients consumer API.
-- The use of headers in Kafka records.
-- Unit testing using a Mock consumer.
-
 This module consumes records of type `<String, String>` with headers from a topic named `STRING_TOPIC`.
+It demonstrates the following:
+- Use of the Kafka Clients consumer API
+- Use of headers in Kafka records
+- Unit testing with a mock consumer
 
 ## Prerequisites
 
-To compile and run this demo, you will need the following:
+To compile and run this demo, you’ll need:
 
 - Java 21
 - Maven
@@ -21,7 +19,7 @@ To compile and run this demo, you will need the following:
 To run the application manually:
 
 - Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
-- Produce records of type `<String, String>` with headers named `id` and `message` to a topic named `STRING_TOPIC`. You can use the [producer headers](../../kafka-producer-quickstarts/kafka-producer-headers) to do this.
+- Produce records of type `<String, String>` with a header named `message` to the `STRING_TOPIC`. You can use the [producer Headers](../../kafka-producer-quickstarts/kafka-producer-headers) for this.
 - Start the consumer.
 
 To run the application in Docker, use the following command:
@@ -30,9 +28,9 @@ To run the application in Docker, use the following command:
 docker-compose up -d
 ```
 
-This command will start the following services in Docker:
+This command starts the following services in Docker:
 
 - 1 Kafka broker (KRaft mode)
 - 1 Control Center
-- 1 producer Headers
-- 1 consumer Headers
+- 1 Producer Headers
+- 1 Consumer Headers

kafka-consumer-quickstarts/kafka-consumer-headers/src/main/java/io/github/loicgreffier/consumer/headers/app/ConsumerRunner.java

Lines changed: 3 additions & 3 deletions
@@ -51,9 +51,9 @@ public ConsumerRunner(Consumer<String, String> consumer) {
     }
 
     /**
-     * Asynchronously starts the Kafka consumer when the application is ready. The asynchronous annotation is used to
-     * run the consumer in a separate thread and not block the main thread. The Kafka consumer processes string records
-     * with headers from the STRING_TOPIC topic.
+     * Asynchronously starts the Kafka consumer once the application is ready. The {@code @Async} annotation ensures
+     * that the consumer runs in a separate thread, preventing the main thread from being blocked. The Kafka consumer
+     * processes string records with headers from the STRING_TOPIC topic.
     */
     @Async
     @EventListener(ApplicationReadyEvent.class)
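
For the header access mentioned above, here is a minimal sketch assuming the producer set a `message` header (as the README states); the printing logic and class name are illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;

public class HeadersConsumerSketch {
    public static void pollOnce(Consumer<String, String> consumer) {
        consumer.subscribe(List.of("STRING_TOPIC"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> {
            // lastHeader() returns the most recently added header with that key, or null if absent.
            Header header = record.headers().lastHeader("message");
            String message = header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
            System.out.printf("key=%s, message header=%s, value=%s%n", record.key(), message, record.value());
        });
        consumer.commitSync();
    }
}
```
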
Lines changed: 9 additions & 16 deletions
@@ -1,20 +1,13 @@
 # Consumer Retry External System
 
-This module demonstrates the following:
-
-- The use of the Kafka Clients consumer API.
-- Unit testing using a Mock consumer.
-
-This module performs the following tasks:
-
-- Consumes records of type `<String, String>` from a topic named `STRING_TOPIC`.
-- Sends the records to a fake external system.
-- Implements a strong retry mechanism to handle failures in the external system. If the call to the external system
-  fails, the module retries the operation using the retry mechanism.
+This module consumes records of type `<String, String>` from a topic named `STRING_TOPIC`, handling failures in an external system and implementing a retry mechanism.
+It demonstrates the following:
+- Use of the Kafka Clients consumer API
+- Unit testing with a mock consumer
 
 ## Prerequisites
 
-To compile and run this demo, you will need the following:
+To compile and run this demo, you’ll need:
 
 - Java 21
 - Maven
@@ -25,7 +18,7 @@ To compile and run this demo, you will need the following:
 To run the application manually:
 
 - Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
-- Produce records of type `<String, String>` to a topic named `STRING_TOPIC`. You can use the [producer simple](../../kafka-producer-quickstarts/kafka-producer-simple) to do this.
+- Produce records of type `<String, String>` to the `STRING_TOPIC`. You can use the [producer Simple](../../kafka-producer-quickstarts/kafka-producer-simple) for this.
 - Start the consumer.
 
 To run the application in Docker, use the following command:
@@ -34,9 +27,9 @@ To run the application in Docker, use the following command:
 docker-compose up -d
 ```
 
-This command will start the following services in Docker:
+This command starts the following services in Docker:
 
 - 1 Kafka broker (KRaft mode)
 - 1 Control Center
-- 1 producer Simple
-- 1 consumer Retry External System
+- 1 Producer Simple
+- 1 Consumer Retry External System

kafka-consumer-quickstarts/kafka-consumer-retry-external-system/src/main/java/io/github/loicgreffier/consumer/retry/app/ConsumerRunner.java

Lines changed: 8 additions & 8 deletions
@@ -65,14 +65,14 @@ public ConsumerRunner(
     }
 
     /**
-     * Asynchronously starts the Kafka consumer when the application is ready. The asynchronous annotation is used to
-     * run the consumer in a separate thread and not block the main thread. The Kafka consumer processes messages from
-     * the STRING_TOPIC topic. If an error occurs during the external system call, the consumer pauses the
-     * topic-partitions. and rewinds to the failed record offset as a call to poll() has automatically advanced the
-     * consumer offsets. The consumer being paused, it will not commit the offsets and the next call to poll() will not
-     * return any records. Consequently, the consumer will honor the pause duration given by the poll() timeout. Once
-     * the pause duration is elapsed, the consumer will resume the topic-partitions and consume the records from the
-     * failed record offset.
+     * Asynchronously starts the Kafka consumer once the application is ready. The {@code @Async} annotation ensures
+     * that the consumer runs in a separate thread, preventing the main thread from being blocked. The Kafka consumer
+     * processes messages from the STRING_TOPIC topic. If an error occurs during the external system call, the
+     * consumer pauses the topic-partitions and rewinds to the failed record offset, since the call to poll() has
+     * already advanced the consumer position. While paused, the consumer does not commit the offsets and the next
+     * call to poll() returns no records, so the consumer honors the pause duration given by the poll() timeout. Once
+     * the pause duration has elapsed, the consumer resumes the topic-partitions and consumes the records from the
+     * failed record offset.
      */
     @Async
     @EventListener(ApplicationReadyEvent.class)
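
Below is a minimal sketch of the pause-and-rewind retry described in this Javadoc. The `ExternalSystem` interface is a placeholder for the module's fake external system, and the loop structure is illustrative rather than the repository's actual implementation:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class RetryLoopSketch {
    /** Placeholder for the fake external system; throwing simulates a failed call. */
    public interface ExternalSystem {
        void send(String value) throws Exception;
    }

    public static void pollLoop(Consumer<String, String> consumer, ExternalSystem externalSystem) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // Partitions paused by a previous failure are resumed here, after one empty poll
            // has acted as the back-off.
            consumer.resume(consumer.paused());

            boolean failed = false;
            for (ConsumerRecord<String, String> record : records) {
                try {
                    externalSystem.send(record.value());
                } catch (Exception e) {
                    // poll() has already advanced the position past this record, so rewind to the
                    // failed offset and pause the partitions; the next poll() then returns no
                    // records and simply honors its timeout, which serves as the retry delay.
                    consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                    consumer.pause(consumer.assignment());
                    failed = true;
                    break;
                }
            }

            if (!failed) {
                consumer.commitSync(); // commit only when every record reached the external system
            }
        }
    }
}
```

Pausing rather than sleeping keeps the consumer polling, so it continues to send heartbeats and avoids being kicked out of the consumer group during the back-off.
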
