Skip to content

Commit fdfc215

Browse files
authored
Merge pull request #125 from Cinimex-Informatica/feature/issue95_add_metric_postprocessing
Add new queue-specific metrics, that reflects average size of read messages
2 parents 81dff84 + f220795 commit fdfc215

File tree

5 files changed

+218
-16
lines changed

5 files changed

+218
-16
lines changed

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,24 @@ This section provides a description of metrics related to MQGET, MQCB and MQCTL
950950
<td>Shows the number of calls to MQCTL.</td>
951951
<td>MQCTL count</td>
952952
</tr>
953+
<tr>
954+
<td>mqobject_get_average_destructive_mqget_persistent_message_size_bytes</td>
955+
<td>gauge</td>
956+
<td>Shows an average amount of bytes per persistent message that are returned by MQGET.</td>
957+
<td>destructive MQGET persistent average message byte count</td>
958+
</tr>
959+
<tr>
960+
<td>mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes</td>
961+
<td>gauge</td>
962+
<td>Shows an average amount of bytes per non-persistent message that are returned by MQGET.</td>
963+
<td>destructive MQGET non-persistent average message byte count</td>
964+
</tr>
965+
<tr>
966+
<td>mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes</td>
967+
<td>gauge</td>
968+
<td>Shows an average amount of bytes per persistent and non-persistent messages that are returned by MQGET.</td>
969+
<td>destructive MQGET persistent and non-persistent average message byte count</td>
970+
</tr>
953971
</tbody>
954972
</table>
955973

src/main/java/ru/cinimex/exporter/prometheus/HTTPServer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
import java.util.HashSet;
1919
import java.util.List;
2020
import java.util.Set;
21-
import java.util.concurrent.*;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.FutureTask;
25+
import java.util.concurrent.ThreadFactory;
2226
import java.util.concurrent.atomic.AtomicInteger;
2327
import java.util.zip.GZIPOutputStream;
2428

@@ -125,6 +129,7 @@ public void handle(HttpExchange t) throws IOException {
125129
ByteArrayOutputStream streamResponse = this.response.get();
126130
streamResponse.reset();
127131
OutputStreamWriter osw = new OutputStreamWriter(streamResponse);
132+
MetricsManager.updateAdditionalMetrics(parseQuery(query));
128133
TextFormat.write004(osw, registry.filteredMetricFamilySamples(parseQuery(query)));
129134
osw.flush();
130135
osw.close();
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package ru.cinimex.exporter.prometheus.metrics;
2+
3+
import io.prometheus.client.CollectorRegistry;
4+
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.Enumeration;
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Objects;
12+
import java.util.Set;
13+
import java.util.function.Consumer;
14+
import java.util.function.Function;
15+
import java.util.stream.Collectors;
16+
17+
import static java.util.stream.Collectors.toMap;
18+
19+
/**
20+
* Util class is used to update metrics.
21+
*/
22+
public class MetricManagerUtils {
23+
private MetricManagerUtils() {
24+
}
25+
26+
27+
/**
28+
* @return - list of metric names, than will be updated
29+
*/
30+
public static List<String> getUpdatedMetricNames() {
31+
List<String> updatedMetricNames = new ArrayList<>();
32+
updatedMetricNames.add("mqobject_get_average_destructive_mqget_persistent_message_size_bytes");
33+
updatedMetricNames.add("mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes");
34+
updatedMetricNames.add("mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes");
35+
return updatedMetricNames;
36+
}
37+
38+
39+
/**
40+
* @param updatedMetricName - metric name, than will be updated
41+
* @return list of metric names, than will be used to update metric with name updatedMetricName
42+
*/
43+
public static List<String> getMetricsNamesUsedToUpdate(String updatedMetricName) {
44+
List<String> listWithNames = new ArrayList<>();
45+
switch (updatedMetricName) {
46+
case "mqobject_get_average_destructive_mqget_persistent_message_size_bytes":
47+
listWithNames.add("mqobject_get_destructive_mqget_persistent_byte_count_totalbytes");
48+
listWithNames.add("mqobject_get_destructive_mqget_persistent_message_count_totalmessages");
49+
break;
50+
case "mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes":
51+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_byte_count_totalbytes");
52+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_message_count_totalmessages");
53+
break;
54+
case "mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes":
55+
listWithNames.add("mqobject_get_destructive_mqget_persistent_byte_count_totalbytes");
56+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_byte_count_totalbytes");
57+
listWithNames.add("mqobject_get_destructive_mqget_persistent_message_count_totalmessages");
58+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_message_count_totalmessages");
59+
break;
60+
default:
61+
break;
62+
}
63+
return listWithNames;
64+
}
65+
66+
/**
67+
* @param updatedMetricName - metric name, than will be updated
68+
* @return function, that will be used for updating metric value
69+
*/
70+
public static Function<List<Double>, Double> getConversionFunction(String updatedMetricName) {
71+
switch (updatedMetricName) {
72+
case "mqobject_get_average_destructive_mqget_persistent_message_size_bytes":
73+
case "mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes":
74+
return MetricManagerUtils::division;
75+
case "mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes":
76+
return MetricManagerUtils::averageSum;
77+
default:
78+
return MetricManagerUtils::defaultConversion;
79+
}
80+
}
81+
82+
private static Double averageSum(List<Double> params) {
83+
Objects.requireNonNull(params);
84+
if (params.size() != 4) throw new IllegalArgumentException();
85+
return (params.get(2) + params.get(3)) == 0.0 ? 0.0 : (params.get(0) + params.get(1)) / (params.get(2) + params.get(3));
86+
}
87+
88+
private static Double division(List<Double> params) {
89+
Objects.requireNonNull(params);
90+
if (params.size() != 2) throw new IllegalArgumentException();
91+
return params.get(1) == 0.0 ? 0.0 : params.get(0) / params.get(1);
92+
}
93+
94+
private static Double defaultConversion(List<Double> params) {
95+
return Objects.requireNonNull(params).get(0);
96+
}
97+
98+
/**
99+
* @param parsedQuery - parameter, needed for getting metric in special family metrics
100+
* @param updatedMetricName - metric name, than will be updated
101+
* @return map with label list as key and double parameter list used for conversion function as value
102+
*/
103+
public static Map<List<String>, List<Double>> getMetricsUsedToUpdate(Set<String> parsedQuery, String updatedMetricName) {
104+
List<String> metricsNamesUsedToUpdate = getMetricsNamesUsedToUpdate(updatedMetricName);
105+
Map<String, Map<List<String>, Double>> mapWithValues = new HashMap<>();
106+
forEachRemaining(CollectorRegistry.defaultRegistry.filteredMetricFamilySamples(parsedQuery), metricFamilySamples -> {
107+
Map<List<String>, Double> arrayListDoubleMap =
108+
metricFamilySamples.samples.stream()
109+
.filter(sample -> metricsNamesUsedToUpdate.contains(sample.name))
110+
.collect(toMap(sample -> sample.labelValues, sample -> sample.value, (a, b) -> b));
111+
if (!arrayListDoubleMap.isEmpty()) mapWithValues.put(metricFamilySamples.name, arrayListDoubleMap);
112+
});
113+
114+
List<Map<List<String>, Double>> listWithValues = metricsNamesUsedToUpdate.stream()
115+
.map(mapWithValues::get)
116+
.collect(Collectors.toList());
117+
Map<List<String>, List<Double>> params = new HashMap<>();
118+
listWithValues.forEach(l -> l.forEach((k, v) -> {
119+
if (!params.containsKey(k)) params.put(k, new ArrayList<>(Collections.singletonList(v)));
120+
else {
121+
List<Double> paramList = params.get(k);
122+
paramList.add(v);
123+
params.replace(k, paramList);
124+
}
125+
}
126+
));
127+
return params;
128+
}
129+
130+
private static <T> void forEachRemaining(Enumeration<T> e, Consumer<? super T> c) {
131+
while (e.hasMoreElements()) c.accept(e.nextElement());
132+
}
133+
}

src/main/java/ru/cinimex/exporter/prometheus/metrics/MetricsManager.java

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@
77
import ru.cinimex.exporter.mq.pcf.PCFElementRow;
88

99
import java.util.ArrayList;
10+
import java.util.Arrays;
1011
import java.util.HashMap;
1112
import java.util.List;
13+
import java.util.Map;
14+
import java.util.Set;
15+
16+
import static ru.cinimex.exporter.prometheus.metrics.MetricsReference.getAdditionalMqObjectMetricsReference;
17+
import static ru.cinimex.exporter.prometheus.metrics.MetricManagerUtils.*;
1218

1319
/**
1420
* Class is used to manage work of all metrics.
@@ -25,6 +31,7 @@ public class MetricsManager {
2531
*/
2632
public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType> types) {
2733
logger.debug("Preparing to initialize metrics. {} metrics will be received from MQ topics and {} metrics will be received via direct PCF commands.", elements.size(), types.size());
34+
String logString = " created! Name: {}, description: {}, labels: {}.";
2835
metrics = new HashMap<>();
2936
for (PCFElement element : elements) {
3037
for (PCFElementRow row : element.getRows()) {
@@ -33,32 +40,32 @@ public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType>
3340
ArrayList<String> labels = new ArrayList<>();
3441
labels.add(Labels.QMGR_NAME.name());
3542
MetricInterface metric;
36-
if (element.requiresMQObject()) {
37-
labels.add(Labels.MQ_OBJECT_NAME.name());
38-
}
43+
if (element.requiresMQObject()) labels.add(Labels.MQ_OBJECT_NAME.name());
3944
switch (metricType) {
4045
case SIMPLE_GAUGE:
41-
metric = new SimpleGauge(metricName, row.getRowDesc(), labels.stream().toArray(String[]::new));
46+
metric = new SimpleGauge(metricName, row.getRowDesc(), labels.toArray(new String[0]));
4247
metrics.put(metricName, metric);
43-
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
48+
logger.trace("New " + "gauge" + logString, metricName, row.getRowDesc(), labels);
4449
break;
4550
case SIMPLE_COUNTER:
46-
metric = new SimpleCounter(metricName, row.getRowDesc(), labels.stream().toArray(String[]::new));
51+
metric = new SimpleCounter(metricName, row.getRowDesc(), labels.toArray(new String[0]));
4752
metrics.put(metricName, metric);
48-
logger.trace("New counter created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
53+
logger.trace("New counter " + logString, metricName, row.getRowDesc(), labels);
4954
break;
5055
case EXTREME_GAUGE_MAX:
51-
metric = new ExtremeGauge(metricName, row.getRowDesc(), true, labels.stream().toArray(String[]::new));
56+
metric = new ExtremeGauge(metricName, row.getRowDesc(), true, labels.toArray(new String[0]));
5257
metrics.put(metricName, metric);
53-
logger.trace("New extreme gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
58+
logger.trace("New extreme gauge" + logString, metricName, row.getRowDesc(), labels);
5459
break;
5560
case EXTREME_GAUGE_MIN:
56-
metric = new ExtremeGauge(metricName, row.getRowDesc(), false, labels.stream().toArray(String[]::new));
61+
metric = new ExtremeGauge(metricName, row.getRowDesc(), false, labels.toArray(new String[0]));
5762
metrics.put(metricName, metric);
58-
logger.trace("New extreme gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
63+
logger.trace("New extreme gauge" + logString, metricName, row.getRowDesc(), labels);
5964
break;
6065
default:
61-
logger.error("Error during metrics initialization: Unknown metric type! Make sure it is one " + "of: {}", MetricsReference.Metric.Type.values());
66+
logger.error(
67+
"Error during metrics initialization: Unknown metric type! Make sure it is one " + "of: {}",
68+
(Object[]) MetricsReference.Metric.Type.values());
6269
}
6370
}
6471
}
@@ -67,9 +74,19 @@ public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType>
6774
metrics.put(metricName, new SimpleGauge(metricName, MetricsReference.getMetricHelp(type), Labels.QMGR_NAME.name(), Labels.MQ_OBJECT_NAME.name()));
6875
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metricName, MetricsReference.getMetricHelp(type), Labels.MQ_OBJECT_NAME.name());
6976
}
77+
initAdditionalMetrics();
78+
7079
logger.info("Successfully initialized {} metrics!", metrics.size());
7180
}
7281

82+
private static void initAdditionalMetrics() {
83+
getAdditionalMqObjectMetricsReference().forEach((metricInfo, metric) -> {
84+
List<String> labels = Arrays.asList(Labels.QMGR_NAME.name(), Labels.MQ_OBJECT_NAME.name());
85+
metrics.put(metric.name, new SimpleGauge(metric.name, metricInfo, labels.toArray(new String[0])));
86+
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metric.name, metricInfo, labels);
87+
});
88+
}
89+
7390
/**
7491
* Updates specific metric
7592
*
@@ -81,13 +98,33 @@ public static void updateMetric(String metricName, Double value, String... label
8198
metrics.get(metricName).update(value, labels);
8299
}
83100

101+
/**
102+
* Updates additional metrics, for which need special conversion
103+
*
104+
* @param parsedQuery - parameter, needed for getting metric in special family metrics
105+
*/
106+
public static void updateAdditionalMetrics(Set<String> parsedQuery) {
107+
getUpdatedMetricNames().forEach(updatedMetricName -> complexUpdateMetrics(
108+
getMetricsUsedToUpdate(
109+
parsedQuery,
110+
updatedMetricName),
111+
updatedMetricName));
112+
logger.trace("Additional metrics was updated");
113+
}
114+
115+
private static void complexUpdateMetrics(
116+
Map<List<String>, List<Double>> metricsUsedToUpdate,
117+
String updatedMetricName) {
118+
metricsUsedToUpdate.forEach((k, l) -> updateMetric(
119+
updatedMetricName, getConversionFunction(updatedMetricName).apply(l), k.toArray(new String[0])));
120+
logger.trace("Additional metrics {} was updated", updatedMetricName);
121+
}
122+
84123
/**
85124
* Notifies all metrics after each Prometheus scrape.
86125
*/
87126
public static void notifyMetricsWereScraped() {
88-
for (MetricInterface metric : metrics.values()) {
89-
metric.notifyWasScraped();
90-
}
127+
metrics.values().forEach(MetricInterface::notifyWasScraped);
91128
}
92129

93130
/**

src/main/java/ru/cinimex/exporter/prometheus/metrics/MetricsReference.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.EnumMap;
99
import java.util.HashMap;
10+
import java.util.Map;
1011

1112
public class MetricsReference {
1213
private static final Logger logger = LogManager.getLogger(MetricsReference.class);
@@ -168,6 +169,14 @@ private static HashMap<String, Metric> getMqObjectMetricsReference() {
168169
return metrics;
169170
}
170171

172+
public static Map<String, Metric> getAdditionalMqObjectMetricsReference() {
173+
Map<String, Metric> metrics = new HashMap<>();
174+
metrics.put("destructive MQGET persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
175+
metrics.put("destructive MQGET non-persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
176+
metrics.put("destructive MQGET persistent and non-persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
177+
return metrics;
178+
}
179+
171180
/**
172181
* Method is used to initialize ChannelStatuses.
173182
*

0 commit comments

Comments
 (0)