Skip to content

Commit d79b20d

Browse files
committed
Dictionary with metrics added, additional metrics implementation started
1 parent af87d36 commit d79b20d

13 files changed

+623
-96
lines changed

src/main/java/ru/cinimex/exporter/mq/MQConnection.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void establish(String host, int port, String channel, String qmName, Stri
6363
try {
6464
queueManager = new MQQueueManager(qmName, connectionProperties);
6565
} catch (MQException e) {
66-
System.err.println(e.getStackTrace());
66+
System.err.println(e.getMessage());
6767
}
6868
}
6969

@@ -78,7 +78,7 @@ public void establish(String qmNqme, Hashtable<String, Object> connectionPropert
7878
try {
7979
queueManager = new MQQueueManager(qmNqme, connectionProperties);
8080
} catch (MQException e) {
81-
System.err.println(e.getStackTrace());
81+
System.err.println(e.getMessage());
8282
}
8383
}
8484

@@ -90,7 +90,7 @@ public void close() {
9090
try {
9191
queueManager.disconnect();
9292
} catch (MQException e) {
93-
System.err.println(e.getStackTrace());
93+
System.err.println(e.getMessage());
9494
}
9595
}
9696
}
@@ -105,4 +105,8 @@ public void close() {
105105
public MQTopic createTopic(String topic) throws MQException {
106106
return queueManager.accessTopic(topic, "", CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, CMQC.MQSO_CREATE | CMQC.MQSO_NON_DURABLE | CMQC.MQSO_MANAGED);
107107
}
108+
109+
public MQQueueManager getQueueManager(){
110+
return this.queueManager;
111+
}
108112
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package ru.cinimex.exporter.mq;
2+
3+
import com.ibm.mq.constants.MQConstants;
4+
5+
import java.util.HashMap;
6+
7+
public class MQObject {
8+
private String name;
9+
private MQType type;
10+
private int PCFCmd;
11+
private int PCFHeader;
12+
private HashMap<Integer, Object> PCFParameters = new HashMap<>();
13+
14+
15+
public MQObject(String name, MQType type) {
16+
this.name = name;
17+
this.type = type;
18+
19+
switch (type) {
20+
case QUEUE:
21+
PCFCmd = MQConstants.MQCMD_INQUIRE_Q;
22+
PCFParameters.put(MQConstants.MQCA_Q_NAME, name);
23+
PCFHeader = MQConstants.MQIA_MAX_Q_DEPTH;
24+
break;
25+
case LISTENER:
26+
PCFCmd = MQConstants.MQCMD_INQUIRE_LISTENER_STATUS;
27+
PCFParameters.put(MQConstants.MQCACH_LISTENER_NAME, name);
28+
PCFHeader = MQConstants.MQIACH_LISTENER_STATUS;
29+
break;
30+
case CHANNEL:
31+
PCFCmd = MQConstants.MQCMD_INQUIRE_CHANNEL_STATUS;
32+
PCFParameters.put(MQConstants.MQCACH_CHANNEL_NAME, name);
33+
PCFHeader = MQConstants.MQIACH_LISTENER_STATUS;
34+
break;
35+
default:
36+
//TODO:Exception
37+
}
38+
}
39+
40+
public String getName() {
41+
return name;
42+
}
43+
44+
public int getPCFHeader() {
45+
return PCFHeader;
46+
}
47+
48+
public MQType getType() {
49+
return type;
50+
}
51+
52+
public int getPCFCmd() {
53+
return PCFCmd;
54+
}
55+
56+
public HashMap<Integer, Object> getPCFParameters() {
57+
return PCFParameters;
58+
}
59+
60+
public enum MQType {
61+
QUEUE, CHANNEL, LISTENER;
62+
}
63+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package ru.cinimex.exporter.mq;
2+
3+
import com.ibm.mq.MQException;
4+
import com.ibm.mq.constants.MQConstants;
5+
import com.ibm.mq.pcf.PCFMessage;
6+
import com.ibm.mq.pcf.PCFMessageAgent;
7+
import ru.cinimex.exporter.mq.pcf.PCFFieldReference;
8+
import ru.cinimex.exporter.prometheus.metrics.MetricsManager;
9+
import ru.cinimex.exporter.prometheus.metrics.MetricsReference;
10+
11+
import java.io.IOException;
12+
import java.util.*;
13+
14+
public class MQPCFSubscriber implements Runnable {
15+
private MQConnection connection;
16+
private String queueManagerName;
17+
private MQObject object;
18+
private String[] labels;
19+
private PCFMessage pcfMessage;
20+
private PCFMessageAgent agent;
21+
private boolean useWildcards;
22+
private int intervalSeconds = 10;
23+
private ArrayList<String> objectNames;
24+
25+
public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connectionProperties, MQObject object) {
26+
if (connection == null) {
27+
connection = new MQConnection();
28+
connection.establish(queueManagerName, connectionProperties);
29+
}
30+
//TODO: error handling
31+
try {
32+
this.agent = new PCFMessageAgent(connection.getQueueManager());
33+
} catch (MQException e) {
34+
e.printStackTrace();
35+
}
36+
this.object = object;
37+
this.queueManagerName = queueManagerName;
38+
39+
}
40+
41+
public static PCFMessage getPCFCommand(MQObject object) {
42+
PCFMessage pcfCmd = new PCFMessage(object.getPCFCmd());
43+
HashMap<Integer, Object> parameters = object.getPCFParameters();
44+
Iterator it = parameters.entrySet().iterator();
45+
while (it.hasNext()) {
46+
Map.Entry pair = (Map.Entry) it.next();
47+
pcfCmd.addParameter((Integer) pair.getKey(), (String) pair.getValue());
48+
}
49+
return pcfCmd;
50+
}
51+
52+
@Override
53+
public void run() {
54+
try {
55+
PCFMessage request = getPCFCommand(object);
56+
PCFMessage[] pcfResponse = agent.send(request);
57+
PCFMessage response = pcfResponse[0];
58+
59+
Object result = response.getParameterValue(object.getPCFHeader());
60+
if (result.getClass() == Integer.class) {
61+
double prometheusValue = PCFFieldReference.getMetricValue(object.getType(), (Integer) result);
62+
MetricsManager.updateMetric(MetricsReference.getMetricName(object), prometheusValue, queueManagerName,
63+
object.getName());
64+
} else {
65+
System.err.println("Unknown PCF metric: " + result.toString());
66+
}
67+
68+
} catch (MQException e) {
69+
//TODO: error handling
70+
e.printStackTrace();
71+
} catch (IOException e) {
72+
e.printStackTrace();
73+
}
74+
}
75+
76+
}
Lines changed: 30 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package ru.cinimex.exporter.mq;
22

3-
import com.ibm.mq.MQException;
4-
import com.ibm.mq.MQGetMessageOptions;
5-
import com.ibm.mq.MQMessage;
6-
import com.ibm.mq.MQTopic;
7-
import com.ibm.mq.constants.MQConstants;
8-
import ru.cinimex.exporter.mq.pcf.PCFDataParser;
93
import ru.cinimex.exporter.mq.pcf.PCFElement;
10-
import ru.cinimex.exporter.prometheus.metrics.GaugeManager;
114

12-
import java.util.*;
5+
import java.util.ArrayList;
6+
import java.util.Hashtable;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.TimeUnit;
1310

1411
/**
1512
* Class is used to manage work of all subscribers.
@@ -39,91 +36,50 @@ public MQSubscriberManager(String host, int port, String channel, String qmName,
3936
/**
4037
* Creates subscribers and starts them.
4138
*
42-
* @param elements - all elements, retrieved from target queue manager.
43-
* @param monitoringObjects - all monitoring objects.
39+
* @param elements - all elements, retrieved from target queue manager.
40+
* @param monitoringQueues - all monitoring objects.
4441
*/
45-
public void runSubscribers(ArrayList<PCFElement> elements, ArrayList<String> monitoringObjects) {
42+
public void runSubscribers(ArrayList<PCFElement> elements, ArrayList<String> monitoringQueues, ArrayList<String> monitoringListeners, ArrayList<String> monitoringChannels, boolean sendPCFCommands, boolean collectQueueMaxDepth, boolean usePCFWildcards) {
4643
subscribers = new ArrayList<>();
4744
int i = 0;
4845
for (PCFElement element : elements) {
4946
if (!element.requiresMQObject()) {
50-
subscribers.add(i, new Thread(new Subscriber(element, queueManagerName)));
47+
subscribers.add(i, new Thread(new MQTopicSubscriber(element, queueManagerName, connectionProperties, queueManagerName)));
5148
subscribers.get(i).start();
5249
i++;
5350
} else {
54-
for (String object : monitoringObjects) {
51+
for (String object : monitoringQueues) {
5552
PCFElement objElement = new PCFElement(element.getTopicString(), element.getRows());
5653
objElement.formatTopicString(object);
57-
subscribers.add(i, new Thread(new Subscriber(objElement, queueManagerName, object)));
54+
subscribers.add(i, new Thread(new MQTopicSubscriber(objElement, queueManagerName, connectionProperties, queueManagerName, object)));
5855
subscribers.get(i).start();
5956
i++;
6057
}
6158
}
6259
}
63-
}
64-
65-
/**
66-
* Subscriber is used to subscribe to specific topic.
67-
*/
68-
class Subscriber implements Runnable {
69-
private MQTopic topic;
70-
private PCFElement element;
71-
private MQConnection connection;
72-
private String[] labels;
7360

74-
/**
75-
* Subscriber constructor
76-
*
77-
* @param element - PCF message data, which is required for parsing statistics.
78-
* @param labels - labels array, which should be used for metrics.
79-
*/
80-
public Subscriber(PCFElement element, String... labels) {
81-
this.element = element;
82-
if (connection == null) {
83-
connection = new MQConnection();
84-
connection.establish(queueManagerName, connectionProperties);
85-
}
86-
this.labels = labels;
61+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
62+
for(String queue : monitoringQueues){
63+
MQPCFSubscriber subscriber = new MQPCFSubscriber(queueManagerName, connectionProperties,
64+
new MQObject(queue, MQObject.MQType.QUEUE));
65+
subscribers.add(i, new Thread(subscriber));
66+
executor.scheduleAtFixedRate(subscriber, 0, 10, TimeUnit.SECONDS);
8767
}
88-
89-
/**
90-
* Starts subscriber.
91-
*/
92-
public void run() {
93-
try {
94-
topic = connection.createTopic(element.getTopicString());
95-
MQGetMessageOptions gmo = new MQGetMessageOptions();
96-
gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG;
97-
gmo.waitInterval = 12000;
98-
while (true) {
99-
try {
100-
MQMessage msg = new MQMessage();
101-
topic.get(msg, gmo);
102-
HashMap<Integer, Double> receivedMetrics = PCFDataParser.getParsedData(PCFDataParser.convertToPCF(msg));
103-
Iterator<Map.Entry<Integer, Double>> it = receivedMetrics.entrySet().iterator();
104-
while (it.hasNext()) {
105-
Map.Entry<Integer, Double> pair = it.next();
106-
//TODO: There is some exception during startup. Need do define, why it occurs (there is unmapped id)
107-
GaugeManager.updateMetric(PCFDataParser.getMetricName(element.getMetricDescription(pair.getKey()), element.requiresMQObject()), pair.getValue(), labels);
108-
it.remove();
109-
}
110-
} catch (MQException e) {
111-
if (e.getReason() == 2033) System.out.println("No messages in " + element.getTopicString());
112-
}
113-
}
114-
} catch (MQException e) {
115-
System.out.println("An error occured while trying to get queue object " + element.getTopicString());
116-
System.err.println(e.getStackTrace());
117-
} finally {
118-
if (topic != null && topic.isOpen()) {
119-
try {
120-
topic.close();
121-
connection.close();
122-
} catch (MQException e) {
123-
System.err.println(e.getStackTrace());
68+
/* if (sendPCFCommands) {
69+
if (collectQueueMaxDepth) {
70+
if (usePCFWildcards) {
71+
subscribers.add(i, new Thread(new MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connectionProperties, MQObject object)));
72+
subscribers.get(i).start();
73+
i++;
74+
} else {
75+
for (String object : monitoringQueues) {
76+
subscribers.add(i, new Thread(new MQPCFSubscriber()));
77+
subscribers.get(i).start();
78+
i++;
12479
}
12580
}
12681
}
127-
}
82+
}*/
83+
12884
}
12985
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package ru.cinimex.exporter.mq;
2+
3+
import com.ibm.mq.MQException;
4+
import com.ibm.mq.MQGetMessageOptions;
5+
import com.ibm.mq.MQMessage;
6+
import com.ibm.mq.MQTopic;
7+
import com.ibm.mq.constants.MQConstants;
8+
import ru.cinimex.exporter.mq.pcf.PCFDataParser;
9+
import ru.cinimex.exporter.mq.pcf.PCFElement;
10+
import ru.cinimex.exporter.prometheus.metrics.MetricsManager;
11+
import ru.cinimex.exporter.prometheus.metrics.MetricsReference;
12+
13+
import java.util.HashMap;
14+
import java.util.Hashtable;
15+
import java.util.Iterator;
16+
import java.util.Map;
17+
18+
public class MQTopicSubscriber implements Runnable {
19+
/**
20+
* Subscriber is used to subscribe to specific topic.
21+
*/
22+
private MQTopic topic;
23+
private PCFElement element;
24+
private MQConnection connection;
25+
private String[] labels;
26+
27+
/**
28+
* Subscriber constructor
29+
*
30+
* @param element - PCF message data, which is required for parsing statistics.
31+
* @param labels - labels array, which should be used for metrics.
32+
*/
33+
public MQTopicSubscriber(PCFElement element, String queueManagerName,
34+
Hashtable<String, Object> connectionProperties, String... labels) {
35+
this.element = element;
36+
if (connection == null) {
37+
connection = new MQConnection();
38+
connection.establish(queueManagerName, connectionProperties);
39+
}
40+
this.labels = labels;
41+
}
42+
43+
/**
44+
* Starts subscriber.
45+
*/
46+
public void run() {
47+
try {
48+
topic = connection.createTopic(element.getTopicString());
49+
MQGetMessageOptions gmo = new MQGetMessageOptions();
50+
gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG;
51+
gmo.waitInterval = 12000;
52+
while (true) {
53+
try {
54+
MQMessage msg = new MQMessage();
55+
topic.get(msg, gmo);
56+
HashMap<Integer, Double> receivedMetrics = PCFDataParser.getParsedData(PCFDataParser.convertToPCF(msg));
57+
Iterator<Map.Entry<Integer, Double>> it = receivedMetrics.entrySet().iterator();
58+
while (it.hasNext()) {
59+
Map.Entry<Integer, Double> pair = it.next();
60+
MetricsManager.updateMetric(MetricsReference.getMetricName(element.getMetricDescription(pair.getKey()), element.requiresMQObject(), element.getRowDatatype(pair.getKey().intValue())), pair.getValue(), labels);
61+
it.remove();
62+
}
63+
} catch (MQException e) {
64+
if (e.getReason() == MQConstants.MQRC_NO_MSG_AVAILABLE)
65+
System.out.println("No messages in " + element.getTopicString());
66+
}
67+
}
68+
} catch (MQException e) {
69+
System.out.println("An error occured while trying to get queue object " + element.getTopicString());
70+
System.err.println(e.getMessage());
71+
} finally {
72+
if (topic != null && topic.isOpen()) {
73+
try {
74+
topic.close();
75+
connection.close();
76+
} catch (MQException e) {
77+
System.err.println(e.getMessage());
78+
}
79+
}
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)