Skip to content

Commit b6c7a37

Browse files
authored
Merge pull request #117 from Cinimex-Informatica/feature/issue29_endless_loop
Endless loop replaced with boolean flag
2 parents c90c8e8 + fdb14df commit b6c7a37

File tree

4 files changed

+82
-35
lines changed

4 files changed

+82
-35
lines changed

src/main/java/ru/cinimex/exporter/mq/MQPCFSubscriber.java

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
* MQPCFSubscriber is technically not a subscriber, but a runnable object, which sends PCFCommands every n seconds to
2020
* retrieve specific statistics, which couldn't be retrieved by MQTopicSubscriber.
2121
*/
22-
public class MQPCFSubscriber implements Runnable {
22+
public class MQPCFSubscriber extends MQSubscriber {
2323
private static final Logger logger = LogManager.getLogger(MQPCFSubscriber.class);
2424
private MQConnection connection;
2525
private String queueManagerName;
2626
private MQObject object;
2727
private PCFMessageAgent agent;
2828
private List<MQObject> objects;
29+
private boolean isRunning;
2930

3031
/**
3132
* MQPCFSubscriber constructor which is used, when exporter is configured to use 1 MQPCFSubscriber per 1 MQObject.
@@ -37,6 +38,7 @@ public class MQPCFSubscriber implements Runnable {
3738
public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connectionProperties, MQObject object) {
3839
establishMQConnection(queueManagerName, connectionProperties);
3940
this.object = object;
41+
this.isRunning = true;
4042
}
4143

4244
/**
@@ -50,6 +52,7 @@ public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connec
5052
establishMQConnection(queueManagerName, connectionProperties);
5153
this.objects = objects;
5254
this.object = new MQObject("*", objects.get(0).getType());
55+
this.isRunning = true;
5356
}
5457

5558
/**
@@ -137,31 +140,46 @@ private void updateWithDirectPCFCommand(List<String> objectNames) {
137140
}
138141
}
139142

143+
/**
144+
* Stops subscriber.
145+
*/
146+
public void stopProcessing() {
147+
isRunning = false;
148+
try {
149+
agent.disconnect();
150+
connection.close();
151+
} catch (MQException e) {
152+
logger.error("Error occurred during stopping PCF subscriber: ", e);
153+
}
154+
}
155+
140156
@Override
141157
public void run() {
142-
try {
143-
logger.debug("Sending PCF command for object type {} with name {}...", object.getType(), object.getName());
144-
PCFMessage[] pcfResponse = agent.send(object.getPcfCmd());
145-
if (objects!= null && !objects.isEmpty()) {
146-
updateMetricsWithWildcards(pcfResponse);
147-
} else {
148-
for (PCFMessage response : pcfResponse) {
149-
updateMetricWithoutWildcards(response, object.getName());
158+
if (isRunning) {
159+
try {
160+
logger.debug("Sending PCF command for object type {} with name {}...", object.getType(), object.getName());
161+
PCFMessage[] pcfResponse = agent.send(object.getPcfCmd());
162+
if (objects != null && !objects.isEmpty()) {
163+
updateMetricsWithWildcards(pcfResponse);
164+
} else {
165+
for (PCFMessage response : pcfResponse) {
166+
updateMetricWithoutWildcards(response, object.getName());
167+
}
150168
}
151-
}
152-
logger.debug("PCF response for object type {} with name {} was processed successfully.", object.getType(), object.getName());
153-
} catch (PCFException e) {
154-
if (object.getType() == MQObject.MQType.CHANNEL && e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
155-
logger.warn("Channel {} is possibly inactive.", object.getName());
156-
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, object.getName());
157-
} else if (object.getType() == MQObject.MQType.LISTENER && e.reasonCode == MQConstants.MQRC_UNKNOWN_OBJECT_NAME) {
158-
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQSVC_STATUS_STOPPED), queueManagerName, object.getName());
159-
logger.warn("Listener {} is possibly stopped.", object.getName());
160-
} else {
169+
logger.debug("PCF response for object type {} with name {} was processed successfully.", object.getType(), object.getName());
170+
} catch (PCFException e) {
171+
if (object.getType() == MQObject.MQType.CHANNEL && e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
172+
logger.warn("Channel {} is possibly inactive.", object.getName());
173+
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, object.getName());
174+
} else if (object.getType() == MQObject.MQType.LISTENER && e.reasonCode == MQConstants.MQRC_UNKNOWN_OBJECT_NAME) {
175+
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQSVC_STATUS_STOPPED), queueManagerName, object.getName());
176+
logger.warn("Listener {} is possibly stopped.", object.getName());
177+
} else {
178+
logger.error("Error occurred during sending PCF command: ", e);
179+
}
180+
} catch (MQException | IOException e) {
161181
logger.error("Error occurred during sending PCF command: ", e);
162182
}
163-
} catch (MQException | IOException e) {
164-
logger.error("Error occurred during sending PCF command: ", e);
165183
}
166184
}
167185

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package ru.cinimex.exporter.mq;
2+
3+
public abstract class MQSubscriber extends Thread {
4+
5+
/**
6+
* Stops subscriber.
7+
*/
8+
public abstract void stopProcessing();
9+
}

src/main/java/ru/cinimex/exporter/mq/MQSubscriberManager.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ public class MQSubscriberManager {
1717
private static final Logger logger = LogManager.getLogger(MQSubscriberManager.class);
1818
private Hashtable<String, Object> connectionProperties;
1919
private String queueManagerName;
20-
private ArrayList<Thread> subscribers;
20+
private ArrayList<MQSubscriber> subscribers;
21+
private ScheduledExecutorService executor;
2122

2223
/**
2324
* Constructor sets params for connecting to target queue manager.
@@ -56,7 +57,7 @@ public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, bo
5657
addPCFSubscribers(objects, interval);
5758
}
5859
}
59-
for (Thread subscriber : subscribers) {
60+
for (MQSubscriber subscriber : subscribers) {
6061
subscriber.start();
6162
}
6263
if (!subscribers.isEmpty()) {
@@ -68,6 +69,15 @@ public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, bo
6869

6970
}
7071

72+
public void stopSubscribers() {
73+
if (executor != null) {
74+
executor.shutdown();
75+
}
76+
for (MQSubscriber subscriber : subscribers) {
77+
subscriber.stopProcessing();
78+
}
79+
}
80+
7181
/**
7282
* Add topic subscribers for each MQ object being monitored.
7383
*
@@ -97,7 +107,7 @@ private void addTopicSubscriber(MQObject object, PCFElement element) {
97107
PCFElement objElement = new PCFElement(element.getTopicString(), element.getRows());
98108
objElement.formatTopicString(object.getName());
99109
try {
100-
subscribers.add(new Thread(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, queueManagerName, object.getName())));
110+
subscribers.add(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, queueManagerName, object.getName()));
101111
} catch (MQException e) {
102112
logger.error("Error during creating topic subscriber: ", e);
103113
}
@@ -111,7 +121,7 @@ private void addTopicSubscriber(MQObject object, PCFElement element) {
111121
*/
112122
private void addTopicSubscriber(PCFElement element) {
113123
try {
114-
subscribers.add(new Thread(new MQTopicSubscriber(element, queueManagerName, connectionProperties, queueManagerName)));
124+
subscribers.add(new MQTopicSubscriber(element, queueManagerName, connectionProperties, queueManagerName));
115125
} catch (MQException e) {
116126
logger.error("Error during creating topic subscriber: ", e);
117127
}
@@ -125,11 +135,11 @@ private void addTopicSubscriber(PCFElement element) {
125135
*/
126136
private void addPCFSubscribers(Map<MQObject.MQType, ArrayList<MQObject>> objects, int interval) {
127137
int corePoolSize = MQObject.MQType.values().length;
128-
ScheduledExecutorService executor = Executors.newScheduledThreadPool(corePoolSize);
138+
executor = Executors.newScheduledThreadPool(corePoolSize);
129139
for (Map.Entry<MQObject.MQType, ArrayList<MQObject>> entry : objects.entrySet()) {
130140
if (!entry.getValue().isEmpty()) {
131141
MQPCFSubscriber subscriber = new MQPCFSubscriber(queueManagerName, connectionProperties, entry.getValue());
132-
subscribers.add(new Thread(subscriber));
142+
subscribers.add(subscriber);
133143
logger.debug("Starting subscriber for sending direct PCF commands to retrieve statistics about object with type {} and name {}.", entry.getKey().name());
134144
executor.scheduleAtFixedRate(subscriber, 0, interval, TimeUnit.SECONDS);
135145
logger.debug("Subscriber for sending direct PCF commands for objects with type {} successfully started.", entry.getKey().name());
@@ -148,7 +158,7 @@ private void addPCFSubscribers(List<MQObject> objects, int interval) {
148158
ScheduledExecutorService executor = Executors.newScheduledThreadPool(corePoolSize);
149159
for (MQObject object : objects) {
150160
MQPCFSubscriber subscriber = new MQPCFSubscriber(queueManagerName, connectionProperties, object);
151-
subscribers.add(new Thread(subscriber));
161+
subscribers.add(subscriber);
152162
logger.debug("Starting subscriber for sending direct PCF commands to retrieve statistics about object with type {} and name {}.", object.getType().name(), object.getName());
153163
executor.scheduleAtFixedRate(subscriber, 0, interval, TimeUnit.SECONDS);
154164
logger.debug("Subscriber for sending direct PCF commands to retrieve statistics about object with type {} and name {} successfully started.", object.getType().name(), object.getName());

src/main/java/ru/cinimex/exporter/mq/MQTopicSubscriber.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
/**
2020
* MQTopicSubscriber is used to subscribe to specific topic.
2121
*/
22-
public class MQTopicSubscriber implements Runnable {
22+
public class MQTopicSubscriber extends MQSubscriber {
2323
private static final Logger logger = LogManager.getLogger(MQTopicSubscriber.class);
2424
private MQTopic topic;
2525
private PCFElement element;
2626
private MQConnection connection;
2727
private String[] labels;
28+
private boolean isRunning;
2829

2930
/**
3031
* Subscriber constructor
@@ -70,6 +71,13 @@ private void scrapeMetrics(MQGetMessageOptions gmo) {
7071
}
7172
}
7273

74+
/**
75+
* Stops subscriber.
76+
*/
77+
public void stopProcessing() {
78+
isRunning = false;
79+
}
80+
7381
/**
7482
* Starts subscriber.
7583
*/
@@ -79,19 +87,21 @@ public void run() {
7987
MQGetMessageOptions gmo = new MQGetMessageOptions();
8088
gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG;
8189
gmo.waitInterval = 12000;
82-
while (true) {
90+
isRunning = true;
91+
while (isRunning) {
8392
scrapeMetrics(gmo);
8493
}
8594
} catch (MQException e) {
8695
logger.error("Error occurred during establishing connection with topic {}", element.getTopicString(), e);
8796
} finally {
88-
if (topic != null && topic.isOpen()) {
89-
try {
97+
System.out.println("Finishing topic work!");
98+
try {
99+
if (topic != null && topic.isOpen()) {
90100
topic.close();
91-
connection.close();
92-
} catch (MQException e) {
93-
logger.error("Error occurred during disconnecting from topic {}. Error: ", topic.toString(), e);
94101
}
102+
connection.close();
103+
} catch (MQException e) {
104+
logger.error("Error occurred during disconnecting from topic {}. Error: ", topic.toString(), e);
95105
}
96106
}
97107
}

0 commit comments

Comments
 (0)