Skip to content

Commit 44aee91

Browse files
committed
Events can be published in parallel
1 parent 2b11dc9 commit 44aee91

File tree

6 files changed

+111
-109
lines changed

6 files changed

+111
-109
lines changed

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/exception/RemRemPublishException.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,15 @@ public RemRemPublishException(String message, RMQBeanConnectionFactory factory,
3232
Throwable cause) {
3333
super(message + factory.getHost() + ":" + factory.getPort(), cause);
3434
}
35+
36+
@Override
37+
public String getMessage() {
38+
String message = super.getMessage();
39+
Throwable cause = getCause();
40+
if (cause != null) {
41+
message += "; root cause: '" + cause.getMessage() + "'";
42+
}
43+
44+
return message;
45+
}
3546
}

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public void init() {
318318
//The exception can be safely handled here as there is a check for existence of exchange is done before each publish.
319319
checkAndCreateExchangeIfNeeded();
320320
} catch (RemRemPublishException e) {
321-
log.error("Error occured while setting up the RabbitMq Connection. "+e.getMessage());
321+
log.error("Error occurred while setting up the RabbitMq Connection. "+e.getMessage());
322322
e.printStackTrace();
323323
}
324324

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,4 @@ public interface MessageService {
6060
* Does the cleanup like closing open connections
6161
*/
6262
public void cleanUp();
63-
64-
/**
65-
* Implemented Status code for the response
66-
*/
67-
public HttpStatus getHttpStatus();
68-
6963
}

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java

Lines changed: 64 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.ericsson.eiffel.remrem.publish.service;
1616

1717
import java.io.IOException;
18+
import java.util.ArrayList;
1819
import java.util.HashMap;
1920
import java.util.List;
2021
import java.util.Map;
@@ -49,11 +50,6 @@
4950

5051
Logger log = (Logger) LoggerFactory.getLogger(MessageServiceRMQImpl.class);
5152

52-
/*Variables handles status codes*/
53-
List<Integer> statusCodes;
54-
List<JsonElement> errorItems;
55-
List<PublishResultItem> resultList;
56-
boolean checkEventStatus;
5753
/*
5854
* (non-Javadoc)
5955
* @see com.ericsson.eiffel.remrem.publish.service.MessageService#send(java.util.Map, java.util.Map)
@@ -74,7 +70,6 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
7470
} else {
7571
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
7672
PropertiesConfig.SERVER_DOWN_MESSAGE);
77-
checkEventStatus = false;
7873
}
7974
} catch (NackException e) {
8075
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
@@ -105,45 +100,43 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
105100
*/
106101
@Override
107102
public SendResult send(String jsonContent, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {
108-
109-
JsonParser parser = new JsonParser();
110103
try {
111-
JsonElement json = parser.parse(jsonContent);
104+
JsonElement json = JsonParser.parseString(jsonContent);
112105
if (json.isJsonArray()) {
113106
return send(json, msgService, userDomainSuffix, tag, routingKey);
114-
} else {
115-
Map<String, String> map = new HashMap<>();
116-
Map<String, String> routingKeyMap = new HashMap<>();
117-
String eventId = msgService.getEventId(json.getAsJsonObject());
118-
if (StringUtils.isNotBlank(eventId)) {
119-
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
120-
if (StringUtils.isNotBlank(routing_key)) {
121-
map.put(eventId, json.toString());
122-
routingKeyMap.put(eventId, routing_key);
123-
} else if (routing_key == null) {
124-
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
125-
routingKeyGenerationFailure(resultItemList);
126-
return new SendResult(resultItemList);
127-
} else {
128-
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
129-
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
130-
resultItemList.add(resultItem);
131-
return new SendResult(resultItemList);
132-
}
107+
}
108+
109+
Map<String, String> map = new HashMap<>();
110+
Map<String, String> routingKeyMap = new HashMap<>();
111+
String eventId = msgService.getEventId(json.getAsJsonObject());
112+
if (StringUtils.isNotBlank(eventId)) {
113+
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
114+
if (StringUtils.isNotBlank(routing_key)) {
115+
map.put(eventId, json.toString());
116+
routingKeyMap.put(eventId, routing_key);
117+
} else if (routing_key == null) {
118+
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
119+
routingKeyGenerationFailure(resultItemList);
120+
return new SendResult(resultItemList);
133121
} else {
134122
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
135-
createFailureResult(resultItemList);
123+
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
124+
resultItemList.add(resultItem);
136125
return new SendResult(resultItemList);
137126
}
138-
return send(routingKeyMap, map, msgService);
127+
} else {
128+
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
129+
createFailureResult(resultItemList);
130+
return new SendResult(resultItemList);
139131
}
132+
return send(routingKeyMap, map, msgService);
140133
} catch (final JsonSyntaxException e) {
141134
String resultMsg = "Could not parse JSON.";
142135
if (e.getCause() != null) {
143136
resultMsg = resultMsg + " Cause: " + e.getCause().getMessage();
144137
}
145138
log.error(resultMsg, e.getMessage());
146-
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
139+
List<PublishResultItem> resultItemList = new ArrayList<>();
147140
createFailureResult(resultItemList);
148141
return new SendResult(resultItemList);
149142
}
@@ -155,76 +148,64 @@ public SendResult send(String jsonContent, MsgService msgService, String userDom
155148
*/
156149
@Override
157150
public SendResult send(JsonElement json, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {
151+
152+
List<PublishResultItem> resultList;
153+
boolean checkEventStatus;
154+
158155
Map<String, String> map = new HashMap<>();
159156
Map<String, String> routingKeyMap = new HashMap<>();
157+
160158
SendResult result;
161-
resultList = new CopyOnWriteArrayList<PublishResultItem>();
159+
resultList = new ArrayList<>();
162160
if (json == null) {
163161
createFailureResult(resultList);
164162
}
165-
if (json.isJsonArray()) {
166-
statusCodes = new CopyOnWriteArrayList<Integer>();
167-
checkEventStatus = true;
168-
JsonArray bodyJson = json.getAsJsonArray();
169-
for (JsonElement obj : bodyJson) {
170-
String eventId = msgService.getEventId(obj.getAsJsonObject());
171-
if (StringUtils.isNotEmpty(eventId) && checkEventStatus) {
172-
String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap,
173-
userDomainSuffix, tag, routingKey);
174-
if (StringUtils.isNotBlank(routing_key)) {
175-
result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key);
176-
resultList.addAll(result.getEvents());
177-
int statusCode = result.getEvents().get(0).getStatusCode();
178-
if (!statusCodes.contains(statusCode))
179-
statusCodes.add(statusCode);
180-
} else if (routing_key == null) {
181-
routingKeyGenerationFailure(resultList);
182-
errorItems = new CopyOnWriteArrayList<JsonElement>();
183-
int statusCode = resultList.get(0).getStatusCode();
184-
statusCodes.add(statusCode);
185-
errorItems.add(obj);
186-
checkEventStatus = false;
187-
} else {
188-
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
189-
resultList.add(resultItem);
190-
int statusCode = resultItem.getStatusCode();
191-
statusCodes.add(statusCode);
192-
break;
193-
}
194-
} else {
195-
if (!checkEventStatus) {
196-
addUnsuccessfulResultItem(obj);
197-
int statusCode = resultList.get(0).getStatusCode();
198-
statusCodes.add(statusCode);
163+
else {
164+
if (json.isJsonArray()) {
165+
checkEventStatus = true;
166+
JsonArray bodyJson = json.getAsJsonArray();
167+
for (JsonElement obj : bodyJson) {
168+
String eventId = msgService.getEventId(obj.getAsJsonObject());
169+
if (StringUtils.isNotEmpty(eventId) && checkEventStatus) {
170+
String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap,
171+
userDomainSuffix, tag, routingKey);
172+
if (StringUtils.isNotBlank(routing_key)) {
173+
result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key);
174+
resultList.addAll(result.getEvents());
175+
} else if (routing_key == null) {
176+
routingKeyGenerationFailure(resultList);
177+
checkEventStatus = false;
178+
} else {
179+
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
180+
resultList.add(resultItem);
181+
break;
182+
}
199183
} else {
200-
createFailureResult(resultList);
201-
errorItems = new CopyOnWriteArrayList<JsonElement>();
202-
int statusCode = resultList.get(0).getStatusCode();
203-
statusCodes.add(statusCode);
204-
errorItems.add(obj);
205-
checkEventStatus = false;
184+
if (!checkEventStatus) {
185+
addUnsuccessfulResultItem(resultList, obj);
186+
} else {
187+
createFailureResult(resultList);
188+
checkEventStatus = false;
189+
}
206190
}
207191
}
192+
} else {
193+
result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey);
194+
resultList.addAll(result.getEvents());
208195
}
209-
} else {
210-
statusCodes = new CopyOnWriteArrayList<Integer>();
211-
result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey);
212-
resultList.addAll(result.getEvents());
213-
int statusCode = result.getEvents().get(0).getStatusCode();
214-
if (!statusCodes.contains(statusCode))
215-
statusCodes.add(statusCode);
216196
}
197+
217198
result = new SendResult();
218199
result.setEvents(resultList);
219200
return result;
220201
}
221202

222-
private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,NackException, TimeoutException, RemRemPublishException {
203+
private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,TimeoutException, RemRemPublishException {
223204
String resultMsg = PropertiesConfig.SUCCESS;
224205
try {
225206
instantiateRmqHelper();
226207
} catch (RemRemPublishException e) {
227-
log.error("RemRemPublishException occurred::" + e.getMessage());
208+
log.error("RemRemPublishException occurred::{}", e.getMessage());
228209
}
229210
rmqHelper.send(routingKey, msg, msgService);
230211
return resultMsg;
@@ -268,7 +249,7 @@ private String getAndCheckEvent(MsgService msgService, Map<String, String> map,
268249

269250
/**
270251
* Method returns result for the failure event.
271-
* @param events for list the eiffel events results
252+
* @param resultItemList for list the eiffel events results
272253
*/
273254
private void createFailureResult(List<PublishResultItem> resultItemList) {
274255
PublishResultItem resultItem = new PublishResultItem(null, 400, PropertiesConfig.INVALID_MESSAGE,
@@ -293,21 +274,9 @@ private void routingKeyGenerationFailure(List<PublishResultItem> resultItemList)
293274
resultItemList.add(resultItem);
294275
}
295276

296-
private void addUnsuccessfulResultItem(JsonElement obj) {
277+
private void addUnsuccessfulResultItem(List<PublishResultItem> resultList, JsonElement obj) {
297278
PublishResultItem event = new PublishResultItem(null, 503, PropertiesConfig.SERVICE_UNAVAILABLE,
298279
PropertiesConfig.UNSUCCESSFUL_EVENT_CONTENT);
299280
resultList.add(event);
300281
}
301-
302-
/**
303-
* Method returns the Http response code for the events.
304-
*/
305-
public HttpStatus getHttpStatus() {
306-
if (statusCodes.size() > 1) {
307-
return HttpStatus.MULTI_STATUS;
308-
} else {
309-
return HttpStatus.valueOf(statusCodes.get(0));
310-
311-
}
312-
}
313282
}

publish-service/src/main/java/com/ericsson/eiffel/remrem/publish/controller/ProducerController.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import java.util.*;
1818

19+
import com.ericsson.eiffel.remrem.publish.service.*;
1920
import com.google.gson.*;
2021
import org.slf4j.LoggerFactory;
2122
import org.springframework.beans.factory.annotation.Autowired;
@@ -27,6 +28,7 @@
2728
import org.springframework.http.HttpStatus;
2829
import org.springframework.http.MediaType;
2930
import org.springframework.http.ResponseEntity;
31+
import org.springframework.lang.NonNull;
3032
import org.springframework.security.core.Authentication;
3133
import org.springframework.security.core.context.SecurityContextHolder;
3234
import org.springframework.security.core.userdetails.UserDetails;
@@ -43,10 +45,6 @@
4345
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
4446
import com.ericsson.eiffel.remrem.publish.helper.PublishUtils;
4547
import com.ericsson.eiffel.remrem.publish.helper.RMQHelper;
46-
import com.ericsson.eiffel.remrem.publish.service.EventTemplateHandler;
47-
import com.ericsson.eiffel.remrem.publish.service.GenerateURLTemplate;
48-
import com.ericsson.eiffel.remrem.publish.service.MessageService;
49-
import com.ericsson.eiffel.remrem.publish.service.SendResult;
5048
import com.fasterxml.jackson.databind.JsonNode;
5149

5250
import ch.qos.logback.classic.Logger;
@@ -111,6 +109,7 @@ public void setRestTemplate(RestTemplate restTemplate) {
111109
this.restTemplate = restTemplate;
112110
}
113111

112+
private int callsInSend = 0;
114113
public void logUserName() {
115114
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
116115
// Check if the user is authenticated
@@ -165,12 +164,31 @@ public ResponseEntity send(final String msgProtocol, final String userDomain, fi
165164

166165
}
167166
synchronized (this) {
167+
callsInSend++;
168+
log.info("callsInSend (before): " + callsInSend);
168169
SendResult result = messageService.send(body, msgService, userDomain, tag, routingKey);
169-
log.info("HTTP Status: {}", messageService.getHttpStatus().value());
170-
return new ResponseEntity(result, messageService.getHttpStatus());
170+
callsInSend--;
171+
HttpStatus status = getHttpStatus(result);
172+
log.info("callsInSend (after): " + callsInSend);
173+
log.info("HTTP Status: {}", status.value());
174+
return new ResponseEntity(result, status);
171175
}
172176
}
173177

178+
private HttpStatus getHttpStatus(SendResult result) {
179+
List<PublishResultItem> events = result.getEvents();
180+
HttpStatus status;
181+
int nevents = events.size();
182+
if (nevents == 0) {
183+
return HttpStatus.BAD_REQUEST;
184+
}
185+
else if (events.size() == 1) {
186+
return HttpStatus.valueOf(events.get(0).getStatusCode());
187+
}
188+
else {
189+
return HttpStatus.MULTI_STATUS;
190+
}
191+
}
174192
/**
175193
* This controller used as producer to send messages or event
176194
* @param msgProtocol
@@ -275,6 +293,11 @@ public ResponseEntity generateAndPublish(@ApiParam(value = "message protocol", r
275293
}
276294
}
277295

296+
private boolean eventTypeExists(@NonNull MsgService msgService, String eventType) {
297+
Collection<String> supportedEventTypes = msgService.getSupportedEventTypes();
298+
return supportedEventTypes != null && supportedEventTypes.contains(eventType);
299+
}
300+
278301
/**
279302
* This controller provides single RemRem REST API End Point for both RemRem
280303
* Generate and Publish.
@@ -345,6 +368,11 @@ public ResponseEntity generateAndPublish(final String msgProtocol, final String
345368
parsedTemplates.append("[");
346369
for (JsonElement eventJson : events) {
347370
// -- parse params in incoming request -> body -------------
371+
if (!eventTypeExists(msgService, msgType)) {
372+
return createResponseEntity(HttpStatus.BAD_REQUEST, JSON_ERROR_STATUS,
373+
"Unknown event type '" + msgType + "'");
374+
}
375+
348376
JsonNode parsedTemplate = eventTemplateHandler.eventTemplateParser(eventJson.toString(), msgType);
349377
if (parsedTemplates.length() > 1) {
350378
parsedTemplates.append(",");

publish-service/src/test/java/com/ericsson/eiffel/remrem/publish/service/EiffelRemremCommonControllerUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void setUp() throws Exception {
110110

111111
when(service.getServiceName()).thenReturn("eiffelsemantics");
112112
when(service2.getServiceName()).thenReturn("eiffelsemantics");
113-
when(messageService.getHttpStatus()).thenReturn(HttpStatus.OK);
113+
// when(messageService.getHttpStatus()).thenReturn(HttpStatus.OK);
114114

115115
when(messageService.send(ArgumentMatchers.anyString(), ArgumentMatchers.any(MsgService.class), ArgumentMatchers.anyString(),
116116
ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(res);

0 commit comments

Comments
 (0)