
Commit a157758

Add a dedicated module for record headers access when producing
1 parent cb21d7c commit a157758

24 files changed: +438 additions, -71 deletions


README.md

Lines changed: 9 additions & 6 deletions
@@ -19,12 +19,15 @@ to simplify the development of applications.

 ### Producers

-| Module | Library | Main Concept | Additional Content |
-|---------------------------------------------------------------------------|---------------|--------------------------|------------------------|
-| [Avro Generic](/kafka-producer-quickstarts/kafka-producer-avro-generic) | Kafka Clients | Generic Avro production | Mock producer |
-| [Avro Specific](/kafka-producer-quickstarts/kafka-producer-avro-specific) | Kafka Clients | Specific Avro production | Mock producer |
-| [Simple](/kafka-producer-quickstarts/kafka-producer-simple) | Kafka Clients | String production | Headers, Mock producer |
-| [Transaction](/kafka-producer-quickstarts/kafka-producer-transaction) | Kafka Clients | Transaction | Mock producer |
+| Module | Library | Main Concept |
+|---------------------------------------------------------------------------|---------------|---------------------------------------------------------------|
+| [Avro Generic](/kafka-producer-quickstarts/kafka-producer-avro-generic) | Kafka Clients | Produce generic Avro records |
+| [Avro Specific](/kafka-producer-quickstarts/kafka-producer-avro-specific) | Kafka Clients | Produce specific Avro records |
+| [Headers](/kafka-producer-quickstarts/kafka-producer-headers) | Kafka Clients | Add record headers and produce records |
+| [Simple](/kafka-producer-quickstarts/kafka-producer-simple) | Kafka Clients | Produce String records |
+| [Transaction](/kafka-producer-quickstarts/kafka-producer-transaction) | Kafka Clients | Guarantee atomicity between multiple topics with transactions |
+
+Each producer module provides unit tests with the MockProducer API.

 ### Consumers

kafka-producer-quickstarts/kafka-producer-avro-generic/README.md

Lines changed: 2 additions & 4 deletions
@@ -2,7 +2,7 @@

 This module demonstrates the following:

-- The usage of the Kafka Clients producer DSL.
+- The usage of the Kafka Clients producer API.
 - The usage of Apache Avro and generic records.
 - The usage of unit tests with a Mock producer.

@@ -20,9 +20,7 @@ To compile and run this demo, you will need the following:

 To run the application manually, please follow the steps below:

-- Start
-  a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp)
-  in a Docker environment.
+- Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
 - Start the producer.

 To run the application in Docker, please use the following command:

kafka-producer-quickstarts/kafka-producer-avro-generic/src/main/java/io/github/loicgreffier/producer/avro/generic/app/ProducerRunner.java

Lines changed: 5 additions & 2 deletions
@@ -49,8 +49,11 @@ public void run() throws IOException {

         int i = 0;
         while (true) {
-            ProducerRecord<String, GenericRecord> message = new ProducerRecord<>(PERSON_TOPIC,
-                String.valueOf(i), buildGenericRecord(schema, i));
+            ProducerRecord<String, GenericRecord> message = new ProducerRecord<>(
+                PERSON_TOPIC,
+                String.valueOf(i),
+                buildGenericRecord(schema, i)
+            );

             send(message);
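The `send(message)` helper called above sits outside this hunk. As a rough, hypothetical sketch of what an asynchronous send with a logging callback can look like with the Kafka Clients API (the class name `SendSketch` and the log messages are illustrative, not the repository's code):

```java
import java.util.concurrent.Future;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical standalone sketch; the actual ProducerRunner is wired by Spring in the repository.
public class SendSketch {
    private static final Logger log = LoggerFactory.getLogger(SendSketch.class);

    private final Producer<String, GenericRecord> producer;

    public SendSketch(Producer<String, GenericRecord> producer) {
        this.producer = producer;
    }

    public Future<RecordMetadata> send(ProducerRecord<String, GenericRecord> message) {
        // Asynchronous send: the callback runs when the broker acknowledges or rejects the record.
        return producer.send(message, (metadata, exception) -> {
            if (exception != null) {
                log.error("Failed to produce record", exception);
            } else {
                log.info("Produced record to topic {}, partition {}, offset {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}
```

Returning the `Future<RecordMetadata>` is what lets the unit tests below drive completion through the MockProducer.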

kafka-producer-quickstarts/kafka-producer-avro-generic/src/test/java/io/github/loicgreffier/producer/avro/generic/KafkaProducerAvroGenericApplicationTests.java

Lines changed: 2 additions & 2 deletions
@@ -42,8 +42,8 @@ class KafkaProducerAvroGenericApplicationTests {
     };

     @Spy
-    private MockProducer<String, GenericRecord> mockProducer =
-        new MockProducer<>(false, new StringSerializer(), serializer);
+    private MockProducer<String, GenericRecord> mockProducer = new MockProducer<>(
+        false, new StringSerializer(), serializer);

     @InjectMocks
     private ProducerRunner producerRunner;

kafka-producer-quickstarts/kafka-producer-avro-specific/README.md

Lines changed: 2 additions & 4 deletions
@@ -2,7 +2,7 @@

 This module demonstrates the following:

-- The usage of the Kafka Clients producer DSL.
+- The usage of the Kafka Clients producer API.
 - The usage of Apache Avro and specific records.
 - The usage of unit tests with a Mock producer.

@@ -20,9 +20,7 @@ To compile and run this demo, you will need the following:

 To run the application manually, please follow the steps below:

-- Start
-  a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp)
-  in a Docker environment.
+- Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
 - Start the producer.

 To run the application in Docker, please use the following command:

kafka-producer-quickstarts/kafka-producer-avro-specific/src/main/java/io/github/loicgreffier/producer/avro/specific/app/ProducerRunner.java

Lines changed: 5 additions & 2 deletions
@@ -40,8 +40,11 @@ public class ProducerRunner {
     public void run() {
         int i = 0;
         while (true) {
-            ProducerRecord<String, KafkaPerson> message =
-                new ProducerRecord<>(PERSON_TOPIC, String.valueOf(i), buildKafkaPerson(i));
+            ProducerRecord<String, KafkaPerson> message = new ProducerRecord<>(
+                PERSON_TOPIC,
+                String.valueOf(i),
+                buildKafkaPerson(i)
+            );

             send(message);

kafka-producer-quickstarts/kafka-producer-avro-specific/src/test/java/io/github/loicgreffier/producer/avro/specific/KafkaProducerAvroSpecificApplicationTests.java

Lines changed: 14 additions & 16 deletions
@@ -37,21 +37,20 @@ class KafkaProducerAvroSpecificApplicationTests {
     };

     @Spy
-    private MockProducer<String, KafkaPerson> mockProducer =
-        new MockProducer<>(false, new StringSerializer(), serializer);
+    private MockProducer<String, KafkaPerson> mockProducer = new MockProducer<>(
+        false, new StringSerializer(), serializer);

     @InjectMocks
     private ProducerRunner producerRunner;

     @Test
     void shouldSendSuccessfully() throws ExecutionException, InterruptedException {
-        ProducerRecord<String, KafkaPerson> message =
-            new ProducerRecord<>(PERSON_TOPIC, "1", KafkaPerson.newBuilder()
-                .setId(1L)
-                .setFirstName("John")
-                .setLastName("Doe")
-                .setBirthDate(Instant.parse("2000-01-01T01:00:00.00Z"))
-                .build());
+        ProducerRecord<String, KafkaPerson> message = new ProducerRecord<>(PERSON_TOPIC, "1", KafkaPerson.newBuilder()
+            .setId(1L)
+            .setFirstName("John")
+            .setLastName("Doe")
+            .setBirthDate(Instant.parse("2000-01-01T01:00:00.00Z"))
+            .build());

         Future<RecordMetadata> record = producerRunner.send(message);
         mockProducer.completeNext();
@@ -65,13 +64,12 @@ void shouldSendSuccessfully() throws ExecutionException, InterruptedException {

     @Test
     void shouldSendWithFailure() {
-        ProducerRecord<String, KafkaPerson> message =
-            new ProducerRecord<>(PERSON_TOPIC, "1", KafkaPerson.newBuilder()
-                .setId(1L)
-                .setFirstName("John")
-                .setLastName("Doe")
-                .setBirthDate(Instant.parse("2000-01-01T01:00:00.00Z"))
-                .build());
+        ProducerRecord<String, KafkaPerson> message = new ProducerRecord<>(PERSON_TOPIC, "1", KafkaPerson.newBuilder()
+            .setId(1L)
+            .setFirstName("John")
+            .setLastName("Doe")
+            .setBirthDate(Instant.parse("2000-01-01T01:00:00.00Z"))
+            .build());

         Future<RecordMetadata> record = producerRunner.send(message);
         RuntimeException exception = new RuntimeException("Error sending message");
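
These tests rely on the MockProducer being constructed with autoComplete set to false, so each send stays pending until the test resolves it. A minimal, self-contained sketch of that pattern (plain String records and illustrative names, not the repository's exact test class):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

class MockProducerPatternTest {

    // autoComplete = false: sends stay pending until completeNext() or errorNext() is called.
    private final MockProducer<String, String> mockProducer = new MockProducer<>(
        false, new StringSerializer(), new StringSerializer());

    @Test
    void shouldCompletePendingSend() throws ExecutionException, InterruptedException {
        Future<RecordMetadata> future = mockProducer.send(new ProducerRecord<>("TOPIC", "1", "value"));

        mockProducer.completeNext(); // acknowledges the oldest pending send

        assertEquals("value", mockProducer.history().get(0).value());
        assertEquals("TOPIC", future.get().topic());
    }

    @Test
    void shouldFailPendingSend() {
        Future<RecordMetadata> future = mockProducer.send(new ProducerRecord<>("TOPIC", "1", "value"));

        mockProducer.errorNext(new RuntimeException("Error sending message")); // fails the oldest pending send

        assertThrows(ExecutionException.class, future::get);
    }
}
```

The Avro tests above follow the same flow, with Mockito's @Spy and @InjectMocks wiring the mock producer into the ProducerRunner under test.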

kafka-producer-quickstarts/kafka-producer-headers/README.md

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+# Producer Headers
+
+This module demonstrates the following:
+
+- The usage of the Kafka Clients producer API.
+- The usage of headers in Kafka records.
+- The usage of unit tests with a Mock producer.
+
+This module produces records of type `<String, String>` to a topic named `STRING_TOPIC`.
+
+## Requirements
+
+To compile and run this demo, you will need the following:
+
+- Java 21
+- Maven
+- Docker
+
+## Running the Application
+
+To run the application manually, please follow the steps below:
+
+- Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
+- Start the producer.
+
+To run the application in Docker, please use the following command:
+
+```console
+docker-compose up -d
+```
+
+This command will start the following services in Docker:
+
+- 1 Zookeeper
+- 1 Kafka broker
+- 1 Control Center
+- 1 producer headers
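
The new module's ProducerRunner itself is not part of this excerpt. As a hedged sketch of what producing `<String, String>` records with headers can look like with the Kafka Clients API (the record key, value, and header keys below are made up for illustration; only the `STRING_TOPIC` name comes from the README above):

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative sketch, not the module's actual ProducerRunner.
public class HeadersProducerSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> message = new ProducerRecord<>("STRING_TOPIC", "1", "John Doe");

            // Headers are key/byte[] pairs carried alongside the record; they do not affect partitioning.
            message.headers().add("source", "kafka-producer-headers".getBytes(StandardCharsets.UTF_8));
            message.headers().add("produced-at", String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));

            producer.send(message, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                }
            });
        }
    }
}
```

On the consumer side, the same headers are available through `ConsumerRecord#headers()`.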

kafka-producer-quickstarts/kafka-producer-headers/docker-compose.yml

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
+---
+version: '2'
+services:
+  broker:
+    image: confluentinc/cp-kafka:7.4.1
+    hostname: broker
+    container_name: broker
+    networks:
+      - spring-boot-kafka-quickstarts
+    ports:
+      - "9092:9092"
+      - "9101:9101"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_JMX_PORT: 9101
+      KAFKA_JMX_HOSTNAME: localhost
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
+      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
+      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
+      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
+
+  control-center:
+    image: confluentinc/cp-enterprise-control-center:7.4.1
+    hostname: control-center
+    container_name: control-center
+    networks:
+      - spring-boot-kafka-quickstarts
+    depends_on:
+      - broker
+    ports:
+      - "9021:9021"
+    environment:
+      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
+      CONTROL_CENTER_REPLICATION_FACTOR: 1
+      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
+      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
+      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
+      PORT: 9021
+
+  kafka-producer-headers:
+    image: loicgreffier/spring-boot-kafka-quickstarts:kafka-producer-headers-1.0.0
+    hostname: kafka-producer-headers
+    container_name: kafka-producer-headers
+    networks:
+      - spring-boot-kafka-quickstarts
+    depends_on:
+      - broker
+    ports:
+      - "8082:8080"
+    restart: unless-stopped
+    environment:
+      SPRING_APPLICATION_JSON: |
+        {
+          "kafka": {
+            "properties": {
+              "bootstrap.servers": "broker:29092"
+            }
+          }
+        }
+
+networks:
+  spring-boot-kafka-quickstarts:
+    driver: bridge

kafka-producer-quickstarts/kafka-producer-headers/pom.xml

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>io.github.loicgreffier</groupId>
+        <artifactId>kafka-producer-quickstarts</artifactId>
+        <version>1.0.0</version>
+    </parent>
+
+    <artifactId>kafka-producer-headers</artifactId>
+</project>
