17
17
package com .ericsson .ei .handlers ;
18
18
19
19
import java .util .ArrayList ;
20
+ import java .util .Arrays ;
20
21
import java .util .List ;
21
-
22
+ import org . json . JSONObject ;
22
23
import org .slf4j .Logger ;
23
24
import org .slf4j .LoggerFactory ;
24
25
import org .springframework .amqp .core .AcknowledgeMode ;
26
+ import org .springframework .amqp .core .AmqpAdmin ;
25
27
import org .springframework .amqp .core .Binding ;
28
+ import org .springframework .amqp .core .Binding .DestinationType ;
26
29
import org .springframework .amqp .core .BindingBuilder ;
27
30
import org .springframework .amqp .core .Queue ;
28
31
import org .springframework .amqp .core .TopicExchange ;
29
32
import org .springframework .amqp .rabbit .connection .CachingConnectionFactory ;
30
33
import org .springframework .amqp .rabbit .connection .ConnectionFactory ;
34
+ import org .springframework .amqp .rabbit .core .RabbitAdmin ;
31
35
import org .springframework .amqp .rabbit .core .RabbitTemplate ;
32
36
import org .springframework .amqp .rabbit .core .RabbitTemplate .ConfirmCallback ;
33
37
import org .springframework .amqp .rabbit .listener .SimpleMessageListenerContainer ;
34
38
import org .springframework .amqp .rabbit .listener .adapter .MessageListenerAdapter ;
35
39
import org .springframework .amqp .rabbit .support .CorrelationData ;
40
+ import org .springframework .beans .factory .annotation .Autowired ;
36
41
import org .springframework .beans .factory .annotation .Value ;
37
42
import org .springframework .context .annotation .Bean ;
38
43
import org .springframework .stereotype .Component ;
39
44
40
45
import com .ericsson .ei .listener .EIMessageListenerAdapter ;
41
46
import com .fasterxml .jackson .annotation .JsonIgnore ;
47
+ import com .mongodb .BasicDBObject ;
42
48
43
49
import lombok .Getter ;
44
50
import lombok .Setter ;
@@ -108,9 +114,23 @@ public class RmqHandler {
108
114
@ Setter
109
115
@ Value ("${rabbitmq.consumerName}" )
110
116
private String consumerName ;
117
+
118
+ @ Getter
119
+ @ Setter
120
+ @ Value ("${spring.data.mongodb.database}" )
121
+ private String dataBaseName ;
122
+
123
+ @ Getter
124
+ @ Setter
125
+ @ Value ("${bindingkeys.collection.name}" )
126
+ private String collectionName ;
111
127
112
128
@ Value ("${threads.maxPoolSize}" )
113
129
private int maxThreads ;
130
+
131
+ @ Setter
132
+ @ Autowired
133
+ private MongoDBHandler mongoDBHandler ;
114
134
115
135
@ Setter
116
136
@ JsonIgnore
@@ -122,6 +142,10 @@ public class RmqHandler {
122
142
@ JsonIgnore
123
143
private SimpleMessageListenerContainer container ;
124
144
145
+ @ Getter
146
+ @ JsonIgnore
147
+ private AmqpAdmin amqpAdmin ;
148
+
125
149
@ Bean
126
150
public ConnectionFactory connectionFactory () {
127
151
cachingConnectionFactory = new CachingConnectionFactory (host , port );
@@ -171,15 +195,75 @@ Binding binding() {
171
195
}
172
196
173
197
@ Bean
174
- public List <Binding > bindings () {
175
- final String [] bingingKeysArray = splitBindingKeys (bindingKeys );
198
+ public List <Binding > bindings (){
199
+ final String [] bindingKeysArray = splitBindingKeys (bindingKeys );
176
200
final List <Binding > bindingList = new ArrayList <>();
177
- for (final String bindingKey : bingingKeysArray ) {
201
+ for (final String bindingKey : bindingKeysArray ) {
178
202
bindingList .add (BindingBuilder .bind (externalQueue ()).to (exchange ()).with (bindingKey ));
179
203
}
204
+ deleteBindings (bindingKeysArray ,bindingList );
180
205
return bindingList ;
181
206
}
182
207
208
+ /**
209
+ * This method is used to delete the bindings in rabbitMQ.
210
+ * By comparing the binding keys used in the properties and binding keys stored in mongoDB.
211
+ * newBindingKeysArray is the only binding keys array.
212
+ * AMQPBindingObjectList is entire list of bindings.
213
+ * Binding key which is not present in the current AMQPBindingObjectList gets deleted and removed from mongoDB.
214
+ * @return
215
+ */
216
+
217
+ private void deleteBindings (String [] newBindingKeysArray , List <Binding > AMQPBindingObjectList ) {
218
+ // Creating BindingKeys Collection in mongoDB
219
+ ArrayList <String > allDocuments = mongoDBHandler .getAllDocuments (dataBaseName , collectionName );
220
+ ArrayList <String > existingBindingsData = new ArrayList <String >();
221
+ if (!allDocuments .isEmpty ()) {
222
+ for (String bindings : allDocuments ) {
223
+ JSONObject bindingObj = new JSONObject (bindings );
224
+ final String mongoDbBindingKey = bindingObj .getString ("bindingKeys" );
225
+ String condition = "{\" bindingKeys\" : /.*" + mongoDbBindingKey + "/}" ;
226
+ if (!Arrays .asList (newBindingKeysArray ).contains (mongoDbBindingKey )) {
227
+ String destinationDB = bindingObj .getString ("destination" );
228
+ String exchangeDB = bindingObj .getString ("exchange" );
229
+ // Binding the old binding key and removing from queue
230
+ Binding b = new Binding (destinationDB , DestinationType .QUEUE , exchangeDB , mongoDbBindingKey , null );
231
+ amqpAdmin = new RabbitAdmin (connectionFactory ());
232
+ amqpAdmin .removeBinding (b );
233
+ // Removing binding document from mongoDB
234
+ mongoDBHandler .dropDocument (dataBaseName , collectionName , condition );
235
+ } else {
236
+ // storing the existing key into an array.
237
+ existingBindingsData .add (mongoDbBindingKey );
238
+ }
239
+ }
240
+ }
241
+ // To store the new binding key into the mongoDB.
242
+ storeNewBindingKeys (existingBindingsData , AMQPBindingObjectList );
243
+ }
244
+
245
+ /**
246
+ * This method is used to store the bindings of new binding key into mongoDB.
247
+ * @return
248
+ */
249
+
250
+ private void storeNewBindingKeys (ArrayList <String > existingBindingsData , List <Binding > AMQPBindingObjectList ){
251
+ // comparing with the stored key and adding the new binding key into the mongoDB.
252
+ for (final Binding bindingKey :AMQPBindingObjectList ){
253
+ if (existingBindingsData .contains (bindingKey .getRoutingKey ())){
254
+ LOGGER .info ("Binding already present in mongoDB" );
255
+ }else {
256
+ BasicDBObject document = new BasicDBObject ();
257
+ document .put ("destination" ,bindingKey .getDestination ());
258
+ document .put ("destinationType" , bindingKey .getDestinationType ().toString ());
259
+ document .put ("exchange" , bindingKey .getExchange ());
260
+ document .put ("bindingKeys" , bindingKey .getRoutingKey ());
261
+ document .put ("arg" , bindingKey .getArguments ().toString ());
262
+ mongoDBHandler .insertDocument (dataBaseName , collectionName , document .toString ());
263
+ }
264
+ }
265
+ }
266
+
183
267
@ Bean
184
268
public SimpleMessageListenerContainer bindToQueueForRecentEvents (
185
269
final ConnectionFactory springConnectionFactory ,
0 commit comments