Skip to content

Commit 82a4bd6

Browse files
committed
Update id generation for KafkaUser
1 parent e71dd89 commit 82a4bd6

File tree

27 files changed

+707
-273
lines changed

27 files changed

+707
-273
lines changed

kafka-consumer-quickstarts/kafka-consumer-avro-generic/src/main/avro/user.avsc

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"name": "id",
88
"type": [
99
"null",
10-
"long"
10+
"string"
1111
],
1212
"default": null,
1313
"doc": "User id"
@@ -29,18 +29,6 @@
2929
],
3030
"default": null,
3131
"doc": "User last name"
32-
},
33-
{
34-
"name": "birthDate",
35-
"type": [
36-
"null",
37-
{
38-
"type": "long",
39-
"logicalType": "timestamp-millis"
40-
}
41-
],
42-
"default": null,
43-
"doc": "User date of birth"
4432
}
4533
]
4634
}

kafka-consumer-quickstarts/kafka-consumer-avro-generic/src/main/java/io/github/loicgreffier/consumer/avro/generic/app/ConsumerRunner.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,11 @@ public void run() {
6868

6969
for (ConsumerRecord<String, GenericRecord> message : messages) {
7070
log.info(
71-
"Received offset = {}, partition = {}, key = {}, firstName = {}, lastName = {}",
71+
"Received offset = {}, partition = {}, key = {}, value = {}",
7272
message.offset(),
7373
message.partition(),
7474
message.key(),
75-
message.value().get("firstName"),
76-
message.value().get("lastName"));
75+
message.value());
7776
}
7877

7978
if (!messages.isEmpty()) {

kafka-consumer-quickstarts/kafka-consumer-avro-generic/src/test/java/io/github/loicgreffier/consumer/avro/generic/KafkaConsumerAvroGenericApplicationTest.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
import static org.mockito.Mockito.times;
2626
import static org.mockito.Mockito.verify;
2727

28+
import com.google.common.primitives.Bytes;
2829
import io.github.loicgreffier.consumer.avro.generic.app.ConsumerRunner;
2930
import java.io.File;
3031
import java.io.IOException;
31-
import java.sql.Timestamp;
32-
import java.time.Instant;
3332
import java.util.Collections;
3433
import java.util.Map;
34+
import java.util.UUID;
3535
import org.apache.avro.Schema;
3636
import org.apache.avro.generic.GenericData;
3737
import org.apache.avro.generic.GenericRecord;
@@ -71,14 +71,7 @@ void shouldConsumeSuccessfully() throws IOException {
7171
File schemaFile = new ClassPathResource("user.avsc").getFile();
7272
Schema schema = new Schema.Parser().parse(schemaFile);
7373

74-
GenericRecord genericRecord = new GenericData.Record(schema);
75-
genericRecord.put("id", 1L);
76-
genericRecord.put("firstName", "Homer");
77-
genericRecord.put("lastName", "Simpson");
78-
genericRecord.put(
79-
"birthDate",
80-
Timestamp.from(Instant.parse("2000-01-01T01:00:00Z")).getTime());
81-
74+
GenericRecord genericRecord = buildGenericRecord(schema);
8275
ConsumerRecord<String, GenericRecord> message = new ConsumerRecord<>(USER_TOPIC, 0, 0, "1", genericRecord);
8376

8477
mockConsumer.schedulePollTask(() -> mockConsumer.addRecord(message));
@@ -95,14 +88,7 @@ void shouldFailOnPoisonPill() throws IOException {
9588
File schemaFile = new ClassPathResource("user.avsc").getFile();
9689
Schema schema = new Schema.Parser().parse(schemaFile);
9790

98-
GenericRecord genericRecord = new GenericData.Record(schema);
99-
genericRecord.put("id", 1L);
100-
genericRecord.put("firstName", "Homer");
101-
genericRecord.put("lastName", "Simpson");
102-
genericRecord.put(
103-
"birthDate",
104-
Timestamp.from(Instant.parse("2000-01-01T01:00:00Z")).getTime());
105-
91+
GenericRecord genericRecord = buildGenericRecord(schema);
10692
ConsumerRecord<String, GenericRecord> message = new ConsumerRecord<>(USER_TOPIC, 0, 0, "1", genericRecord);
10793

10894
mockConsumer.schedulePollTask(() -> mockConsumer.addRecord(message));
@@ -127,4 +113,19 @@ void shouldFailOnPoisonPill() throws IOException {
127113
verify(mockConsumer, times(3)).poll(any());
128114
verify(mockConsumer).commitSync();
129115
}
116+
117+
private GenericRecord buildGenericRecord(Schema schema) {
118+
String firstName = "Homer";
119+
String lastName = "Simpson";
120+
121+
GenericRecord genericRecord = new GenericData.Record(schema);
122+
genericRecord.put(
123+
"id",
124+
UUID.nameUUIDFromBytes(Bytes.concat(firstName.getBytes(), lastName.getBytes()))
125+
.toString());
126+
genericRecord.put("firstName", firstName);
127+
genericRecord.put("lastName", lastName);
128+
129+
return genericRecord;
130+
}
130131
}

kafka-consumer-quickstarts/kafka-consumer-avro-specific/src/main/avro/user.avsc

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"name": "id",
88
"type": [
99
"null",
10-
"long"
10+
"string"
1111
],
1212
"default": null,
1313
"doc": "User id"
@@ -29,18 +29,6 @@
2929
],
3030
"default": null,
3131
"doc": "User last name"
32-
},
33-
{
34-
"name": "birthDate",
35-
"type": [
36-
"null",
37-
{
38-
"type": "long",
39-
"logicalType": "timestamp-millis"
40-
}
41-
],
42-
"default": null,
43-
"doc": "User date of birth"
4432
}
4533
]
4634
}

kafka-consumer-quickstarts/kafka-consumer-avro-specific/src/test/java/io/github/loicgreffier/consumer/avro/specific/KafkaConsumerAvroSpecificApplicationTest.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
import static org.mockito.Mockito.times;
2626
import static org.mockito.Mockito.verify;
2727

28+
import com.google.common.primitives.Bytes;
2829
import io.github.loicgreffier.avro.KafkaUser;
2930
import io.github.loicgreffier.consumer.avro.specific.app.ConsumerRunner;
30-
import java.time.Instant;
3131
import java.util.Collections;
3232
import java.util.Map;
33+
import java.util.UUID;
3334
import org.apache.kafka.clients.consumer.ConsumerRecord;
3435
import org.apache.kafka.clients.consumer.MockConsumer;
3536
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -62,17 +63,9 @@ void setUp() {
6263

6364
@Test
6465
void shouldConsumeSuccessfully() {
65-
ConsumerRecord<String, KafkaUser> message = new ConsumerRecord<>(
66-
USER_TOPIC,
67-
0,
68-
0,
69-
"1",
70-
KafkaUser.newBuilder()
71-
.setId(1L)
72-
.setFirstName("Homer")
73-
.setLastName("Simpson")
74-
.setBirthDate(Instant.parse("2000-01-01T01:00:00Z"))
75-
.build());
66+
KafkaUser kafkaUser = buildKafkaUser();
67+
ConsumerRecord<String, KafkaUser> message =
68+
new ConsumerRecord<>(USER_TOPIC, 0, 0, kafkaUser.getId(), kafkaUser);
7669

7770
mockConsumer.schedulePollTask(() -> mockConsumer.addRecord(message));
7871
mockConsumer.schedulePollTask(mockConsumer::wakeup);
@@ -85,17 +78,9 @@ void shouldConsumeSuccessfully() {
8578

8679
@Test
8780
void shouldFailOnPoisonPill() {
88-
ConsumerRecord<String, KafkaUser> message = new ConsumerRecord<>(
89-
USER_TOPIC,
90-
0,
91-
0,
92-
"1",
93-
KafkaUser.newBuilder()
94-
.setId(1L)
95-
.setFirstName("Homer")
96-
.setLastName("Simpson")
97-
.setBirthDate(Instant.parse("2000-01-01T01:00:00Z"))
98-
.build());
81+
KafkaUser kafkaUser = buildKafkaUser();
82+
ConsumerRecord<String, KafkaUser> message =
83+
new ConsumerRecord<>(USER_TOPIC, 0, 0, kafkaUser.getId(), kafkaUser);
9984

10085
mockConsumer.schedulePollTask(() -> mockConsumer.addRecord(message));
10186
mockConsumer.schedulePollTask(() -> {
@@ -119,4 +104,16 @@ void shouldFailOnPoisonPill() {
119104
verify(mockConsumer, times(3)).poll(any());
120105
verify(mockConsumer).commitSync();
121106
}
107+
108+
private KafkaUser buildKafkaUser() {
109+
String firstName = "Homer";
110+
String lastName = "Simpson";
111+
112+
return KafkaUser.newBuilder()
113+
.setId(UUID.nameUUIDFromBytes(Bytes.concat(firstName.getBytes(), lastName.getBytes()))
114+
.toString())
115+
.setFirstName(firstName)
116+
.setLastName(lastName)
117+
.build();
118+
}
122119
}

kafka-consumer-quickstarts/kafka-consumer-circuit-breaker/src/main/avro/user.avsc

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"name": "id",
88
"type": [
99
"null",
10-
"long"
10+
"string"
1111
],
1212
"default": null,
1313
"doc": "User id"
@@ -29,18 +29,6 @@
2929
],
3030
"default": null,
3131
"doc": "User last name"
32-
},
33-
{
34-
"name": "birthDate",
35-
"type": [
36-
"null",
37-
{
38-
"type": "long",
39-
"logicalType": "timestamp-millis"
40-
}
41-
],
42-
"default": null,
43-
"doc": "User date of birth"
4432
}
4533
]
4634
}

kafka-consumer-quickstarts/kafka-consumer-circuit-breaker/src/test/java/io/github/loicgreffier/consumer/circuit/breaker/KafkaConsumerCircuitBreakerApplicationTest.java

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@
2424
import static org.mockito.Mockito.times;
2525
import static org.mockito.Mockito.verify;
2626

27+
import com.google.common.primitives.Bytes;
2728
import io.github.loicgreffier.avro.KafkaUser;
2829
import io.github.loicgreffier.consumer.circuit.breaker.app.ConsumerRunner;
29-
import java.time.Instant;
3030
import java.util.Collections;
3131
import java.util.Map;
32+
import java.util.UUID;
3233
import org.apache.kafka.clients.consumer.ConsumerRecord;
3334
import org.apache.kafka.clients.consumer.MockConsumer;
3435
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -61,17 +62,9 @@ void setUp() {
6162

6263
@Test
6364
void shouldConsumeSuccessfully() {
64-
ConsumerRecord<String, KafkaUser> message = new ConsumerRecord<>(
65-
USER_TOPIC,
66-
0,
67-
0,
68-
"1",
69-
KafkaUser.newBuilder()
70-
.setId(1L)
71-
.setFirstName("Homer")
72-
.setLastName("Simpson")
73-
.setBirthDate(Instant.parse("2000-01-01T01:00:00Z"))
74-
.build());
65+
KafkaUser kafkaUser = buildKafkaUser();
66+
ConsumerRecord<String, KafkaUser> message =
67+
new ConsumerRecord<>(USER_TOPIC, 0, 0, kafkaUser.getId(), kafkaUser);
7568

7669
mockConsumer.schedulePollTask(() -> mockConsumer.addRecord(message));
7770
mockConsumer.schedulePollTask(mockConsumer::wakeup);
@@ -84,29 +77,12 @@ void shouldConsumeSuccessfully() {
8477

8578
@Test
8679
void shouldBreakCircuitOnPoisonPill() {
87-
ConsumerRecord<String, KafkaUser> message = new ConsumerRecord<>(
88-
USER_TOPIC,
89-
0,
90-
0,
91-
"1",
92-
KafkaUser.newBuilder()
93-
.setId(1L)
94-
.setFirstName("Homer")
95-
.setLastName("Simpson")
96-
.setBirthDate(Instant.parse("2000-01-01T01:00:00Z"))
97-
.build());
98-
99-
ConsumerRecord<String, KafkaUser> message2 = new ConsumerRecord<>(
100-
USER_TOPIC,
101-
0,
102-
2,
103-
"2",
104-
KafkaUser.newBuilder()
105-
.setId(2L)
106-
.setFirstName("Homer")
107-
.setLastName("Simpson")
108-
.setBirthDate(Instant.parse("2000-01-01T01:00:00Z"))
109-
.build());
80+
KafkaUser kafkaUser = buildKafkaUser();
81+
ConsumerRecord<String, KafkaUser> message =
82+
new ConsumerRecord<>(USER_TOPIC, 0, 0, kafkaUser.getId(), kafkaUser);
83+
84+
ConsumerRecord<String, KafkaUser> message2 =
85+
new ConsumerRecord<>(USER_TOPIC, 0, 2, kafkaUser.getId(), kafkaUser);
11086

11187
mockConsumer.schedulePollTask(() -> mockConsumer.addRecord(message));
11288

@@ -135,4 +111,16 @@ void shouldBreakCircuitOnPoisonPill() {
135111
verify(mockConsumer, times(2)).commitSync();
136112
verify(mockConsumer).seek(topicPartition, 2);
137113
}
114+
115+
private KafkaUser buildKafkaUser() {
116+
String firstName = "Homer";
117+
String lastName = "Simpson";
118+
119+
return KafkaUser.newBuilder()
120+
.setId(UUID.nameUUIDFromBytes(Bytes.concat(firstName.getBytes(), lastName.getBytes()))
121+
.toString())
122+
.setFirstName(firstName)
123+
.setLastName(lastName)
124+
.build();
125+
}
138126
}

kafka-consumer-quickstarts/kafka-consumer-headers/src/main/java/io/github/loicgreffier/consumer/headers/app/ConsumerRunner.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,21 +68,16 @@ public void run() {
6868
log.info("Pulled {} records", messages.count());
6969

7070
for (ConsumerRecord<String, String> message : messages) {
71-
Header headerId = message.headers().lastHeader("id");
72-
String headerIdValue = headerId != null ? new String(headerId.value(), StandardCharsets.UTF_8) : "";
73-
7471
Header headerMessage = message.headers().lastHeader("message");
7572
String headerMessageValue =
7673
headerMessage != null ? new String(headerMessage.value(), StandardCharsets.UTF_8) : "";
7774

7875
log.info(
79-
"Received offset = {}, partition = {}, key = {}, value = {}, header id = {}, "
80-
+ "header message = {}",
76+
"Received offset = {}, partition = {}, key = {}, value = {}, header = {}",
8177
message.offset(),
8278
message.partition(),
8379
message.key(),
8480
message.value(),
85-
headerIdValue,
8681
headerMessageValue);
8782
}
8883

0 commit comments

Comments
 (0)