Skip to content

Commit dec04d8

Browse files
committed
Upgraded to Kafka 2.1.0 and Confluent 5.1.0
Signed-off-by: Pavan Jadda <jpavanaryan@gmail.com>
1 parent ab05ee4 commit dec04d8

File tree

4 files changed

+82
-47
lines changed

4 files changed

+82
-47
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.kafkastream.constants;
2+
3+
public class KafkaConstants
4+
{
5+
public static String schemaRegistryUrl = "http://localhost:8081";
6+
public static String APPLICATION_ID_CONFIG = "cqrs-streams";
7+
public static String APPLICATION_SERVER_CONFIG = "localhost:8095";
8+
public static String BOOTSTRAP_SERVERS_CONFIG = "localhost:9092";
9+
public static String COMMIT_INTERVAL_MS_CONFIG = "2000";
10+
public static String AUTO_OFFSET_RESET_CONFIG = "earliest";
11+
}

src/main/java/com/kafkastream/service/EventsListener.java

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.kafkastream.service;
22

3+
import com.kafkastream.constants.KafkaConstants;
34
import com.kafkastream.dto.CustomerOrderDTO;
45
import com.kafkastream.model.Customer;
56
import com.kafkastream.model.CustomerOrder;
@@ -31,19 +32,19 @@ public class EventsListener
3132
{
3233
private static KafkaStreams streams;
3334

34-
private static StreamsBuilder streamsBuilder;
35+
private static StreamsBuilder streamsBuilder;
3536

3637
private static Properties properties;
3738

3839
private static void setUp()
3940
{
4041
properties = new Properties();
41-
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "cqrs-streams");
42-
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
43-
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG,"localhost:8095");
44-
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "2000");
45-
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
46-
properties.put("schema.registry.url", "http://localhost:8081");
42+
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, KafkaConstants.APPLICATION_ID_CONFIG);
43+
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS_CONFIG);
44+
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, KafkaConstants.APPLICATION_SERVER_CONFIG);
45+
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, KafkaConstants.COMMIT_INTERVAL_MS_CONFIG);
46+
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.AUTO_OFFSET_RESET_CONFIG);
47+
properties.put("schema.registry.url", KafkaConstants.schemaRegistryUrl);
4748
properties.put("acks", "all");
4849
properties.put("key.deserializer", Serdes.String().deserializer().getClass());
4950
properties.put("value.deserializer", SpecificAvroDeserializer.class);
@@ -55,35 +56,49 @@ private static void setUp()
5556
public static void main(String[] args)
5657
{
5758
setUp();
58-
List<CustomerOrderDTO> customerOrderList=null;
59-
SpecificAvroSerde<Customer> customerSerde = createSerde("http://localhost:8081");
60-
SpecificAvroSerde<Order> orderSerde = createSerde("http://localhost:8081");
61-
SpecificAvroSerde<CustomerOrder> customerOrderSerde = createSerde("http://localhost:8081");
6259

63-
StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"),Serdes.String(), customerSerde)
60+
List<CustomerOrderDTO> customerOrderList;
61+
SpecificAvroSerde<Customer> customerSerde = createSerde(KafkaConstants.schemaRegistryUrl);
62+
SpecificAvroSerde<Order> orderSerde = createSerde(KafkaConstants.schemaRegistryUrl);
63+
SpecificAvroSerde<CustomerOrder> customerOrderSerde = createSerde(KafkaConstants.schemaRegistryUrl);
64+
65+
StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"), Serdes.String(), customerSerde)
6466
.withLoggingEnabled(new HashMap<>());
65-
StoreBuilder orderStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("order-store"),Serdes.String(), customerSerde)
67+
StoreBuilder orderStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("order-store"), Serdes.String(), customerSerde)
6668
.withLoggingEnabled(new HashMap<>());
67-
StoreBuilder customerOrderStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customerordersstore"),Serdes.String(), customerSerde)
69+
StoreBuilder customerOrderStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customerordersstore"), Serdes.String(), customerSerde)
6870
.withLoggingEnabled(new HashMap<>()).withCachingEnabled();
69-
KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
71+
72+
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
7073
.withKeySerde(Serdes.String())
7174
.withValueSerde(customerSerde));
72-
//customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));
7375

76+
customerKTable.filter((key, value) ->
77+
{
78+
System.out.println("customerKTable.key: " + key);
79+
System.out.println("customerKTable.value: " + value);
80+
return true;
81+
});
7482

75-
streamsBuilder.stream("order",Consumed.with(Serdes.String(), orderSerde))
76-
.selectKey((key, value) -> value.getCustomerId().toString()).to("order-to-ktable",Produced.with(Serdes.String(),orderSerde));
77-
KTable<String,Order> orderKTable=streamsBuilder.table("order-to-ktable",Materialized.<String, Order, KeyValueStore<Bytes, byte[]>>as(orderStateStore.name())
83+
streamsBuilder.stream("order", Consumed.with(Serdes.String(), orderSerde))
84+
.selectKey((key, value) -> value.getCustomerId().toString()).to("order-to-ktable", Produced.with(Serdes.String(), orderSerde));
85+
KTable<String, Order> orderKTable = streamsBuilder.table("order-to-ktable", Materialized.<String, Order, KeyValueStore<Bytes, byte[]>>as(orderStateStore.name())
7886
.withKeySerde(Serdes.String())
7987
.withValueSerde(orderSerde));
80-
//orderKTable.foreach(((key, value) -> System.out.println("Order from Topic: " + value)));
88+
//Print orderKTable
89+
orderKTable.filter((key, value) ->
90+
{
91+
System.out.println("orderKTable.key: " + key);
92+
System.out.println("orderKTable.value: " + value);
93+
return true;
94+
});
95+
8196

82-
KTable<String,CustomerOrder> customerOrderKTable=customerKTable.join(orderKTable,(customer, order)->
97+
KTable<String, CustomerOrder> customerOrderKTable = customerKTable.join(orderKTable, (customer, order) ->
8398
{
84-
if(customer!=null && order!=null)
99+
if (customer != null && order != null)
85100
{
86-
CustomerOrder customerOrder=new CustomerOrder();
101+
CustomerOrder customerOrder = new CustomerOrder();
87102
customerOrder.setCustomerId(customer.getCustomerId());
88103
customerOrder.setFirstName(customer.getFirstName());
89104
customerOrder.setLastName(customer.getLastName());
@@ -95,9 +110,17 @@ public static void main(String[] args)
95110
customerOrder.setOrderPurchaseTime(order.getOrderPurchaseTime());
96111
return customerOrder;
97112
}
98-
return null;
99-
},Materialized.<String, CustomerOrder, KeyValueStore<Bytes, byte[]>>as(customerOrderStateStore.name()).withKeySerde(Serdes.String()).withValueSerde(customerOrderSerde));
100-
//customerOrderKTable.foreach(((key, value) -> System.out.println("Customer Order -> "+value.toString())));
113+
return null;
114+
}, Materialized.<String, CustomerOrder, KeyValueStore<Bytes, byte[]>>as(customerOrderStateStore.name()).withKeySerde(Serdes.String()).withValueSerde(customerOrderSerde));
115+
//Print customerOrderKTable
116+
customerOrderKTable.filter((key, value) ->
117+
{
118+
System.out.println("customerOrderKTable.key: " + key);
119+
System.out.println("customerOrderKTable.value: " + value);
120+
return true;
121+
});
122+
123+
101124

102125
Topology topology = streamsBuilder.build();
103126
streams = new KafkaStreams(topology, properties);
@@ -108,7 +131,7 @@ public static void main(String[] args)
108131
streams.start();
109132
final HostInfo restEndpoint = new HostInfo("localhost", 8095);
110133
final StateStoreRestService restService = startRestProxy(streams, restEndpoint);
111-
customerOrderList=restService.getAllCustomersOrders();
134+
customerOrderList = restService.getAllCustomersOrders();
112135
printList(customerOrderList);
113136
latch.await();
114137
}
@@ -130,6 +153,10 @@ public void run()
130153
System.exit(0);
131154
}
132155

156+
private static void printKTable()
157+
{
158+
}
159+
133160
private static void printList(List<CustomerOrderDTO> customerOrderList)
134161
{
135162
for (CustomerOrderDTO customerOrderdto : customerOrderList)
@@ -139,7 +166,7 @@ private static void printList(List<CustomerOrderDTO> customerOrderList)
139166
}
140167

141168

142-
private static <VT extends SpecificRecord> SpecificAvroSerde<VT> createSerde(final String schemaRegistryUrl)
169+
private static <VT extends SpecificRecord> SpecificAvroSerde<VT> createSerde(String schemaRegistryUrl)
143170
{
144171

145172
final SpecificAvroSerde<VT> serde = new SpecificAvroSerde<>();
@@ -166,7 +193,7 @@ public static <T> T waitUntilStoreIsQueryable(final String storeName, final Quer
166193
}
167194
}
168195

169-
static StateStoreRestService startRestProxy(final KafkaStreams streams, final HostInfo hostInfo) throws Exception
196+
private static StateStoreRestService startRestProxy(final KafkaStreams streams, final HostInfo hostInfo) throws Exception
170197
{
171198
final StateStoreRestService interactiveQueriesRestService = new StateStoreRestService(streams, hostInfo);
172199
interactiveQueriesRestService.start();

src/main/java/com/kafkastream/service/StateStoreService.java renamed to src/main/java/com/kafkastream/statestore/StateStoreService.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
package com.kafkastream.service;
1+
package com.kafkastream.statestore;
22

33
import com.kafkastream.config.StreamsBuilderConfig;
44
import com.kafkastream.dto.CustomerOrderDTO;
55
import com.kafkastream.model.CustomerOrder;
66
import com.kafkastream.web.kafkarest.StateStoreRestService;
77
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
8-
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
98
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
109
import org.apache.avro.specific.SpecificRecord;
11-
import org.apache.kafka.common.serialization.Serdes;
1210
import org.apache.kafka.streams.KafkaStreams;
1311
import org.apache.kafka.streams.StreamsBuilder;
1412
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -29,23 +27,22 @@
2927
import javax.ws.rs.core.MediaType;
3028
import java.util.*;
3129

30+
31+
/**
32+
* Not using this class for the moment see EventsListener class for more details
33+
*/
34+
3235
@Service
3336
public class StateStoreService
3437
{
35-
private Properties properties;
36-
37-
private StreamsBuilder streamsBuilder;
38-
39-
private String host="localhost";
40-
41-
private int port=8096;
4238

4339
private final Client client = ClientBuilder.newBuilder().register(JacksonFeature.class).build();
4440
private Server jettyServer;
4541

4642
public StateStoreService()
4743
{
48-
this.properties = new Properties();
44+
/*
45+
Properties properties = new Properties();
4946
properties.put("application.id", "cqrs-streams");
5047
properties.put("bootstrap.servers", "localhost:9092");
5148
properties.put("application.server", "localhost:8096");
@@ -54,9 +51,9 @@ public StateStoreService()
5451
properties.put("schema.registry.url", "http://localhost:8081");
5552
properties.put("acks", "all");
5653
properties.put("key.deserializer", Serdes.String().deserializer().getClass());
57-
properties.put("value.deserializer", SpecificAvroDeserializer.class);
54+
properties.put("value.deserializer", SpecificAvroDeserializer.class); */
5855

59-
this.streamsBuilder = StreamsBuilderConfig.getInstance();
56+
StreamsBuilder streamsBuilder = StreamsBuilderConfig.getInstance();
6057
}
6158

6259

@@ -65,7 +62,8 @@ public List<CustomerOrderDTO> getCustomerOrders(String customerId)
6562
List<CustomerOrderDTO> customerOrderDTOList = new ArrayList<>();
6663
//start();
6764
//Remote server config. Will replace place holders in future
68-
List<CustomerOrder> customerOrderList=client.target(String.format("http://%s:%d/%s",host, 8095, "customer-orders/all")).request(MediaType.APPLICATION_JSON_TYPE).get(new GenericType<List<CustomerOrder>>()
65+
String host = "localhost";
66+
List<CustomerOrder> customerOrderList = client.target(String.format("http://%s:%d/%s", host, 8095, "customer-orders/all")).request(MediaType.APPLICATION_JSON_TYPE).get(new GenericType<List<CustomerOrder>>()
6967
{
7068
});
7169

@@ -124,6 +122,7 @@ private void start() throws Exception
124122
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
125123
context.setContextPath("/");
126124

125+
int port = 8096;
127126
jettyServer = new Server(port);
128127
jettyServer.setHandler(context);
129128

src/main/java/com/kafkastream/web/EventsController.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22

33
import com.kafkastream.dto.CustomerOrderDTO;
44
import com.kafkastream.model.Customer;
5-
import com.kafkastream.model.CustomerOrder;
65
import com.kafkastream.model.Greetings;
76
import com.kafkastream.model.Order;
8-
import com.kafkastream.service.EventsListener;
97
import com.kafkastream.service.EventsSender;
10-
import com.kafkastream.service.StateStoreService;
8+
import com.kafkastream.statestore.StateStoreService;
119
import org.springframework.beans.factory.annotation.Autowired;
1210
import org.springframework.http.HttpStatus;
1311
import org.springframework.web.bind.annotation.GetMapping;
@@ -28,7 +26,7 @@ public class EventsController
2826

2927
private StateStoreService stateStoreService;
3028

31-
Random random=new Random(1);
29+
private Random random = new Random(1);
3230

3331
@Autowired
3432
public EventsController(EventsSender eventsSender,StateStoreService stateStoreService)

0 commit comments

Comments
 (0)