Skip to content

Commit 9fb6438

Browse files
remove old bindings of a queue from message bus (#471)
* Added code changes for removing unused bindings of a queue from message bus Co-authored-by: @Navyatha-reddi
1 parent 169aa3b commit 9fb6438

File tree

8 files changed

+187
-2
lines changed

8 files changed

+187
-2
lines changed

src/functionaltests/java/com/ericsson/ei/rabbitmq/connection/RabbitMQTestConnectionSteps.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
import java.util.List;
88
import java.util.concurrent.TimeUnit;
99

10+
import org.json.JSONObject;
1011
import org.junit.Ignore;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
14+
import org.springframework.amqp.core.Binding;
15+
import org.springframework.amqp.core.Binding.DestinationType;
1316
import org.springframework.amqp.core.BindingBuilder;
1417
import org.springframework.amqp.core.Queue;
1518
import org.springframework.amqp.core.TopicExchange;
@@ -28,8 +31,13 @@
2831
import com.ericsson.ei.handlers.EventHandler;
2932
import com.ericsson.ei.handlers.RMQHandler;
3033
import com.ericsson.ei.handlers.RMQProperties;
34+
import com.ericsson.ei.mongo.MongoCondition;
35+
import com.ericsson.ei.mongo.MongoDBHandler;
36+
import com.ericsson.ei.mongo.MongoQuery;
37+
import com.ericsson.ei.mongo.MongoStringQuery;
3138
import com.ericsson.ei.utils.AMQPBrokerManager;
3239
import com.ericsson.ei.utils.FunctionalTestBase;
40+
import com.mongodb.BasicDBObject;
3341

3442
import cucumber.api.java.en.Given;
3543
import cucumber.api.java.en.Then;
@@ -38,6 +46,7 @@
3846
@Ignore
3947
@TestPropertySource(properties = {
4048
"spring.data.mongodb.database: RabbitMQTestConnectionSteps",
49+
"bindingkeys.collection.name: RabbitMQConfigurationTestSteps-bindingKeys",
4150
"failed.notifications.collection.name: RabbitMQTestConnectionSteps-failedNotifications",
4251
"rabbitmq.exchange.name: RabbitMQTestConnectionSteps-exchange",
4352
"rabbitmq.queue.suffix: RabbitMQTestConnectionSteps" })
@@ -49,6 +58,9 @@ public class RabbitMQTestConnectionSteps extends FunctionalTestBase {
4958
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTestConnectionSteps.class);
5059
private static final String EIFFEL_EVENTS = "src/functionaltests/resources/eiffel_events_for_thread_testing.json";
5160

61+
private static final String BINDING_KEY_1 = "binding-key-1";
62+
private static final String BINDING_KEY_2 = "binding-key-2";
63+
5264
private AMQPBrokerManager amqpBroker;
5365

5466
@Autowired
@@ -58,6 +70,15 @@ public class RabbitMQTestConnectionSteps extends FunctionalTestBase {
5870
@Autowired
5971
EventHandler eventHandler;
6072

73+
@Value("${spring.data.mongodb.database}")
74+
private String dataBaseName;
75+
76+
@Value("${bindingkeys.collection.name}")
77+
private String collectionName;
78+
79+
@Autowired
80+
private MongoDBHandler mongoDBHandler;
81+
6182
@Given("^We are connected to message bus$")
6283
public void connect_to_message_bus() throws Exception {
6384
int port = SocketUtils.findAvailableTcpPort();
@@ -116,6 +137,19 @@ public void can_send_events_which_are_put_in_the_waitlist() throws Exception {
116137
assertEquals(4, waitListSize);
117138
}
118139

140+
@When("^add the binding documents to mongoDB$")
141+
public void add_the_binding_documents_to_mongoDB() {
142+
BasicDBObject dbBinding = insertBinding();
143+
assertEquals(5,dbBinding.size());
144+
}
145+
146+
@Then("^compare the binding keys and remove the old binding keys from rabbitMQ and mongoDB$")
147+
public void compare_the_binding_keys_and_remove_the_old_binding_keys_from_rabbitMQ_and_mongoDB() {
148+
LOGGER.debug("comparing the binding keys to remove the old binding key");
149+
ArrayList<String> removedBinding = compareAndRemoveBindings();
150+
assertEquals(1, removedBinding.size());
151+
}
152+
119153
/**
120154
* This method collects all the event names of events we will send to the
121155
* message bus.
@@ -150,4 +184,33 @@ private RabbitAdmin createExchange(final RMQHandler rmqHandler) {
150184
return admin;
151185
}
152186

187+
private BasicDBObject insertBinding() {
188+
BasicDBObject docInput = new BasicDBObject();
189+
docInput.put("destination", "ei-domain.eiffel-intelligence.messageConsumer.durable");
190+
docInput.put("destinationType", "QUEUE");
191+
docInput.put("exchange", "ei-binding-keys");
192+
docInput.put("bindingKeys", BINDING_KEY_1);
193+
docInput.put("arg", null);
194+
mongoDBHandler.insertDocument(dataBaseName, collectionName, docInput.toString());
195+
return docInput;
196+
}
197+
198+
private ArrayList<String> compareAndRemoveBindings() {
199+
ArrayList<Binding> listBinding = new ArrayList<Binding>();
200+
listBinding.add(new Binding("ei-domain.eiffel-intelligence.messageConsumer.durable", DestinationType.QUEUE,
201+
"ei-binding-keys", BINDING_KEY_2, null));
202+
List<String> allObjects = mongoDBHandler.getAllDocuments(dataBaseName, collectionName);
203+
ArrayList<String> removedBinding = new ArrayList<String>();
204+
205+
JSONObject dbBinding = new JSONObject(allObjects.get(0));
206+
String mongoDbBinding = dbBinding.getString("bindingKeys");
207+
if (!(listBinding.contains(mongoDbBinding))) {
208+
removedBinding.add(mongoDbBinding);
209+
listBinding.remove(0);
210+
final MongoCondition condition = MongoCondition.bindingKeyCondition(mongoDbBinding);
211+
mongoDBHandler.dropDocument(dataBaseName, collectionName, condition);
212+
}
213+
return removedBinding;
214+
}
215+
153216
}

src/functionaltests/resources/features/rabbitMQTestConnection.feature

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,9 @@ Feature: Test Rabbit MQ Connection
77
When Message bus goes down
88
And Message bus is restarted
99
Then I can send events which are put in the waitlist
10+
11+
@RabbitMQConfigurationDeleteBindingKeysScenario
12+
Scenario: Test that old binding keys are deleted from rabbitMQ and mongoDB
13+
Given We are connected to message bus
14+
When add the binding documents to mongoDB
15+
Then compare the binding keys and remove the old binding keys from rabbitMQ and mongoDB

src/integrationtests/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ rabbitmq.queue.durable: true
2626
rabbitmq.binding.key: #
2727
rabbitmq.waitlist.queue.suffix: waitList
2828

29+
bindingkeys.collection.name: binding_keys
30+
2931
encrypted.mongodb.password: ENC(en2h9ZQlZEuWyyfiTVrOPcQbRTwcXxOJ)
3032
spring.data.mongodb.uri: mongodb://admin:${encrypted.mongodb.password}@localhost:27016
3133
spring.data.mongodb.database: eiffel_intelligence

src/main/java/com/ericsson/ei/handlers/RMQHandler.java

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717
package com.ericsson.ei.handlers;
1818

1919
import java.util.ArrayList;
20+
import java.util.Arrays;
2021
import java.util.List;
2122

2223
import org.apache.commons.lang3.StringUtils;
24+
import org.json.JSONObject;
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
2527
import org.springframework.amqp.core.AcknowledgeMode;
28+
import org.springframework.amqp.core.AmqpAdmin;
2629
import org.springframework.amqp.core.Binding;
30+
import org.springframework.amqp.core.Binding.DestinationType;
2731
import org.springframework.amqp.core.BindingBuilder;
2832
import org.springframework.amqp.core.Queue;
2933
import org.springframework.amqp.core.TopicExchange;
3034
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3135
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
36+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
3237
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3338
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
3439
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
@@ -41,6 +46,11 @@
4146

4247
import com.ericsson.ei.listeners.EIMessageListenerAdapter;
4348
import com.ericsson.ei.listeners.RMQConnectionListener;
49+
import com.ericsson.ei.mongo.MongoCondition;
50+
import com.ericsson.ei.mongo.MongoConstants;
51+
import com.ericsson.ei.mongo.MongoDBHandler;
52+
import com.fasterxml.jackson.annotation.JsonIgnore;
53+
import com.mongodb.BasicDBObject;
4454

4555
import lombok.Getter;
4656
import lombok.Setter;
@@ -68,6 +78,24 @@ public class RMQHandler {
6878
@Autowired
6979
private RMQProperties rmqProperties;
7080

81+
@Getter
82+
@Setter
83+
@Value("${spring.data.mongodb.database}")
84+
private String dataBaseName;
85+
86+
@Getter
87+
@Setter
88+
@Value("${bindingkeys.collection.name}")
89+
private String collectionName;
90+
91+
@Setter
92+
@Autowired
93+
private MongoDBHandler mongoDBHandler;
94+
95+
@Getter
96+
@JsonIgnore
97+
private AmqpAdmin amqpAdmin;
98+
7199
@Bean
72100
public ConnectionFactory connectionFactory() {
73101
cachingConnectionFactory = new CachingConnectionFactory(rmqProperties.getHost(), rmqProperties.getPort());
@@ -169,11 +197,12 @@ protected Binding binding() {
169197

170198
@Bean
171199
public List<Binding> bindings() {
172-
String[] bingingKeysArray = splitBindingKeys(rmqProperties.getBindingKeys());
200+
String[] bindingKeysArray = splitBindingKeys(rmqProperties.getBindingKeys());
173201
List<Binding> bindingList = new ArrayList<Binding>();
174-
for (String bindingKey : bingingKeysArray) {
202+
for (String bindingKey : bindingKeysArray) {
175203
bindingList.add(BindingBuilder.bind(externalQueue()).to(exchange()).with(bindingKey));
176204
}
205+
deleteBindings(bindingKeysArray,bindingList);
177206
return bindingList;
178207
}
179208

@@ -185,4 +214,61 @@ private String[] splitBindingKeys(String bindingKeys) {
185214
String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", "");
186215
return bindingKeysWithoutWhitespace.split(",");
187216
}
217+
218+
/**
219+
* This method is used to delete the bindings in rabbitMQ.
220+
* By comparing the binding keys used in the properties and binding keys stored in mongoDB.
221+
* newBindingKeysArray is the only binding keys array.
222+
* AMQPBindingObjectList is entire list of bindings.
223+
* Binding key which is not present in the current AMQPBindingObjectList gets deleted and removed from mongoDB.
224+
* @return
225+
*/
226+
private void deleteBindings(String[] newBindingKeysArray, List<Binding> AMQPBindingObjectList) {
227+
// Creating BindingKeys Collection in mongoDB
228+
ArrayList<String> allDocuments = mongoDBHandler.getAllDocuments(dataBaseName, collectionName);
229+
ArrayList<String> existingBindingsData = new ArrayList<String>();
230+
if (!allDocuments.isEmpty()) {
231+
for (String bindings : allDocuments) {
232+
JSONObject bindingObj = new JSONObject(bindings);
233+
final String mongoDbBindingKey = bindingObj.getString("bindingKeys");
234+
MongoCondition condition = MongoCondition.bindingKeyCondition(mongoDbBindingKey);
235+
if (!Arrays.asList(newBindingKeysArray).contains(mongoDbBindingKey)) {
236+
String destinationDB = bindingObj.getString("destination");
237+
String exchangeDB = bindingObj.getString("exchange");
238+
// Binding the old binding key and removing from queue
239+
Binding b = new Binding(destinationDB, DestinationType.QUEUE, exchangeDB, mongoDbBindingKey, null);
240+
amqpAdmin = new RabbitAdmin(connectionFactory());
241+
amqpAdmin.removeBinding(b);
242+
// Removing binding document from mongoDB
243+
mongoDBHandler.dropDocument(dataBaseName, collectionName, condition);
244+
} else {
245+
// storing the existing key into an array.
246+
existingBindingsData.add(mongoDbBindingKey);
247+
}
248+
}
249+
}
250+
// to store the binding keys used for rabbitMQ, in mongo db.
251+
storeNewBindingKeys(existingBindingsData, AMQPBindingObjectList);
252+
}
253+
254+
/**
255+
* This method is used to store the binding keys used for rabbitMQ, in mongoDB.
256+
* @return
257+
*/
258+
private void storeNewBindingKeys(ArrayList<String> existingBindingsData, List<Binding> AMQPBindingObjectList){
259+
// comparing with the stored key and adding the new binding key into the mongoDB.
260+
for(final Binding bindingKey:AMQPBindingObjectList){
261+
if(existingBindingsData.contains(bindingKey.getRoutingKey())){
262+
LOGGER.info("Binding already present in mongoDB");
263+
}else{
264+
BasicDBObject document = new BasicDBObject();
265+
document.put(MongoConstants.MB_DESTINATION,bindingKey.getDestination());
266+
document.put(MongoConstants.MB_DESTINATIONT_TYPE, bindingKey.getDestinationType().toString());
267+
document.put(MongoConstants.MB_EXCHANGE, bindingKey.getExchange());
268+
document.put(MongoConstants.MB_BINDING_KEYS, bindingKey.getRoutingKey());
269+
document.put(MongoConstants.MB_ARG, bindingKey.getArguments().toString());
270+
mongoDBHandler.insertDocument(dataBaseName, collectionName, document.toString());
271+
}
272+
}
273+
}
188274
}

src/main/java/com/ericsson/ei/mongo/MongoCondition.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,18 @@ public static MongoCondition idCondition(String documentId) {
3939
return condition(MongoConstants.ID, documentId);
4040
}
4141

42+
/**
43+
* Creates a MongoCondition to find a document with a given bindingKey. Called with
44+
* <code>my-bindingKey</code> the JSON will look like this:
45+
* <code>{"bindingKeys":"my-bindingKey"}</code>
46+
*
47+
* @param bindingKey The value of the bindingKey
48+
* @return A MongoCondition with bindingKey set
49+
*/
50+
public static MongoCondition bindingKeyCondition(String bindingKey) {
51+
return condition(MongoConstants.MB_BINDING_KEYS, bindingKey);
52+
}
53+
4254
/**
4355
* Creates a MongoCodition just as {@link #idCondition(String)} but with the
4456
* <code>jsonNode</code> value fetched correctly.

src/main/java/com/ericsson/ei/mongo/MongoConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,10 @@ public class MongoConstants {
55
public static final String EVENT = "Event";
66
public static final String TIME = "Time";
77
public static final String NOT_LOCKED = "0";
8+
9+
public static final String MB_DESTINATION = "destination";
10+
public static final String MB_DESTINATIONT_TYPE = "destinationType";
11+
public static final String MB_EXCHANGE = "exchange";
12+
public static final String MB_BINDING_KEYS = "bindingKeys";
13+
public static final String MB_ARG = "arg";
814
}

src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ rabbitmq.queue.durable: true
2626
rabbitmq.binding.key: #
2727
rabbitmq.waitlist.queue.suffix: waitList
2828

29+
bindingkeys.collection.name: binding_keys
30+
2931
spring.data.mongodb.uri: mongodb://localhost:27017
3032
spring.data.mongodb.database: eiffel_intelligence
3133

wiki/configuration.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,13 @@ The rabbitmq.tls.version property specifies the security protocol and you can fi
204204
* rabbitmq.binding.key
205205
* rabbitmq.waitlist.queue.suffix
206206

207+
## Storage of rabbitMQ binding keys in mongo database
208+
209+
Eiffel Intelligence stores bindings of a queue in a database with the collection name
210+
configured with the property **bindingkeys.collection.name**. Upon starting, Eiffel Intelligence compares,
211+
the bindingkeys with the property value rabbitmq.binding.key and bindings
212+
from mongoDB, then removes unused bindings from RabbitMQ and adds new bindings to the
213+
mongoDB collection. The name of this particular collection is defined by the below property:
207214

208215
## Security
209216

@@ -296,3 +303,4 @@ Eiffel-Intelligence will decrypt password properties with help of the provided s
296303
Execute Eiffel-Intelligence with "jasypt.encryptor.password=mySecretEncryptorPassword" flag, example:
297304

298305
java -jar eiffel-intelligence-<version>.war --jasypt.encryptor.password=mySecretEncryptorPassword --spring.config.location=/path/to/application.properties
306+

0 commit comments

Comments
 (0)