@@ -78,54 +78,32 @@ public void listenForOrderEventsTopic() throws Exception {
78
78
TopicConnection tconn = t_cf .createTopicConnection (inventoryResource .inventoryuser , inventoryResource .inventorypw );
79
79
TopicSession tsess = tconn .createTopicSession (true , Session .CLIENT_ACKNOWLEDGE );
80
80
81
+ TracingMessageConsumer tracingMessageConsumer = null ;
81
82
tconn .start ();
82
83
Topic orderEvents = ((AQjmsSession ) tsess ).getTopic (inventoryResource .queueOwner , inventoryResource .orderQueueName );
83
84
TopicReceiver receiver = ((AQjmsSession ) tsess ).createTopicReceiver (orderEvents , "inventory_service" , null );
84
85
85
- Topic inventoryTopic = ((AQjmsSession ) tsess ).getTopic (InventoryResource .queueOwner , InventoryResource .inventoryQueueName );
86
- TopicPublisher publisher = tsess .createPublisher (inventoryTopic );
87
-
88
86
Order order ;
89
87
String inventorylocation ;
90
- Inventory inventory ;
91
- ResultSet res ;
92
88
TextMessage orderMessage ;
93
- TextMessage inventoryMessage = tsess .createTextMessage ();
94
- int i ;
95
-
89
+
96
90
dbConnection = ((AQjmsSession ) tsess ).getDBConnection ();
97
- OraclePreparedStatement st = (OraclePreparedStatement ) dbConnection .prepareStatement (DECREMENT_BY_ID );
98
- st .registerReturnParameter (2 , Types .VARCHAR );
99
91
100
92
boolean done = false ;
101
93
while (!done ) {
102
94
try {
95
+ tracingMessageConsumer = new TracingMessageConsumer (receiver , inventoryResource .getTracer ());
103
96
// Receive next order event
104
- orderMessage = (TextMessage ) (receiver .receive (-1 ));
105
-
106
- // Parse order event
107
- order = JsonUtils .read (orderMessage .getText (), Order .class );
108
-
109
- // Check inventory
110
- st .setString (1 , order .getItemid ());
111
- i = st .executeUpdate ();
112
- res = st .getReturnResultSet ();
113
- if (i > 0 && res .next ()) {
114
- inventorylocation = res .getString (1 );
115
- } else {
116
- inventorylocation = INVENTORYDOESNOTEXIST ;
117
- }
118
-
119
- // Create inventory event
120
- inventory = new Inventory (order .getOrderid (), order .getItemid (), inventorylocation , "beer" ); //static suggestiveSale - represents an additional service/event
121
- inventoryMessage .setText (JsonUtils .writeValueAsString (inventory ));
122
-
123
- // Publish inventory event
124
- publisher .send (inventoryTopic , inventoryMessage , DeliveryMode .PERSISTENT , 2 , AQjmsConstants .EXPIRATION_NEVER );
125
-
126
- // Commit
127
- tsess .commit ();
128
-
97
+ orderMessage = (TextMessage ) (tracingMessageConsumer .receive (-1 ));
98
+ String txt = orderMessage .getText ();
99
+ System .out .println ("txt " + txt );
100
+ System .out .print (" Message: " + orderMessage .getIntProperty ("Id" ));
101
+ order = JsonUtils .read (txt , Order .class );
102
+ System .out .print (" orderid:" + order .getOrderid ());
103
+ System .out .print (" itemid:" + order .getItemid ());
104
+ updateDataAndSendEventOnInventory ((AQjmsSession ) tsess , order .getOrderid (), order .getItemid ());
105
+ if (tsess !=null ) tsess .commit ();
106
+ System .out .println ("message sent" );
129
107
} catch (IllegalStateException e ) {
130
108
System .out .println ("IllegalStateException in receiveMessages: " + e + " unrecognized message will be ignored" );
131
109
if (tsess != null ) tsess .commit (); //drain unrelated messages - todo add selector for this instead
0 commit comments