Skip to content

Commit ebba1a3

Browse files
authored
Merge branch 'master' into feature/issue20_add_timeouts
2 parents b756a3e + b6c7a37 commit ebba1a3

File tree

7 files changed

+108
-49
lines changed

7 files changed

+108
-49
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ Steps, that need to be taken to build the exporter:
141141
mvn package
142142
```
143143

144-
4. After processing is completed, go to mq-java-exporter/target. dependency-jars directory and mq_exporter.jar should appear there.
144+
4. After processing is completed, go to mq-java-exporter/target. lib directory and mq_exporter.jar should appear there.
145145

146146
#### Run
147147
<sub><sup> [Back to TOC.](#table-of-contents) </sup></sub><br/>
148-
To run exporter, dependency-jars directory (and all jars in it) and
148+
To run exporter, lib directory (and all jars in it) and
149149
mq_exporter.jar should be located in the same folder.
150150

151151
##### Running exporter as mq service

lib/com.ibm.mq.allclient.jar

-7.83 MB
Binary file not shown.

pom.xml

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,32 @@
88
<packaging>jar</packaging>
99
<properties>
1010
<jdk.version>1.8</jdk.version>
11-
<mq.allclient.version>9.0.0.1</mq.allclient.version>
11+
<mq.allclient.version>9.1.2.0</mq.allclient.version>
1212
<snakeyaml.version>1.23</snakeyaml.version>
1313
<prometheus.version>0.6.0</prometheus.version>
1414
<log4j.version>2.11.2</log4j.version>
15-
<mq.allclient.jar.path>/lib/com.ibm.mq.allclient.jar</mq.allclient.jar.path>
1615
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1716
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
1817
</properties>
1918
<dependencies>
20-
<!--When patch from IBM would appear - we will use latest version from Maven repo -->
21-
<!-- <dependency><groupId>com.ibm.mq</groupId><artifactId>com.ibm.mq.allclient</artifactId><version>9.0.5.0</version></dependency>-->
22-
<!-- <dependency><groupId>com.ibm.mq</groupId><artifactId>com.ibm.mq.allclient</artifactId><version>${mq.allclient.version}</version><scope>system</scope></dependency>-->
2319
<dependency>
2420
<groupId>com.ibm.mq</groupId>
2521
<artifactId>com.ibm.mq.allclient</artifactId>
2622
<version>${mq.allclient.version}</version>
27-
<scope>system</scope>
28-
<systemPath>${basedir}${mq.allclient.jar.path}</systemPath>
23+
<exclusions>
24+
<exclusion>
25+
<groupId>org.bouncycastle</groupId>
26+
<artifactId>bcprov-jdk15on</artifactId>
27+
</exclusion>
28+
<exclusion>
29+
<groupId>org.bouncycastle</groupId>
30+
<artifactId>bcpkix-jdk15on</artifactId>
31+
</exclusion>
32+
<exclusion>
33+
<groupId>javax.jms</groupId>
34+
<artifactId>javax.jms-api</artifactId>
35+
</exclusion>
36+
</exclusions>
2937
</dependency>
3038
<dependency>
3139
<groupId>org.yaml</groupId>
@@ -81,7 +89,7 @@
8189
<!-- Jar file entry point -->
8290
<addClasspath>true</addClasspath>
8391
<mainClass>ru.cinimex.exporter.ExporterLauncher</mainClass>
84-
<classpathPrefix>dependency-jars/</classpathPrefix>
92+
<classpathPrefix>lib/</classpathPrefix>
8593
</manifest>
8694
</archive>
8795
</configuration>
@@ -99,7 +107,7 @@
99107
</goals>
100108
<configuration>
101109
<includeScope>runtime</includeScope>
102-
<outputDirectory>${project.build.directory}/dependency-jars/</outputDirectory>
110+
<outputDirectory>${project.build.directory}/lib/</outputDirectory>
103111
</configuration>
104112
</execution>
105113
</executions>

src/main/java/ru/cinimex/exporter/mq/MQPCFSubscriber.java

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
* MQPCFSubscriber is technically not a subscriber, but a runnable object, which sends PCFCommands every n seconds to
2020
* retrieve specific statistics, which couldn't be retrieved by MQTopicSubscriber.
2121
*/
22-
public class MQPCFSubscriber implements Runnable {
22+
public class MQPCFSubscriber extends MQSubscriber {
2323
private static final Logger logger = LogManager.getLogger(MQPCFSubscriber.class);
2424
private MQConnection connection;
2525
private String queueManagerName;
2626
private MQObject object;
2727
private PCFMessageAgent agent;
2828
private List<MQObject> objects;
29+
private boolean isRunning;
2930

3031
/**
3132
* MQPCFSubscriber constructor which is used, when exporter is configured to use 1 MQPCFSubscriber per 1 MQObject.
@@ -37,6 +38,7 @@ public class MQPCFSubscriber implements Runnable {
3738
public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connectionProperties, MQObject object) {
3839
establishMQConnection(queueManagerName, connectionProperties);
3940
this.object = object;
41+
this.isRunning = true;
4042
}
4143

4244
/**
@@ -50,6 +52,7 @@ public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connec
5052
establishMQConnection(queueManagerName, connectionProperties);
5153
this.objects = objects;
5254
this.object = new MQObject("*", objects.get(0).getType());
55+
this.isRunning = true;
5356
}
5457

5558
/**
@@ -112,19 +115,22 @@ private void updateMetricsWithWildcards(PCFMessage[] pcfResponse) {
112115
/**
113116
* Retrieves info about all objects from input array via direct pcf commands.
114117
*
115-
* @param objectNames - input array with objects.
118+
* @param objectNames - input list with objects.
116119
*/
117-
private void updateWithDirectPCFCommand(ArrayList<String> objectNames) {
120+
private void updateWithDirectPCFCommand(List<String> objectNames) {
118121
for (String objectName : objectNames) {
119122
MQObject directObject = new MQObject(objectName, object.getType());
120123
try {
121124
PCFMessage[] directPCFResponse = agent.send(directObject.getPcfCmd());
122125
updateMetricWithoutWildcards(directPCFResponse[0], objectName);
123126
} catch (PCFException e) {
124127
//This error means, that channel has status "inactive".
125-
if (e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
128+
if (object.getType() == MQObject.MQType.CHANNEL && e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
126129
logger.warn("Channel {} is possibly inactive.", objectName);
127130
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, objectName);
131+
} else if (object.getType() == MQObject.MQType.LISTENER && e.reasonCode == MQConstants.MQRC_UNKNOWN_OBJECT_NAME) {
132+
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQSVC_STATUS_STOPPED), queueManagerName, objectName);
133+
logger.warn("Listener {} is possibly stopped.", objectName);
128134
} else {
129135
logger.error("Error occurred during sending PCF command: ", e);
130136
}
@@ -134,30 +140,46 @@ private void updateWithDirectPCFCommand(ArrayList<String> objectNames) {
134140
}
135141
}
136142

143+
/**
144+
* Stops subscriber.
145+
*/
146+
public void stopProcessing() {
147+
isRunning = false;
148+
try {
149+
agent.disconnect();
150+
connection.close();
151+
} catch (MQException e) {
152+
logger.error("Error occurred during stopping PCF subscriber: ", e);
153+
}
154+
}
155+
137156
@Override
138157
public void run() {
139-
try {
140-
logger.debug("Sending PCF command for object type {} with name {}...", object.getType(), object.getName());
141-
PCFMessage[] pcfResponse = agent.send(object.getPcfCmd());
142-
if (!objects.isEmpty()) {
143-
updateMetricsWithWildcards(pcfResponse);
144-
} else {
145-
for (PCFMessage response : pcfResponse) {
146-
updateMetricWithoutWildcards(response, object.getName());
158+
if (isRunning) {
159+
try {
160+
logger.debug("Sending PCF command for object type {} with name {}...", object.getType(), object.getName());
161+
PCFMessage[] pcfResponse = agent.send(object.getPcfCmd());
162+
if (objects != null && !objects.isEmpty()) {
163+
updateMetricsWithWildcards(pcfResponse);
164+
} else {
165+
for (PCFMessage response : pcfResponse) {
166+
updateMetricWithoutWildcards(response, object.getName());
167+
}
147168
}
169+
logger.debug("PCF response for object type {} with name {} was processed successfully.", object.getType(), object.getName());
170+
} catch (PCFException e) {
171+
if (object.getType() == MQObject.MQType.CHANNEL && e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
172+
logger.warn("Channel {} is possibly inactive.", object.getName());
173+
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, object.getName());
174+
} else if (object.getType() == MQObject.MQType.LISTENER && e.reasonCode == MQConstants.MQRC_UNKNOWN_OBJECT_NAME) {
175+
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQSVC_STATUS_STOPPED), queueManagerName, object.getName());
176+
logger.warn("Listener {} is possibly stopped.", object.getName());
177+
} else {
178+
logger.error("Error occurred during sending PCF command: ", e);
179+
}
180+
} catch (MQException | IOException e) {
181+
logger.error("Error occurred during sending PCF command: ", e);
148182
}
149-
logger.debug("PCF response for object type {} with name {} was processed successfully.", object.getType(), object.getName());
150-
} catch (PCFException e) {
151-
if (e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
152-
logger.warn("Channel {} is possibly inactive.", object.getName());
153-
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, object.getName());
154-
}
155-
if (object.getType() == MQObject.MQType.LISTENER && e.reasonCode == MQConstants.MQRC_UNKNOWN_OBJECT_NAME) {
156-
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQSVC_STATUS_STOPPED), queueManagerName, object.getName());
157-
logger.warn("Listener {} is possibly stopped.", object.getName());
158-
}
159-
} catch (MQException | IOException e) {
160-
logger.error("Error occurred during sending PCF command: ", e);
161183
}
162184
}
163185

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package ru.cinimex.exporter.mq;
2+
3+
public abstract class MQSubscriber extends Thread {
4+
5+
/**
6+
* Stops subscriber.
7+
*/
8+
public abstract void stopProcessing();
9+
}

src/main/java/ru/cinimex/exporter/mq/MQSubscriberManager.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ public class MQSubscriberManager {
1717
private static final Logger logger = LogManager.getLogger(MQSubscriberManager.class);
1818
private Hashtable<String, Object> connectionProperties;
1919
private String queueManagerName;
20-
private ArrayList<Thread> subscribers;
20+
private ArrayList<MQSubscriber> subscribers;
21+
private ScheduledExecutorService executor;
2122

2223
/**
2324
* Constructor sets params for connecting to target queue manager.
@@ -57,7 +58,7 @@ public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, bo
5758
addPCFSubscribers(objects, interval);
5859
}
5960
}
60-
for (Thread subscriber : subscribers) {
61+
for (MQSubscriber subscriber : subscribers) {
6162
subscriber.start();
6263
}
6364
if (!subscribers.isEmpty()) {
@@ -69,6 +70,15 @@ public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, bo
6970

7071
}
7172

73+
public void stopSubscribers() {
74+
if (executor != null) {
75+
executor.shutdown();
76+
}
77+
for (MQSubscriber subscriber : subscribers) {
78+
subscriber.stopProcessing();
79+
}
80+
}
81+
7282
/**
7383
* Add topic subscribers for each MQ object being monitored.
7484
*
@@ -100,7 +110,7 @@ private void addTopicSubscriber(MQObject object, PCFElement element, int timeout
100110
PCFElement objElement = new PCFElement(element.getTopicString(), element.getRows());
101111
objElement.formatTopicString(object.getName());
102112
try {
103-
subscribers.add(new Thread(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, timeout, queueManagerName, object.getName())));
113+
subscribers.add(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, timeout, queueManagerName, object.getName()));
104114
} catch (MQException e) {
105115
logger.error("Error during creating topic subscriber: ", e);
106116
}
@@ -115,7 +125,7 @@ private void addTopicSubscriber(MQObject object, PCFElement element, int timeout
115125
*/
116126
private void addTopicSubscriber(PCFElement element, int timeout) {
117127
try {
118-
subscribers.add(new Thread(new MQTopicSubscriber(element, queueManagerName, connectionProperties, timeout, queueManagerName)));
128+
subscribers.add(new MQTopicSubscriber(element, queueManagerName, connectionProperties, timeout, queueManagerName));
119129
} catch (MQException e) {
120130
logger.error("Error during creating topic subscriber: ", e);
121131
}
@@ -129,11 +139,11 @@ private void addTopicSubscriber(PCFElement element, int timeout) {
129139
*/
130140
private void addPCFSubscribers(Map<MQObject.MQType, ArrayList<MQObject>> objects, int interval) {
131141
int corePoolSize = MQObject.MQType.values().length;
132-
ScheduledExecutorService executor = Executors.newScheduledThreadPool(corePoolSize);
142+
executor = Executors.newScheduledThreadPool(corePoolSize);
133143
for (Map.Entry<MQObject.MQType, ArrayList<MQObject>> entry : objects.entrySet()) {
134144
if (!entry.getValue().isEmpty()) {
135145
MQPCFSubscriber subscriber = new MQPCFSubscriber(queueManagerName, connectionProperties, entry.getValue());
136-
subscribers.add(new Thread(subscriber));
146+
subscribers.add(subscriber);
137147
logger.debug("Starting subscriber for sending direct PCF commands to retrieve statistics about object with type {} and name {}.", entry.getKey().name());
138148
executor.scheduleAtFixedRate(subscriber, 0, interval, TimeUnit.SECONDS);
139149
logger.debug("Subscriber for sending direct PCF commands for objects with type {} successfully started.", entry.getKey().name());
@@ -152,7 +162,7 @@ private void addPCFSubscribers(List<MQObject> objects, int interval) {
152162
ScheduledExecutorService executor = Executors.newScheduledThreadPool(corePoolSize);
153163
for (MQObject object : objects) {
154164
MQPCFSubscriber subscriber = new MQPCFSubscriber(queueManagerName, connectionProperties, object);
155-
subscribers.add(new Thread(subscriber));
165+
subscribers.add(subscriber);
156166
logger.debug("Starting subscriber for sending direct PCF commands to retrieve statistics about object with type {} and name {}.", object.getType().name(), object.getName());
157167
executor.scheduleAtFixedRate(subscriber, 0, interval, TimeUnit.SECONDS);
158168
logger.debug("Subscriber for sending direct PCF commands to retrieve statistics about object with type {} and name {} successfully started.", object.getType().name(), object.getName());

src/main/java/ru/cinimex/exporter/mq/MQTopicSubscriber.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
/**
2020
* MQTopicSubscriber is used to subscribe to specific topic.
2121
*/
22-
public class MQTopicSubscriber implements Runnable {
22+
public class MQTopicSubscriber extends MQSubscriber {
2323
private static final Logger logger = LogManager.getLogger(MQTopicSubscriber.class);
2424
private MQTopic topic;
2525
private PCFElement element;
2626
private MQConnection connection;
2727
private String[] labels;
2828
private int timeout;
29+
private boolean isRunning;
2930

3031
/**
3132
* Subscriber constructor
@@ -73,6 +74,13 @@ private void scrapeMetrics(MQGetMessageOptions gmo) {
7374
}
7475
}
7576

77+
/**
78+
* Stops subscriber.
79+
*/
80+
public void stopProcessing() {
81+
isRunning = false;
82+
}
83+
7684
/**
7785
* Starts subscriber.
7886
*/
@@ -82,19 +90,21 @@ public void run() {
8290
MQGetMessageOptions gmo = new MQGetMessageOptions();
8391
gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG;
8492
gmo.waitInterval = timeout;
85-
while (true) {
93+
isRunning = true;
94+
while (isRunning) {
8695
scrapeMetrics(gmo);
8796
}
8897
} catch (MQException e) {
8998
logger.error("Error occurred during establishing connection with topic {}", element.getTopicString(), e);
9099
} finally {
91-
if (topic != null && topic.isOpen()) {
92-
try {
100+
System.out.println("Finishing topic work!");
101+
try {
102+
if (topic != null && topic.isOpen()) {
93103
topic.close();
94-
connection.close();
95-
} catch (MQException e) {
96-
logger.error("Error occurred during disconnecting from topic {}. Error: ", topic.toString(), e);
97104
}
105+
connection.close();
106+
} catch (MQException e) {
107+
logger.error("Error occurred during disconnecting from topic {}. Error: ", topic.toString(), e);
98108
}
99109
}
100110
}

0 commit comments

Comments
 (0)