Skip to content

Commit 92b25d9

Browse files
authored
Events can be published in parallel (#302)
Events can be published in parallel
1 parent 5fa2e08 commit 92b25d9

File tree

13 files changed

+161
-124
lines changed

13 files changed

+161
-124
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
## 2.1.6
2+
- Events can be published in parallel.
3+
14
## 2.1.2
25
- Added external parameters to send username, password and/or uri to connect to Messagebus.
3-
6+
47
## 2.1.1
58
- Implemented the changes to log the eventId and HTTPStatus while the level is INFO.
69
- Implemented the changes to print the user information while the log level is INFO.

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<version>2.0.12</version>
1010
</parent>
1111
<properties>
12-
<eiffel-remrem-publish.version>2.1.5</eiffel-remrem-publish.version>
12+
<eiffel-remrem-publish.version>2.1.6</eiffel-remrem-publish.version>
1313
<eiffel-remrem-semantics.version>2.4.0</eiffel-remrem-semantics.version>
1414
</properties>
1515
<artifactId>eiffel-remrem-publish</artifactId>

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/exception/RemRemPublishException.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,15 @@ public RemRemPublishException(String message, RMQBeanConnectionFactory factory,
3232
Throwable cause) {
3333
super(message + factory.getHost() + ":" + factory.getPort(), cause);
3434
}
35+
36+
@Override
37+
public String getMessage() {
38+
String message = super.getMessage();
39+
Throwable cause = getCause();
40+
if (cause != null) {
41+
message += "; root cause: '" + cause.getMessage() + "'";
42+
}
43+
44+
return message;
45+
}
3546
}

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public void init() {
318318
//The exception can be safely handled here as there is a check for existence of exchange is done before each publish.
319319
checkAndCreateExchangeIfNeeded();
320320
} catch (RemRemPublishException e) {
321-
log.error("Error occured while setting up the RabbitMq Connection. "+e.getMessage());
321+
log.error("Error occurred while setting up the RabbitMq Connection. "+e.getMessage());
322322
e.printStackTrace();
323323
}
324324

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/EventTemplateHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public JsonNode eventTemplateParser(String jsonData , String eventName){
7171
updatedJson = mapper.readValue(eventTemplate, JsonNode.class);
7272
} catch (IOException e) {
7373
log.error(e.getMessage(), e);
74+
return null;
7475
}
7576

7677
// For each key/value pair for parsing to template

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,4 @@ public interface MessageService {
6060
* Does the cleanup like closing open connections
6161
*/
6262
public void cleanUp();
63-
64-
/**
65-
* Implemented Status code for the response
66-
*/
67-
public HttpStatus getHttpStatus();
68-
6963
}

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java

Lines changed: 46 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.ericsson.eiffel.remrem.publish.service;
1616

1717
import java.io.IOException;
18+
import java.util.ArrayList;
1819
import java.util.HashMap;
1920
import java.util.List;
2021
import java.util.Map;
@@ -49,11 +50,6 @@
4950

5051
Logger log = (Logger) LoggerFactory.getLogger(MessageServiceRMQImpl.class);
5152

52-
/*Variables handles status codes*/
53-
List<Integer> statusCodes;
54-
List<JsonElement> errorItems;
55-
List<PublishResultItem> resultList;
56-
boolean checkEventStatus;
5753
/*
5854
* (non-Javadoc)
5955
* @see com.ericsson.eiffel.remrem.publish.service.MessageService#send(java.util.Map, java.util.Map)
@@ -74,7 +70,6 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
7470
} else {
7571
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
7672
PropertiesConfig.SERVER_DOWN_MESSAGE);
77-
checkEventStatus = false;
7873
}
7974
} catch (NackException e) {
8075
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
@@ -105,65 +100,75 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
105100
*/
106101
@Override
107102
public SendResult send(String jsonContent, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {
108-
109-
JsonParser parser = new JsonParser();
110103
try {
111-
JsonElement json = parser.parse(jsonContent);
104+
JsonElement json = JsonParser.parseString(jsonContent);
112105
if (json.isJsonArray()) {
113106
return send(json, msgService, userDomainSuffix, tag, routingKey);
114-
} else {
115-
Map<String, String> map = new HashMap<>();
116-
Map<String, String> routingKeyMap = new HashMap<>();
117-
String eventId = msgService.getEventId(json.getAsJsonObject());
118-
if (StringUtils.isNotBlank(eventId)) {
119-
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
120-
if (StringUtils.isNotBlank(routing_key)) {
121-
map.put(eventId, json.toString());
122-
routingKeyMap.put(eventId, routing_key);
123-
} else if (routing_key == null) {
124-
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
125-
routingKeyGenerationFailure(resultItemList);
126-
return new SendResult(resultItemList);
127-
} else {
128-
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
129-
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
130-
resultItemList.add(resultItem);
131-
return new SendResult(resultItemList);
132-
}
107+
}
108+
109+
Map<String, String> map = new HashMap<>();
110+
Map<String, String> routingKeyMap = new HashMap<>();
111+
String eventId = msgService.getEventId(json.getAsJsonObject());
112+
if (StringUtils.isNotBlank(eventId)) {
113+
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
114+
if (StringUtils.isNotBlank(routing_key)) {
115+
map.put(eventId, json.toString());
116+
routingKeyMap.put(eventId, routing_key);
117+
} else if (routing_key == null) {
118+
List<PublishResultItem> resultItemList = new ArrayList<>();
119+
routingKeyGenerationFailure(resultItemList);
120+
return new SendResult(resultItemList);
133121
} else {
134-
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
135-
createFailureResult(resultItemList);
122+
List<PublishResultItem> resultItemList = new ArrayList<>();
123+
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
124+
resultItemList.add(resultItem);
136125
return new SendResult(resultItemList);
137126
}
138-
return send(routingKeyMap, map, msgService);
127+
} else {
128+
List<PublishResultItem> resultItemList = new ArrayList<>();
129+
createFailureResult(resultItemList);
130+
return new SendResult(resultItemList);
139131
}
132+
return send(routingKeyMap, map, msgService);
140133
} catch (final JsonSyntaxException e) {
141134
String resultMsg = "Could not parse JSON.";
142135
if (e.getCause() != null) {
143136
resultMsg = resultMsg + " Cause: " + e.getCause().getMessage();
144137
}
145138
log.error(resultMsg, e.getMessage());
146-
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
139+
List<PublishResultItem> resultItemList = new ArrayList<>();
147140
createFailureResult(resultItemList);
148141
return new SendResult(resultItemList);
149142
}
150143
}
151144

145+
protected SendResult createSendResult(List<PublishResultItem> resultList) {
146+
SendResult result = new SendResult();
147+
result.setEvents(resultList);
148+
return result;
149+
}
150+
152151
/*
153152
* (non-Javadoc)
154153
* @see com.ericsson.eiffel.remrem.publish.service.MessageService#send(com.google.gson.JsonElement, com.ericsson.eiffel.remrem.protocol.MsgService, java.lang.String)
155154
*/
156155
@Override
157156
public SendResult send(JsonElement json, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {
157+
158+
List<PublishResultItem> resultList;
159+
boolean checkEventStatus;
160+
158161
Map<String, String> map = new HashMap<>();
159162
Map<String, String> routingKeyMap = new HashMap<>();
163+
160164
SendResult result;
161-
resultList = new CopyOnWriteArrayList<PublishResultItem>();
165+
resultList = new ArrayList<>();
162166
if (json == null) {
163167
createFailureResult(resultList);
168+
return createSendResult(resultList);
164169
}
170+
165171
if (json.isJsonArray()) {
166-
statusCodes = new CopyOnWriteArrayList<Integer>();
167172
checkEventStatus = true;
168173
JsonArray bodyJson = json.getAsJsonArray();
169174
for (JsonElement obj : bodyJson) {
@@ -174,57 +179,37 @@ public SendResult send(JsonElement json, MsgService msgService, String userDomai
174179
if (StringUtils.isNotBlank(routing_key)) {
175180
result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key);
176181
resultList.addAll(result.getEvents());
177-
int statusCode = result.getEvents().get(0).getStatusCode();
178-
if (!statusCodes.contains(statusCode))
179-
statusCodes.add(statusCode);
180182
} else if (routing_key == null) {
181183
routingKeyGenerationFailure(resultList);
182-
errorItems = new CopyOnWriteArrayList<JsonElement>();
183-
int statusCode = resultList.get(0).getStatusCode();
184-
statusCodes.add(statusCode);
185-
errorItems.add(obj);
186184
checkEventStatus = false;
187185
} else {
188186
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
189187
resultList.add(resultItem);
190-
int statusCode = resultItem.getStatusCode();
191-
statusCodes.add(statusCode);
192188
break;
193189
}
194190
} else {
195191
if (!checkEventStatus) {
196-
addUnsuccessfulResultItem(obj);
197-
int statusCode = resultList.get(0).getStatusCode();
198-
statusCodes.add(statusCode);
192+
addUnsuccessfulResultItem(resultList, obj);
199193
} else {
200194
createFailureResult(resultList);
201-
errorItems = new CopyOnWriteArrayList<JsonElement>();
202-
int statusCode = resultList.get(0).getStatusCode();
203-
statusCodes.add(statusCode);
204-
errorItems.add(obj);
205195
checkEventStatus = false;
206196
}
207197
}
208198
}
209199
} else {
210-
statusCodes = new CopyOnWriteArrayList<Integer>();
211200
result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey);
212201
resultList.addAll(result.getEvents());
213-
int statusCode = result.getEvents().get(0).getStatusCode();
214-
if (!statusCodes.contains(statusCode))
215-
statusCodes.add(statusCode);
216202
}
217-
result = new SendResult();
218-
result.setEvents(resultList);
219-
return result;
203+
204+
return createSendResult(resultList);
220205
}
221206

222-
private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,NackException, TimeoutException, RemRemPublishException {
207+
private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,TimeoutException, RemRemPublishException {
223208
String resultMsg = PropertiesConfig.SUCCESS;
224209
try {
225210
instantiateRmqHelper();
226211
} catch (RemRemPublishException e) {
227-
log.error("RemRemPublishException occurred::" + e.getMessage());
212+
log.error("RemRemPublishException occurred: {}", e.getMessage());
228213
}
229214
rmqHelper.send(routingKey, msg, msgService);
230215
return resultMsg;
@@ -268,7 +253,7 @@ private String getAndCheckEvent(MsgService msgService, Map<String, String> map,
268253

269254
/**
270255
* Method returns result for the failure event.
271-
* @param events for list the eiffel events results
256+
* @param resultItemList for list the eiffel events results
272257
*/
273258
private void createFailureResult(List<PublishResultItem> resultItemList) {
274259
PublishResultItem resultItem = new PublishResultItem(null, 400, PropertiesConfig.INVALID_MESSAGE,
@@ -293,21 +278,9 @@ private void routingKeyGenerationFailure(List<PublishResultItem> resultItemList)
293278
resultItemList.add(resultItem);
294279
}
295280

296-
private void addUnsuccessfulResultItem(JsonElement obj) {
281+
private void addUnsuccessfulResultItem(List<PublishResultItem> resultList, JsonElement obj) {
297282
PublishResultItem event = new PublishResultItem(null, 503, PropertiesConfig.SERVICE_UNAVAILABLE,
298283
PropertiesConfig.UNSUCCESSFUL_EVENT_CONTENT);
299284
resultList.add(event);
300285
}
301-
302-
/**
303-
* Method returns the Http response code for the events.
304-
*/
305-
public HttpStatus getHttpStatus() {
306-
if (statusCodes.size() > 1) {
307-
return HttpStatus.MULTI_STATUS;
308-
} else {
309-
return HttpStatus.valueOf(statusCodes.get(0));
310-
311-
}
312-
}
313286
}

0 commit comments

Comments
 (0)