Skip to content

Commit b756a3e

Browse files
committed
Get message timeout added to config file
Resolves: #20
1 parent e9ce739 commit b756a3e

File tree

6 files changed

+38
-18
lines changed

6 files changed

+38
-18
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ qmgrConnectionParams:
9191
password: mqm
9292
# Use MQCSP for connection?
9393
mqscp: false
94+
# How long to wait until metrics are published by queue manager (milliseconds).
95+
# Value must be at least 10000 (periodicity with which metrics are published by MQ).
96+
connTimeout: 12000
9497

9598
# Prometheus connection information -------------------------------
9699
prometheusEndpointParams:

src/main/java/ru/cinimex/exporter/Config.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class Config {
2525
private String user;
2626
private String password;
2727
private boolean mqscp;
28+
private int connTimeout;
2829
private int endpPort;
2930
private String endpURL;
3031
private ArrayList<String> queues;
@@ -55,6 +56,7 @@ public Config(String path) {
5556
this.user = (String) qmgrConnectionParams.get("user");
5657
this.password = (String) qmgrConnectionParams.get("password");
5758
this.mqscp = (boolean) qmgrConnectionParams.get("mqscp");
59+
this.connTimeout = (Integer) qmgrConnectionParams.get("connTimeout");
5860
queues = (ArrayList<String>) config.get("queues");
5961
listeners = (ArrayList<String>) config.get("listeners");
6062
channels = (ArrayList<String>) config.get("channels");
@@ -114,6 +116,10 @@ public String getPassword() {
114116
return password;
115117
}
116118

119+
public int getConnTimeout() {
120+
return connTimeout;
121+
}
122+
117123
public int getEndpPort() {
118124
return endpPort;
119125
}

src/main/java/ru/cinimex/exporter/ExporterLauncher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public static void main(String[] args) {
6464

6565
MetricsManager.initMetrics(elements, monitoringTypes);
6666
MQSubscriberManager manager = new MQSubscriberManager(config.getQmgrHost(), config.getQmgrPort(), config.getQmgrChannel(), config.getQmgrName(), config.getUser(), config.getPassword(), config.useMqscp());
67-
manager.runSubscribers(elements, objects, config.sendPCFCommands(), config.usePCFWildcards(), config.getScrapeInterval());
67+
manager.runSubscribers(elements, objects, config.sendPCFCommands(), config.usePCFWildcards(),
68+
config.getScrapeInterval(), config.getConnTimeout());
6869
try {
6970
new HTTPServer(new InetSocketAddress("0.0.0.0", config.getEndpPort()), config.getEndpURL(), Registry.getRegistry(), false);
7071
} catch (IOException e) {

src/main/java/ru/cinimex/exporter/mq/MQSubscriberManager.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ public MQSubscriberManager(String host, int port, String channel, String qmName,
4343
* @param sendPCFCommands - this flag indicates, if we should send additional PCF commands (To get queues max depth, channels and listeners statuses).
4444
* @param usePCFWildcards - this flag indicates, if we should use wildcards (uses only 1 connection per MQObject type, but longer response processing).
4545
* @param interval - interval in seconds, at which additional PCF commands are sent.
46+
* @param timeout - timeout for MQGET operation (milliseconds).
4647
*/
47-
public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, boolean sendPCFCommands, boolean usePCFWildcards, int interval) {
48+
public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, boolean sendPCFCommands, boolean usePCFWildcards, int interval, int timeout) {
4849
logger.info("Launching subscribers...");
4950
subscribers = new ArrayList<>();
50-
addTopicSubscribers(elements, objects);
51+
addTopicSubscribers(elements, objects, timeout);
5152
if (sendPCFCommands) {
5253
if (usePCFWildcards) {
5354
EnumMap<MQObject.MQType, ArrayList<MQObject>> groups = groupMQObjects(objects);
@@ -73,15 +74,16 @@ public void runSubscribers(List<PCFElement> elements, List<MQObject> objects, bo
7374
*
7475
* @param elements - list with PCFElements, received from MQ.
7576
* @param objects - list with monitored MQ objects.
77+
* @param timeout - timeout for MQGET operation (milliseconds).
7678
*/
77-
private void addTopicSubscribers(List<PCFElement> elements, List<MQObject> objects) {
79+
private void addTopicSubscribers(List<PCFElement> elements, List<MQObject> objects, int timeout) {
7880
for (PCFElement element : elements) {
7981
if (element.requiresMQObject()) {
8082
for (MQObject object : objects) {
81-
addTopicSubscriber(object, element);
83+
addTopicSubscriber(object, element, timeout);
8284
}
8385
} else {
84-
addTopicSubscriber(element);
86+
addTopicSubscriber(element, timeout);
8587
}
8688
}
8789
}
@@ -91,13 +93,14 @@ private void addTopicSubscribers(List<PCFElement> elements, List<MQObject> objec
9193
*
9294
* @param object - monitored MQ object.
9395
* @param element - PCFElement, received from MQ.
96+
* @param timeout - timeout for MQGET operation (milliseconds).
9497
*/
95-
private void addTopicSubscriber(MQObject object, PCFElement element) {
98+
private void addTopicSubscriber(MQObject object, PCFElement element, int timeout) {
9699
if (object.getType().equals(MQObject.MQType.QUEUE)) {
97100
PCFElement objElement = new PCFElement(element.getTopicString(), element.getRows());
98101
objElement.formatTopicString(object.getName());
99102
try {
100-
subscribers.add(new Thread(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, queueManagerName, object.getName())));
103+
subscribers.add(new Thread(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, timeout, queueManagerName, object.getName())));
101104
} catch (MQException e) {
102105
logger.error("Error during creating topic subscriber: ", e);
103106
}
@@ -108,10 +111,11 @@ private void addTopicSubscriber(MQObject object, PCFElement element) {
108111
* Adds topic subscriber
109112
*
110113
* @param element - PCFElement, received from MQ.
114+
* @param timeout - timeout for MQGET operation (milliseconds).
111115
*/
112-
private void addTopicSubscriber(PCFElement element) {
116+
private void addTopicSubscriber(PCFElement element, int timeout) {
113117
try {
114-
subscribers.add(new Thread(new MQTopicSubscriber(element, queueManagerName, connectionProperties, queueManagerName)));
118+
subscribers.add(new Thread(new MQTopicSubscriber(element, queueManagerName, connectionProperties, timeout, queueManagerName)));
115119
} catch (MQException e) {
116120
logger.error("Error during creating topic subscriber: ", e);
117121
}

src/main/java/ru/cinimex/exporter/mq/MQTopicSubscriber.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,22 @@ public class MQTopicSubscriber implements Runnable {
2525
private PCFElement element;
2626
private MQConnection connection;
2727
private String[] labels;
28+
private int timeout;
2829

2930
/**
3031
* Subscriber constructor
3132
*
32-
* @param element - PCF message data, which is required for parsing statistics.
33-
* @param labels - labels array, which should be used for metrics.
33+
* @param element - PCF message data, which is required for parsing statistics.
34+
* @param queueManagerName - queue manager name.
35+
* @param connectionProperties - structure, filled with connection properties.
36+
* @param timeout - timeout for MQGET operation (milliseconds).
37+
* @param labels - labels array, which should be used for metrics.
3438
*/
35-
public MQTopicSubscriber(PCFElement element, String queueManagerName, Hashtable<String, Object> connectionProperties, String... labels) throws MQException {
39+
public MQTopicSubscriber(PCFElement element, String queueManagerName, Hashtable<String, Object> connectionProperties, int timeout, String... labels) throws MQException {
3640
this.element = element;
37-
if (connection == null) {
38-
connection = new MQConnection();
39-
connection.establish(queueManagerName, connectionProperties);
40-
}
41+
this.timeout = timeout;
42+
this.connection = new MQConnection();
43+
this.connection.establish(queueManagerName, connectionProperties);
4144
this.labels = labels;
4245
}
4346

@@ -78,7 +81,7 @@ public void run() {
7881
topic = connection.createTopic(element.getTopicString());
7982
MQGetMessageOptions gmo = new MQGetMessageOptions();
8083
gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG;
81-
gmo.waitInterval = 12000;
84+
gmo.waitInterval = timeout;
8285
while (true) {
8386
scrapeMetrics(gmo);
8487
}

src/main/resources/exporter_config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ qmgrConnectionParams:
1616
password: mqm
1717
# Use MQCSP for connection?
1818
mqscp: false
19+
# How long to wait until metrics are published by queue manager (milliseconds).
20+
# Value must be at least 10000 (periodicity with which metrics are published by MQ).
21+
connTimeout: 12000
1922

2023
# Prometheus connection information -------------------------------
2124
prometheusEndpointParams:

0 commit comments

Comments
 (0)