Skip to content

Commit 9797066

Browse files
Messagebus ack (#12)
* Fixed issue with multiprocessing and added acknowledgement
1 parent 8c8f86d commit 9797066

File tree

9 files changed

+277
-65
lines changed

9 files changed

+277
-65
lines changed

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.core.Message;
6+
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
57
import org.springframework.beans.factory.annotation.Autowired;
68
import org.springframework.beans.factory.annotation.Value;
79
import org.springframework.context.annotation.Bean;
@@ -11,7 +13,8 @@
1113

1214
import com.ericsson.ei.rules.RulesHandler;
1315
import com.ericsson.ei.rules.RulesObject;
14-
import org.springframework.stereotype.Service;
16+
17+
import com.rabbitmq.client.Channel;
1518

1619
import java.util.concurrent.Executor;
1720

@@ -45,17 +48,25 @@ public Executor asyncExecutor() {
4548
return executor;
4649
}
4750

48-
@Async
4951
public void eventReceived(byte[] message) {
5052
log.info("Thread id " + Thread.currentThread().getId() + " spawned");
5153
String actualMessage = new String(message);
5254
log.info("Event received <" + actualMessage + ">");
5355
eventReceived(actualMessage);
54-
if (System.getProperty("flow.test") == "true") {
55-
String countStr = System.getProperty("eiffel.intelligence.processedEventsCount");
56-
int count = Integer.parseInt(countStr);
57-
count++;
58-
System.setProperty("eiffel.intelligence.processedEventsCount", "" + count);
59-
}
56+
// if (System.getProperty("flow.test") == "true") {
57+
// String countStr = System.getProperty("eiffel.intelligence.processedEventsCount");
58+
// int count = Integer.parseInt(countStr);
59+
// count++;
60+
// System.setProperty("eiffel.intelligence.processedEventsCount", "" + count);
61+
// }
62+
}
63+
64+
@Async
65+
public void onMessage(Message message, Channel channel) throws Exception {
66+
byte[] messageBody = message.getBody();
67+
eventReceived(messageBody);
68+
long deliveryTag = message.getMessageProperties().getDeliveryTag();
69+
channel.basicAck(deliveryTag, false);
70+
int breakHere = 1;
6071
}
6172
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.ericsson.ei.rmqhandler;
2+
3+
import org.springframework.amqp.core.Message;
4+
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
5+
6+
import com.ericsson.ei.handlers.EventHandler;
7+
import com.rabbitmq.client.Channel;
8+
9+
public class EIMessageListenerAdapter extends MessageListenerAdapter {
10+
11+
public EIMessageListenerAdapter(Object delegate) {
12+
super(delegate);
13+
}
14+
15+
@Override
16+
public void onMessage(Message message, Channel channel) throws Exception {
17+
Object delegate = getDelegate();
18+
if (delegate != this) {
19+
if (delegate instanceof EventHandler) {
20+
if (channel != null) {
21+
channel.basicQos(150);
22+
((EventHandler) delegate).onMessage(message, channel);
23+
return;
24+
}
25+
}
26+
}
27+
}
28+
}

src/main/java/com/ericsson/ei/rmqhandler/RmqHandler.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44

55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
7+
import org.springframework.amqp.core.AcknowledgeMode;
78
import org.springframework.amqp.core.Binding;
89
import org.springframework.amqp.core.BindingBuilder;
910
import org.springframework.amqp.core.Queue;
1011
import org.springframework.amqp.core.TopicExchange;
1112
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
1213
import org.springframework.amqp.rabbit.connection.Connection;
1314
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
15+
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
1416
import org.springframework.amqp.rabbit.core.RabbitTemplate;
1517
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
1618
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
@@ -44,11 +46,12 @@ public class RmqHandler {
4446
private String domainId;
4547
@Value("${rabbitmq.componentName}")
4648
private String componentName;
49+
@Value("${rabbitmq.waitlist.queue.suffix}")
50+
private String waitlistSufix;
4751
@Value("${rabbitmq.routing.key}")
4852
private String routingKey;
4953
@Value("${rabbitmq.consumerName}")
5054
private String consumerName;
51-
// SimpleMessageListenerContainer container;
5255
private RabbitTemplate rabbitTemplate;
5356
private CachingConnectionFactory factory;
5457
private SimpleMessageListenerContainer container;
@@ -170,11 +173,12 @@ Binding binding(Queue queue, TopicExchange exchange) {
170173
@Bean
171174
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
172175
String queueName = getQueueName();
173-
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
176+
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
174177
container = new SimpleMessageListenerContainer();
175178
container.setConnectionFactory(factory);
176179
container.setQueueNames(queueName);
177180
container.setMessageListener(listenerAdapter);
181+
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
178182
return container;
179183
}
180184

@@ -183,6 +187,11 @@ public String getQueueName() {
183187
return domainId + "." + componentName + "." + consumerName + "." + durableName;
184188
}
185189

190+
public String getWaitlistQueueName() {
191+
String durableName = queueDurable ? "durable" : "transient";
192+
return domainId + "." + componentName + "." + consumerName + "." + durableName + "." + waitlistSufix;
193+
}
194+
186195
@Bean
187196
public RabbitTemplate rabbitMqTemplate() {
188197
if (rabbitTemplate == null) {
@@ -208,15 +217,6 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
208217
public void publishObjectToMessageBus(String message) {
209218
log.info("publishing message to message bus...");
210219
rabbitMqTemplate().convertAndSend(message);
211-
// Connection conn = factory.createConnection();
212-
// Channel channel = conn.createChannel(true);
213-
// String queueName = getQueueName();
214-
// String exchange = exchangeName;
215-
// try {
216-
// channel.basicPublish(exchange, queueName, null, message.getBytes());
217-
// } catch (Exception e) {
218-
// log.info(e.getMessage(),e);
219-
// }
220220
}
221221

222222
public void close() {

src/main/java/com/ericsson/ei/rules/RulesHandler.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package com.ericsson.ei.rules;
22

3-
import java.io.File;
3+
import java.io.*;
44
import java.util.Iterator;
5-
6-
import org.apache.commons.io.FileUtils;
7-
85
import org.slf4j.Logger;
96
import org.slf4j.LoggerFactory;
107

@@ -30,7 +27,8 @@ public class RulesHandler {
3027
@PostConstruct public void init() {
3128
if (parsedJason == null) {
3229
try {
33-
jsonFileContent = FileUtils.readFileToString(new File(jsonFilePath));
30+
InputStream in = this.getClass().getResourceAsStream(jsonFilePath);
31+
jsonFileContent = getContent(in);
3432
ObjectMapper objectmapper = new ObjectMapper();
3533
parsedJason = objectmapper.readTree(jsonFileContent);
3634
} catch (Exception e) {
@@ -48,7 +46,6 @@ public RulesObject getRulesForEvent(String event) {
4846
JsonNode type;
4947
JsonNode result;
5048
Iterator<JsonNode> iter = parsedJason.iterator();
51-
5249
while(iter.hasNext()) {
5350
JsonNode rule = iter.next();
5451
typeRule = rule.get("TypeRule").toString();
@@ -66,4 +63,21 @@ public RulesObject getRulesForEvent(String event) {
6663
return null;
6764
}
6865

69-
}
66+
private String getContent(InputStream inputStream){
67+
try {
68+
69+
70+
ByteArrayOutputStream result = new ByteArrayOutputStream();
71+
byte[] buffer = new byte[1024];
72+
int length;
73+
while ((length = inputStream.read(buffer)) != -1) {
74+
result.write(buffer, 0, length);
75+
}
76+
return result.toString("UTF-8");}
77+
catch (Exception e) {
78+
log.error(e.getMessage(), e);
79+
}
80+
return null;
81+
}
82+
83+
}

src/main/java/com/ericsson/ei/waitlist/WaitListWorker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
@Component
2323
public class WaitListWorker {
2424

25+
@Value("${waitlist.initialDelayResend:}") private int initialDelayResend;
26+
@Value("${waitlist.fixedRateResend:}") private int fixedRateResend;
27+
2528
@Autowired
2629
private WaitListStorageHandler waitListStorageHandler;
2730

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
[
2+
{
3+
"TemplateName":"ARTIFACT_1",
4+
"Type":"EiffelArtifactCreatedEvent",
5+
"TypeRule": "meta.type",
6+
"IdRule": "meta.id",
7+
"StartEvent": "YES",
8+
"IdentifyRules" : "[meta.id]",
9+
"MatchIdRules": {"_id": "%IdentifyRules_objid%"},
10+
"ExtractionRules" : "{ id : meta.id, type : meta.type, time : meta.time, gav : data.gav, fileInformation : data.fileInformation, buildCommand : data.buildCommand }",
11+
"MergeResolverRules" : "",
12+
"ArrayMergeOptions": "",
13+
"HistoryIdentifyRules":"links | [?type=='COMPOSITION'].target",
14+
"HistoryExtractionRules":"{artifacts: [{id : meta.id}]}",
15+
"ProcessRules" : null,
16+
"ProcessFunction" : null
17+
},
18+
{
19+
"TemplateName":"ARTIFACT_1",
20+
"Type":"EiffelArtifactPublishedEvent",
21+
"TypeRule": "meta.type",
22+
"IdRule": "meta.id",
23+
"StartEvent": "NO",
24+
"IdentifyRules" : "links | [?type=='ARTIFACT'].target",
25+
"MatchIdRules": {"_id": "%IdentifyRules_objid%"},
26+
"ExtractionRules" : "{ publications :[ { eventId : meta.id, time : meta.time, locations : data.locations }] }",
27+
"MergeResolverRules" : "",
28+
"ArrayMergeOptions": "",
29+
"HistoryIdentifyRules": "",
30+
"HistoryExtractionRules": "",
31+
"ProcessRules":null,
32+
"ProcessFunction" : null
33+
},
34+
{
35+
"TemplateName":"ARTIFACT_1",
36+
"Type":"EiffelConfidenceLevelModifiedEvent",
37+
"TypeRule": "meta.type",
38+
"IdRule": "meta.id",
39+
"StartEvent": "NO",
40+
"IdentifyRules" : "links | [?type=='SUBJECT'].target",
41+
"MatchIdRules": {"_id": "%IdentifyRules_objid%"},
42+
"ExtractionRules" : "{confidenceLevels :[{ eventId:meta.id, time:meta.time, name:data.name, value:data.value}]}",
43+
"MergeResolverRules" : "",
44+
"ArrayMergeOptions": "",
45+
"HistoryIdentifyRules": "",
46+
"HistoryExtractionRules": "",
47+
"ProcessRules":null,
48+
"ProcessFunction" : null
49+
},
50+
{
51+
"TemplateName":"ARTIFACT_1",
52+
"Type":"EiffelTestCaseStartedEvent",
53+
"TypeRule": "meta.type",
54+
"IdRule": "meta.id",
55+
"StartEvent": "NO",
56+
"IdentifyRules" : "links | [?type=='IUT'].target",
57+
"MatchIdRules": {"_id": "%IdentifyRules_objid%"},
58+
"ExtractionRules" : "{testCaseExecutions :[{ testCaseStartedEventId:meta.id, testCaseStartedTime:meta.time, testCase:data.testCase}]}",
59+
"MergeResolverRules" : "",
60+
"ArrayMergeOptions": "",
61+
"HistoryIdentifyRules": "",
62+
"HistoryExtractionRules": "",
63+
"ProcessRules":null,
64+
"ProcessFunction" : null
65+
},
66+
{
67+
"TemplateName":"ARTIFACT_1",
68+
"Type":"EiffelTestCaseFinishedEvent",
69+
"TypeRule": "meta.type",
70+
"IdRule": "meta.id",
71+
"StartEvent": "NO",
72+
"IdentifyRules" : "links | [?type=='TEST_CASE_EXECUTION'].target",
73+
"MatchIdRules": { "$and": [{"aggregatedObject.testCaseExecutions.testCaseStartedEventId": "%IdentifyRules%"}]},
74+
"ExtractionRules" : "{testCaseExecutions :[{ testCaseFinishEventId:meta.id, testCaseFinishedTime:meta.time, testCase:data.outcome}]}",
75+
"MergeResolverRules" : {"testCaseStartedEventId":"%IdentifyRules%"},
76+
"ArrayMergeOptions": "",
77+
"HistoryIdentifyRules": "",
78+
"HistoryExtractionRules": "",
79+
"ProcessRules":"{testCaseExecutions :[{testCaseDuration : diff(testCaseExecutions[0].testCaseFinishedTime, testCaseExecutions[0].testCaseStartedTime)}]}",
80+
"ProcessFunction" : "difference"
81+
},
82+
{
83+
"TemplateName":"ARTIFACT_1",
84+
"Type":"EiffelCompositionDefinedEvent",
85+
"TypeRule": "meta.type",
86+
"IdRule": "meta.id",
87+
"StartEvent": "NO",
88+
"IdentifyRules" : "",
89+
"MatchIdRules": { },
90+
"ExtractionRules" : "",
91+
"MergeResolverRules" : "",
92+
"ArrayMergeOptions": "",
93+
"HistoryIdentifyRules":"links | [?type=='ELEMENT'].target",
94+
"HistoryExtractionRules": "{internalComposition: {eventId: meta.id, time:meta.time, name:data.name}}",
95+
"ProcessRules":null,
96+
"ProcessFunction" : null
97+
},
98+
{
99+
"TemplateName":"ARTIFACT_1",
100+
"Type":"EiffelSourceChangeCreatedEvent",
101+
"TypeRule": "meta.type",
102+
"IdRule": "meta.id",
103+
"StartEvent": "NO",
104+
"IdentifyRules" : "",
105+
"MatchIdRules": {},
106+
"ExtractionRules" : "",
107+
"MergeResolverRules" : "",
108+
"ArrayMergeOptions": "",
109+
"HistoryIdentifyRules": "",
110+
"HistoryExtractionRules": "{internalComposition: {sourceChanges :[{ author:data.author, issues:data.issues}]}}",
111+
"ProcessRules":null,
112+
"ProcessFunction" : null
113+
},
114+
{
115+
"TemplateName":"ARTIFACT_1",
116+
"Type":"EiffelSourceChangeSubmittedEvent",
117+
"TypeRule": "meta.type",
118+
"IdRule": "meta.id",
119+
"StartEvent": "NO",
120+
"IdentifyRules" : "",
121+
"MatchIdRules": {},
122+
"ExtractionRules" : "",
123+
"MergeResolverRules" : "",
124+
"ArrayMergeOptions": "",
125+
"HistoryIdentifyRules":"links | [?type=='CHANGE'].target",
126+
"HistoryExtractionRules": "{internalComposition: {sourceChanges :[{svnIdentifier:data.svnIdentifier, submitter:data.submitter}]}}",
127+
"ProcessRules":null,
128+
"ProcessFunction" : null
129+
}
130+
]

src/main/resources/application.properties

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
rules.path: src/test/resources/ArtifactRules_new.json
1+
rules.path: /ArtifactRules_new.json
22

33
logging.level.root: INFO
44
logging.level.org.springframework.web: DEBUG
@@ -15,6 +15,7 @@ rabbitmq.componentName: eiffelintelligence
1515
rabbitmq.consumerName: messageConsumer
1616
rabbitmq.queue.durable:true
1717
rabbitmq.routing.key:#
18+
rabbitmq.waitlist.queue.suffix:waitlist
1819

1920
mergeidmarker: %IdentifyRules%
2021

@@ -25,6 +26,8 @@ aggregated.collection.name: aggregated_objects
2526
event_object_map.collection.name: event_object_map
2627
waitlist.collection.name: wait_list
2728
waitlist.collection.ttlValue: 600
29+
waitlist.initialDelayResend: 2000
30+
waitlist.fixedRateResend: 15000
2831
database.name: eiffel_intelligence_eiffelxxx
2932
subscription.collection.name: subscription
3033

0 commit comments

Comments
 (0)