Commit 4df15b7 — Add reconciliation use-case

1 parent 866f0e9

File tree: 32 files changed (+1395, −8 lines)

README.md

Lines changed: 6 additions & 5 deletions

In the `#### Processor` section (just before `#### Stores`), the table gains a row for the new Reconciliation module:

| Module | Library | Content | DSL | Processor API |
|:--------------------------------------------------------------------------|:--------------|:------------------------------------------------------------------------|:---------------------------------------------------------------|:---------------------------------------------------|
| [Process](/kafka-streams-quickstarts/kafka-streams-process) | Kafka Streams | Apply a processor to a stream | `process()` | `context()`, `forward()`, `Record#headers()` |
| [ProcessValues](/kafka-streams-quickstarts/kafka-streams-process-values) | Kafka Streams | Apply a fixed key processor to a stream | `processValues()` | `context()`, `forward()`, `Record#headers()` |
| [Reconciliation](/kafka-streams-quickstarts/kafka-streams-reconciliation) | Kafka Streams | Reconcile events across two streams | `selectKey()`, `repartition()`, `process()`, `addStateStore()` | `getStateStore()`, `forward()`, `Record#headers()` |
| [Schedule](/kafka-streams-quickstarts/kafka-streams-schedule) | Kafka Streams | Schedule punctuation functions based on wall clock time and stream time | `process()` | `schedule()`, `getStateStore()` |

kafka-producer-quickstarts/kafka-producer-avro-generic/src/main/java/io/github/loicgreffier/producer/avro/generic/app/ProducerRunner.java

Lines changed: 1 addition & 1 deletion

```diff
 @Slf4j
 @Component
 public class ProducerRunner {
-    private Random random = new Random();
+    private final Random random = new Random();
     private final Producer<String, GenericRecord> producer;
```

kafka-producer-quickstarts/kafka-producer-avro-specific/src/main/java/io/github/loicgreffier/producer/avro/specific/app/ProducerRunner.java

Lines changed: 1 addition & 1 deletion

```diff
 @Slf4j
 @Component
 public class ProducerRunner {
-    private Random random = new Random();
+    private final Random random = new Random();
     private final Producer<String, KafkaUser> producer;
```
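Both hunks above make the same one-line fix: a field that is assigned exactly once and never reassigned should be declared `final`, so the compiler documents and enforces that invariant. A small self-contained illustration of the pattern (hypothetical class, not from the repository):

```java
import java.util.Random;

class FinalFieldExample {
    // Assigned exactly once at construction; `final` makes any later
    // reassignment a compile-time error.
    private final Random random = new Random(42L); // fixed seed, for reproducibility only

    int nextPercent() {
        return random.nextInt(100); // value in [0, 100)
    }
}
```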
New file: README for the reconciliation module

Lines changed: 41 additions & 0 deletions

# Kafka Streams Reconciliation

This module streams records of type `<String, KafkaUser>` from the `USER_TOPIC` and `<String, KafkaOrder>` from the `ORDER_TOPIC`, and reconciles an order with its customer.
It demonstrates the following:

- How to reconcile data from two different topics, regardless of the order in which records arrive or how much time passes between them.
- Unit testing using the Topology Test Driver.

![topology.png](topology.png)

## Prerequisites

To compile and run this demo, you’ll need:

- Java 21
- Maven
- Docker

## Running the Application

To run the application manually:

- Start a [Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) in a Docker environment.
- Produce records of type `<String, KafkaUser>` to the `USER_TOPIC`. You can use the [Producer User](../specific-producers/kafka-streams-producer-user) for this.
- Produce records of type `<String, KafkaOrder>` to the `ORDER_TOPIC`. You can use the [Producer Order](../specific-producers/kafka-streams-producer-order) for this.
- Start the Kafka Streams application.

Alternatively, to run the application with Docker, use the following command:

```console
docker-compose up -d
```

This command will start the following services in Docker:

- 1 Kafka Broker (KRaft mode)
- 1 Schema Registry
- 1 Control Center
- 1 Producer User
- 1 Producer Order
- 1 Kafka Streams Reconciliation
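The order-independent reconciliation described above can be sketched in plain Java: whichever side (user or order) arrives first is buffered under the shared customer id, and the pair is emitted once the other side shows up. This is a minimal illustration only — a `HashMap` stands in for the Kafka Streams state store, and all names are hypothetical, not the module's actual classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ReconciliationSketch {
    // Stand-ins for the state store: one buffer per side, keyed by customer id.
    private final Map<Long, String> pendingUsers = new HashMap<>();
    private final Map<Long, String> pendingOrders = new HashMap<>();

    /** Called when a user record arrives; emits "user/order" if the order is already buffered. */
    Optional<String> onUser(long customerId, String user) {
        String order = pendingOrders.remove(customerId);
        if (order == null) {
            pendingUsers.put(customerId, user); // buffer until the matching order arrives
            return Optional.empty();
        }
        return Optional.of(user + "/" + order);
    }

    /** Called when an order record arrives; emits "user/order" if the user is already buffered. */
    Optional<String> onOrder(long customerId, String order) {
        String user = pendingUsers.remove(customerId);
        if (user == null) {
            pendingOrders.put(customerId, order); // buffer until the matching user arrives
            return Optional.empty();
        }
        return Optional.of(user + "/" + order);
    }
}
```

Because the buffers are symmetric, the result is the same whichever topic delivers first; in the real application the buffered records live in a persistent state store, so arbitrary delays between the two sides are tolerated as well.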
New file: docker-compose.yml for the reconciliation quickstart

Lines changed: 138 additions & 0 deletions

```yaml
---
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.7.0
    hostname: broker
    container_name: broker
    networks:
      - spring-boot-kafka-quickstarts
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.0
    hostname: schema-registry
    container_name: schema-registry
    networks:
      - spring-boot-kafka-quickstarts
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.7.0
    hostname: control-center
    container_name: control-center
    networks:
      - spring-boot-kafka-quickstarts
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  kafka-streams-producer-user:
    image: loicgreffier/spring-boot-kafka-quickstarts:kafka-streams-producer-user-1.0.0
    hostname: kafka-streams-producer-user
    container_name: kafka-streams-producer-user
    networks:
      - spring-boot-kafka-quickstarts
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8082:8080"
    restart: unless-stopped
    environment:
      SPRING_APPLICATION_JSON: |
        {
          "kafka": {
            "properties": {
              "bootstrap.servers": "broker:29092",
              "schema.registry.url": "http://schema-registry:8081"
            }
          }
        }

  kafka-streams-producer-order:
    image: loicgreffier/spring-boot-kafka-quickstarts:kafka-streams-producer-order-1.0.0
    hostname: kafka-streams-producer-order
    container_name: kafka-streams-producer-order
    networks:
      - spring-boot-kafka-quickstarts
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8080"
    restart: unless-stopped
    environment:
      SPRING_APPLICATION_JSON: |
        {
          "kafka": {
            "properties": {
              "bootstrap.servers": "broker:29092",
              "schema.registry.url": "http://schema-registry:8081"
            }
          }
        }

  kafka-streams-reconciliation:
    image: loicgreffier/spring-boot-kafka-quickstarts:kafka-streams-reconciliation-1.0.0
    hostname: kafka-streams-reconciliation
    container_name: kafka-streams-reconciliation
    networks:
      - spring-boot-kafka-quickstarts
    depends_on:
      - kafka-streams-producer-user
    ports:
      - "8084:8080"
    restart: unless-stopped
    environment:
      SPRING_APPLICATION_JSON: |
        {
          "kafka": {
            "properties": {
              "bootstrap.servers": "broker:29092",
              "schema.registry.url": "http://schema-registry:8081"
            }
          }
        }

networks:
  spring-boot-kafka-quickstarts:
    driver: bridge
```
New file: pom.xml for the kafka-streams-reconciliation module

Lines changed: 20 additions & 0 deletions

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>io.github.loicgreffier</groupId>
        <artifactId>kafka-streams-quickstarts</artifactId>
        <version>1.0.0</version>
    </parent>

    <artifactId>kafka-streams-reconciliation</artifactId>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
New file: Avro schema `KafkaOrder`

Lines changed: 38 additions & 0 deletions

```json
{
  "namespace": "io.github.loicgreffier.avro",
  "type": "record",
  "name": "KafkaOrder",
  "fields": [
    {
      "name": "id",
      "type": ["null", "long"],
      "default": null,
      "doc": "Order id"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "totalAmount",
      "type": ["null", "double"],
      "default": null,
      "doc": "Total amount of the order"
    },
    {
      "name": "customerId",
      "type": ["null", "long"],
      "default": null,
      "doc": "Customer id"
    }
  ]
}
```
New file: Avro schema `KafkaReconciliation`

Lines changed: 25 additions & 0 deletions

```json
{
  "namespace": "io.github.loicgreffier.avro",
  "type": "record",
  "name": "KafkaReconciliation",
  "fields": [
    {
      "name": "customer",
      "type": ["null", "KafkaUser"],
      "default": null,
      "doc": "Customer"
    },
    {
      "name": "order",
      "type": ["null", "KafkaOrder"],
      "default": null,
      "doc": "Order"
    }
  ]
}
```
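In the schema above, each `["null", T]` union with `"default": null` makes the field optional — which is what allows a reconciliation record to exist while only one side has arrived. A hedged plain-Java analogue of that shape (hypothetical types, not the Avro-generated classes):

```java
import java.util.Optional;

class ReconciliationShape {
    // Hypothetical stand-ins for the Avro-generated KafkaUser and KafkaOrder classes.
    record User(long id, String firstName) {}
    record Order(long id, double totalAmount) {}

    // Mirrors KafkaReconciliation: both sides are nullable unions defaulting to
    // null, so either may be absent until reconciliation completes.
    record Reconciliation(Optional<User> customer, Optional<Order> order) {
        boolean isComplete() {
            return customer.isPresent() && order.isPresent();
        }
    }
}
```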
New file: Avro schema `KafkaUser`

Lines changed: 59 additions & 0 deletions

```json
{
  "namespace": "io.github.loicgreffier.avro",
  "type": "record",
  "name": "KafkaUser",
  "fields": [
    {
      "name": "id",
      "type": ["null", "long"],
      "default": null,
      "doc": "User id"
    },
    {
      "name": "firstName",
      "type": ["null", "string"],
      "default": null,
      "doc": "User first name"
    },
    {
      "name": "lastName",
      "type": ["null", "string"],
      "default": null,
      "doc": "User last name"
    },
    {
      "name": "nationality",
      "type": [
        "null",
        {
          "name": "CountryCode",
          "type": "enum",
          "symbols": ["FR", "DE", "ES", "IT", "GB", "US", "BE"]
        }
      ],
      "default": null,
      "doc": "User nationality"
    },
    {
      "name": "birthDate",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null,
      "doc": "User date of birth"
    }
  ]
}
```
