Skip to content

Commit 7a6a509

Browse files
authored
Merge pull request #116 from Cinimex-Informatica/feature/issue20_add_timeouts
Get message timeout added to config file
2 parents b6c7a37 + ebba1a3 commit 7a6a509

File tree

6 files changed

+38
-18
lines changed

6 files changed

+38
-18
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ qmgrConnectionParams:
9191
password: mqm
9292
# Use MQCSP for connection?
9393
mqscp: false
94+
# How long to wait until metrics are published by queue manager (milliseconds).
95+
# Value must be at least 10000 (periodicity with which metrics are published by MQ).
96+
connTimeout: 12000
9497

9598
# Prometheus connection information -------------------------------
9699
prometheusEndpointParams:

src/main/java/ru/cinimex/exporter/Config.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class Config {
2525
private String user;
2626
private String password;
2727
private boolean mqscp;
28+
private int connTimeout;
2829
private int endpPort;
2930
private String endpURL;
3031
private ArrayList<String> queues;
@@ -55,6 +56,7 @@ public Config(String path) {
5556
this.user = (String) qmgrConnectionParams.get("user");
5657
this.password = (String) qmgrConnectionParams.get("password");
5758
this.mqscp = (boolean) qmgrConnectionParams.get("mqscp");
59+
this.connTimeout = (Integer) qmgrConnectionParams.get("connTimeout");
5860
queues = (ArrayList<String>) config.get("queues");
5961
listeners = (ArrayList<String>) config.get("listeners");
6062
channels = (ArrayList<String>) config.get("channels");
@@ -114,6 +116,10 @@ public String getPassword() {
114116
return password;
115117
}
116118

119+
public int getConnTimeout() {
120+
return connTimeout;
121+
}
122+
117123
public int getEndpPort() {
118124
return endpPort;
119125
}

src/main/java/ru/cinimex/exporter/ExporterLauncher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public static void main(String[] args) {
6464

6565
MetricsManager.initMetrics(elements, monitoringTypes);
6666
MQSubscriberManager manager = new MQSubscriberManager(config.getQmgrHost(), config.getQmgrPort(), config.getQmgrChannel(), config.getQmgrName(), config.getUser(), config.getPassword(), config.useMqscp());
67-
manager.runSubscribers(elements, objects, config.sendPCFCommands(), config.usePCFWildcards(), config.getScrapeInterval());
67+
manager.runSubscribers(elements, objects, config.sendPCFCommands(), config.usePCFWildcards(),
68+
config.getScrapeInterval(), config.getConnTimeout());
6869
try {
6970
new HTTPServer(new InetSocketAddress("0.0.0.0", config.getEndpPort()), config.getEndpURL(), Registry.getRegistry(), false);
7071
} catch (IOException e) {

src/main/java/ru/cinimex/exporter/mq/MQSubscriberManager.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ public MQSubscriberManager(String host, int port, String channel, String qmName,
4444
* @param sendPCFCommands - this flag indicates, if we should send additional PCF commands (To get queues max depth, channels and listeners statuses).
4545
* @param usePCFWildcards - this flag indicates, if we should use wildcards (uses only 1 connection per MQObject type, but longer response processing).
4646
* @param interval - interval in seconds, at which additional PCF commands are sent.
47+
* @param timeout - timeout for MQGET operation (milliseconds).
4748
*/
48-
public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, boolean sendPCFCommands, boolean usePCFWildcards, int interval) {
49+
public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, boolean sendPCFCommands, boolean usePCFWildcards, int interval, int timeout) {
4950
logger.info("Launching subscribers...");
5051
subscribers = new ArrayList<>();
51-
addTopicSubscribers(elements, objects);
52+
addTopicSubscribers(elements, objects, timeout);
5253
if (sendPCFCommands) {
5354
if (usePCFWildcards) {
5455
EnumMap<MQObject.MQType, ArrayList<MQObject>> groups = groupMQObjects(objects);
@@ -83,15 +84,16 @@ public void stopSubscribers() {
8384
*
8485
* @param elements - list with PCFElements, received from MQ.
8586
* @param objects - list with monitored MQ objects.
87+
* @param timeout - timeout for MQGET operation (milliseconds).
8688
*/
87-
private void addTopicSubscribers(List<PCFElement> elements, List<MQObject> objects) {
89+
private void addTopicSubscribers(List<PCFElement> elements, List<MQObject> objects, int timeout) {
8890
for (PCFElement element : elements) {
8991
if (element.requiresMQObject()) {
9092
for (MQObject object : objects) {
91-
addTopicSubscriber(object, element);
93+
addTopicSubscriber(object, element, timeout);
9294
}
9395
} else {
94-
addTopicSubscriber(element);
96+
addTopicSubscriber(element, timeout);
9597
}
9698
}
9799
}
@@ -101,13 +103,14 @@ private void addTopicSubscribers(List<PCFElement> elements, List<MQObject> objec
101103
*
102104
* @param object - monitored MQ object.
103105
* @param element - PCFElement, received from MQ.
106+
* @param timeout - timeout for MQGET operation (milliseconds).
104107
*/
105-
private void addTopicSubscriber(MQObject object, PCFElement element) {
108+
private void addTopicSubscriber(MQObject object, PCFElement element, int timeout) {
106109
if (object.getType().equals(MQObject.MQType.QUEUE)) {
107110
PCFElement objElement = new PCFElement(element.getTopicString(), element.getRows());
108111
objElement.formatTopicString(object.getName());
109112
try {
110-
subscribers.add(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, queueManagerName, object.getName()));
113+
subscribers.add(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, timeout, queueManagerName, object.getName()));
111114
} catch (MQException e) {
112115
logger.error("Error during creating topic subscriber: ", e);
113116
}
@@ -118,10 +121,11 @@ private void addTopicSubscriber(MQObject object, PCFElement element) {
118121
* Adds topic subscriber
119122
*
120123
* @param element - PCFElement, received from MQ.
124+
* @param timeout - timeout for MQGET operation (milliseconds).
121125
*/
122-
private void addTopicSubscriber(PCFElement element) {
126+
private void addTopicSubscriber(PCFElement element, int timeout) {
123127
try {
124-
subscribers.add(new MQTopicSubscriber(element, queueManagerName, connectionProperties, queueManagerName));
128+
subscribers.add(new MQTopicSubscriber(element, queueManagerName, connectionProperties, timeout, queueManagerName));
125129
} catch (MQException e) {
126130
logger.error("Error during creating topic subscriber: ", e);
127131
}

src/main/java/ru/cinimex/exporter/mq/MQTopicSubscriber.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,23 @@ public class MQTopicSubscriber extends MQSubscriber {
2525
private PCFElement element;
2626
private MQConnection connection;
2727
private String[] labels;
28+
private int timeout;
2829
private boolean isRunning;
2930

3031
/**
3132
* Subscriber constructor
3233
*
33-
* @param element - PCF message data, which is required for parsing statistics.
34-
* @param labels - labels array, which should be used for metrics.
34+
* @param element - PCF message data, which is required for parsing statistics.
35+
* @param queueManagerName - queue manager name.
36+
* @param connectionProperties - structure, filled with connection properties.
37+
* @param timeout - timeout for MQGET operation (milliseconds).
38+
* @param labels - labels array, which should be used for metrics.
3539
*/
36-
public MQTopicSubscriber(PCFElement element, String queueManagerName, Hashtable<String, Object> connectionProperties, String... labels) throws MQException {
40+
public MQTopicSubscriber(PCFElement element, String queueManagerName, Hashtable<String, Object> connectionProperties, int timeout, String... labels) throws MQException {
3741
this.element = element;
38-
if (connection == null) {
39-
connection = new MQConnection();
40-
connection.establish(queueManagerName, connectionProperties);
41-
}
42+
this.timeout = timeout;
43+
this.connection = new MQConnection();
44+
this.connection.establish(queueManagerName, connectionProperties);
4245
this.labels = labels;
4346
}
4447

@@ -86,7 +89,7 @@ public void run() {
8689
topic = connection.createTopic(element.getTopicString());
8790
MQGetMessageOptions gmo = new MQGetMessageOptions();
8891
gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG;
89-
gmo.waitInterval = 12000;
92+
gmo.waitInterval = timeout;
9093
isRunning = true;
9194
while (isRunning) {
9295
scrapeMetrics(gmo);

src/main/resources/exporter_config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ qmgrConnectionParams:
1616
password: mqm
1717
# Use MQCSP for connection?
1818
mqscp: false
19+
# How long to wait until metrics are published by queue manager (milliseconds).
20+
# Value must be at least 10000 (periodicity with which metrics are published by MQ).
21+
connTimeout: 12000
1922

2023
# Prometheus connection information -------------------------------
2124
prometheusEndpointParams:

0 commit comments

Comments
 (0)