@@ -27,8 +27,9 @@ public class KafkaPostgresOrderEventConsumer implements Runnable {
27
27
final static String inventoryTopicName = "inventory.topic" ;
28
28
KafkaPostgressInventoryResource inventoryResource ;
29
29
30
- public KafkaPostgresOrderEventConsumer (KafkaPostgressInventoryResource inventoryResource ) {
30
+ public KafkaPostgresOrderEventConsumer (KafkaPostgressInventoryResource inventoryResource ) throws SQLException {
31
31
this .inventoryResource = inventoryResource ;
32
+ setupDB (inventoryResource .postgresDataSource .getConnection ());
32
33
}
33
34
34
35
@ Override
@@ -69,9 +70,7 @@ public void listenForOrderEvents() {
69
70
Order order = JsonUtils .read (txt , Order .class );
70
71
System .out .print (" orderid:" + order .getOrderid ());
71
72
System .out .print (" itemid:" + order .getItemid ());
72
- if (inventoryResource .crashAfterOrderMessageReceived ) System .exit (-1 );
73
73
updateDataAndSendEventOnInventory (order .getOrderid (), order .getItemid ());
74
- if (inventoryResource .crashAfterOrderMessageProcessed ) System .exit (-1 );
75
74
} catch (Exception ex ) {
76
75
System .out .printf ("message did not contain order" );
77
76
ex .printStackTrace ();
@@ -81,11 +80,12 @@ public void listenForOrderEvents() {
81
80
}
82
81
83
82
private void updateDataAndSendEventOnInventory ( String orderid , String itemid ) throws Exception {
83
+ if (inventoryResource .crashAfterOrderMessageReceived ) System .exit (-1 );
84
84
String inventorylocation = evaluateInventory (itemid );
85
85
Inventory inventory = new Inventory (orderid , itemid , inventorylocation , "beer" ); //static suggestiveSale - represents an additional service/event
86
86
String jsonString = JsonUtils .writeValueAsString (inventory );
87
87
System .out .println ("send inventory status message... jsonString:" + jsonString );
88
- System . out . println ( "sendInsertAndSendOrderMessage........." );
88
+ if ( inventoryResource . crashAfterOrderMessageProcessed ) System . exit (- 1 );
89
89
String topicName = inventoryTopicName ;
90
90
Properties props = new Properties ();
91
91
props .put ("bootstrap.servers" , "kafka-service:9092" );
@@ -109,21 +109,23 @@ private void updateDataAndSendEventOnInventory( String orderid, String itemid) t
109
109
private String evaluateInventory (String id ) {
110
110
System .out .println ("KafkaPostgresOrderEventConsumer postgresDataSource:" + inventoryResource .postgresDataSource );
111
111
System .out .println ("KafkaPostgresOrderEventConsumer evaluateInventory for inventoryid:" + id );
112
- String DECREMENT_BY_ID =
113
- "update inventory set inventorycount = inventorycount - 1 where inventoryid = ? and inventorycount > 0 returning inventorylocation into ?" ;
114
- // try (CallableStatement st = inventoryResource.postgresDataSource.getConnection().prepareCall(DECREMENT_BY_ID)) {
115
112
try (PreparedStatement st = inventoryResource .postgresDataSource .getConnection ().prepareStatement (
116
113
"select inventorycount, inventorylocation from inventory where inventoryid = ?"
117
114
)) {
118
115
st .setString (1 , id );
119
- // st.re.registerOutParameter(2, Types.VARCHAR);
120
116
ResultSet rs = st .executeQuery ();
121
117
rs .next ();
122
118
int inventoryCount = rs .getInt (1 );
123
119
String inventorylocation = rs .getString (2 );
124
120
rs .close ();
125
121
System .out .println ("InventoryServiceOrderEventConsumer.updateDataAndSendEventOnInventory id {" + id + "} location {" + inventorylocation + "} inventoryCount:" + inventoryCount );
126
122
if (inventoryCount > 0 ) {
123
+ PreparedStatement decrementPS = inventoryResource .postgresDataSource .getConnection ().prepareStatement (
124
+ "update inventory set inventorycount = ? where inventoryid = ?" );
125
+ decrementPS .setInt (1 , inventoryCount - 1 );
126
+ decrementPS .setString (2 , id );
127
+ decrementPS .execute ();
128
+ System .out .println ("InventoryServiceOrderEventConsumer.updateDataAndSendEventOnInventory reduced inventory count to:" + (inventoryCount - 1 ));
127
129
return inventorylocation ;
128
130
} else {
129
131
return "inventorydoesnotexist" ;
@@ -134,17 +136,22 @@ private String evaluateInventory(String id) {
134
136
return "unable to find inventory status" ;
135
137
}
136
138
139
+ private void setupDB (Connection connection ) throws SQLException {
140
+ createInventoryTable (connection );
141
+ populateInventoryTable (connection );
142
+ }
143
+
137
144
private void createInventoryTable (Connection connection ) throws SQLException {
138
- System .out .println ("KafkaPostgresOrderEventConsumer createInventoryTable" );
145
+ System .out .println ("KafkaPostgresOrderEventConsumer createInventoryTable IF NOT EXISTS " );
139
146
connection .prepareStatement (
140
- "create table inventory ( inventoryid varchar(16) PRIMARY KEY NOT NULL, inventorylocation varchar(32), inventorycount integer CONSTRAINT positive_inventory CHECK (inventorycount >= 0) )" ).execute ();
147
+ "CREATE TABLE IF NOT EXISTS inventory ( inventoryid varchar(16) PRIMARY KEY NOT NULL, inventorylocation varchar(32), inventorycount integer CONSTRAINT positive_inventory CHECK (inventorycount >= 0) )" ).execute ();
141
148
}
142
149
143
150
private void populateInventoryTable (Connection connection ) throws SQLException {
144
- System .out .println ("KafkaPostgresOrderEventConsumer populateInventoryTable" );
145
- connection .prepareStatement ("insert into inventory values ('sushi', '1468 WEBSTER ST,San Francisco,CA', 0)" ).execute ();
146
- connection .prepareStatement ("insert into inventory values ('pizza', '1469 WEBSTER ST,San Francisco,CA', 0)" ).execute ();
147
- connection .prepareStatement ("insert into inventory values ('burger', '1470 WEBSTER ST,San Francisco,CA', 0)" ).execute ();
151
+ System .out .println ("KafkaPostgresOrderEventConsumer populateInventoryTable if not populated " );
152
+ connection .prepareStatement ("insert into inventory values ('sushi', '1468 WEBSTER ST,San Francisco,CA', 0) ON CONFLICT DO NOTHING " ).execute ();
153
+ connection .prepareStatement ("insert into inventory values ('pizza', '1469 WEBSTER ST,San Francisco,CA', 0) ON CONFLICT DO NOTHING " ).execute ();
154
+ connection .prepareStatement ("insert into inventory values ('burger', '1470 WEBSTER ST,San Francisco,CA', 0) ON CONFLICT DO NOTHING " ).execute ();
148
155
}
149
156
150
157
0 commit comments