Skip to content

Commit 913b6e0

Browse files
committed
add metric post processing when handle request from prometheus + add new queue-specific metrics
1 parent 81dff84 commit 913b6e0

File tree

4 files changed

+187
-10
lines changed

4 files changed

+187
-10
lines changed

src/main/java/ru/cinimex/exporter/prometheus/HTTPServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.sun.net.httpserver.HttpExchange;
44
import com.sun.net.httpserver.HttpHandler;
55
import com.sun.net.httpserver.HttpServer;
6+
import io.prometheus.client.Collector;
67
import io.prometheus.client.CollectorRegistry;
78
import io.prometheus.client.exporter.common.TextFormat;
89
import org.apache.logging.log4j.LogManager;
@@ -15,6 +16,7 @@
1516
import java.net.HttpURLConnection;
1617
import java.net.InetSocketAddress;
1718
import java.net.URLDecoder;
19+
import java.util.Enumeration;
1820
import java.util.HashSet;
1921
import java.util.List;
2022
import java.util.Set;
@@ -125,6 +127,7 @@ public void handle(HttpExchange t) throws IOException {
125127
ByteArrayOutputStream streamResponse = this.response.get();
126128
streamResponse.reset();
127129
OutputStreamWriter osw = new OutputStreamWriter(streamResponse);
130+
MetricsManager.updateAdditionalMetrics(parseQuery(query));
128131
TextFormat.write004(osw, registry.filteredMetricFamilySamples(parseQuery(query)));
129132
osw.flush();
130133
osw.close();

src/main/java/ru/cinimex/exporter/prometheus/metrics/MetricsManager.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package ru.cinimex.exporter.prometheus.metrics;
22

3+
import io.prometheus.client.CollectorRegistry;
34
import org.apache.logging.log4j.LogManager;
45
import org.apache.logging.log4j.Logger;
56
import ru.cinimex.exporter.mq.MQObject;
@@ -9,6 +10,12 @@
910
import java.util.ArrayList;
1011
import java.util.HashMap;
1112
import java.util.List;
13+
import java.util.Map;
14+
import java.util.Set;
15+
import java.util.function.Function;
16+
17+
import static ru.cinimex.exporter.prometheus.metrics.MetricsReference.getAdditionalMqObjectMetricsReference;
18+
import static ru.cinimex.exporter.prometheus.metrics.UpdateMetricUtils.*;
1219

1320
/**
1421
* Class is used to manage work of all metrics.
@@ -33,27 +40,25 @@ public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType>
3340
ArrayList<String> labels = new ArrayList<>();
3441
labels.add(Labels.QMGR_NAME.name());
3542
MetricInterface metric;
36-
if (element.requiresMQObject()) {
37-
labels.add(Labels.MQ_OBJECT_NAME.name());
38-
}
43+
if (element.requiresMQObject()) labels.add(Labels.MQ_OBJECT_NAME.name());
3944
switch (metricType) {
4045
case SIMPLE_GAUGE:
41-
metric = new SimpleGauge(metricName, row.getRowDesc(), labels.stream().toArray(String[]::new));
46+
metric = new SimpleGauge(metricName, row.getRowDesc(), labels.toArray(new String[0]));
4247
metrics.put(metricName, metric);
4348
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
4449
break;
4550
case SIMPLE_COUNTER:
46-
metric = new SimpleCounter(metricName, row.getRowDesc(), labels.stream().toArray(String[]::new));
51+
metric = new SimpleCounter(metricName, row.getRowDesc(), labels.toArray(new String[0]));
4752
metrics.put(metricName, metric);
4853
logger.trace("New counter created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
4954
break;
5055
case EXTREME_GAUGE_MAX:
51-
metric = new ExtremeGauge(metricName, row.getRowDesc(), true, labels.stream().toArray(String[]::new));
56+
metric = new ExtremeGauge(metricName, row.getRowDesc(), true, labels.toArray(new String[0]));
5257
metrics.put(metricName, metric);
5358
logger.trace("New extreme gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
5459
break;
5560
case EXTREME_GAUGE_MIN:
56-
metric = new ExtremeGauge(metricName, row.getRowDesc(), false, labels.stream().toArray(String[]::new));
61+
metric = new ExtremeGauge(metricName, row.getRowDesc(), false, labels.toArray(new String[0]));
5762
metrics.put(metricName, metric);
5863
logger.trace("New extreme gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
5964
break;
@@ -67,6 +72,14 @@ public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType>
6772
metrics.put(metricName, new SimpleGauge(metricName, MetricsReference.getMetricHelp(type), Labels.QMGR_NAME.name(), Labels.MQ_OBJECT_NAME.name()));
6873
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metricName, MetricsReference.getMetricHelp(type), Labels.MQ_OBJECT_NAME.name());
6974
}
75+
getAdditionalMqObjectMetricsReference().forEach((metricInfo, metric) -> {
76+
ArrayList<String> labels = new ArrayList<>();
77+
labels.add(Labels.QMGR_NAME.name());
78+
labels.add(Labels.MQ_OBJECT_NAME.name());
79+
metrics.put(metric.name, new SimpleGauge(metric.name, metricInfo, labels.toArray(new String[0])));
80+
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metric.name, metricInfo, labels);
81+
});
82+
7083
logger.info("Successfully initialized {} metrics!", metrics.size());
7184
}
7285

@@ -81,13 +94,33 @@ public static void updateMetric(String metricName, Double value, String... label
8194
metrics.get(metricName).update(value, labels);
8295
}
8396

97+
/**
98+
* Updates additional metrics, for which need special conversion
99+
*
100+
* @param parsedQuery - parameter, needed for getting metric in special family metrics
101+
*/
102+
public static void updateAdditionalMetrics(Set<String> parsedQuery) {
103+
getUpdatedMetricNames().forEach(updatedMetricName -> complexUpdateMetrics(
104+
getMetricsUsedToUpdate(
105+
parsedQuery,
106+
updatedMetricName),
107+
updatedMetricName));
108+
logger.trace("Additional metrics was updated");
109+
}
110+
111+
private static void complexUpdateMetrics(
112+
Map<List<String>, List<Double>> metricsUsedToUpdate,
113+
String updatedMetricName) {
114+
metricsUsedToUpdate.forEach((k, l) -> updateMetric(
115+
updatedMetricName, getConversionFunction(updatedMetricName).apply(l), k.toArray(new String[0])));
116+
logger.trace("Additional metrics {} was updated", updatedMetricName);
117+
}
118+
84119
/**
85120
* Notifies all metrics after each Prometheus scrape.
86121
*/
87122
public static void notifyMetricsWereScraped() {
88-
for (MetricInterface metric : metrics.values()) {
89-
metric.notifyWasScraped();
90-
}
123+
metrics.values().forEach(MetricInterface::notifyWasScraped);
91124
}
92125

93126
/**

src/main/java/ru/cinimex/exporter/prometheus/metrics/MetricsReference.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.EnumMap;
99
import java.util.HashMap;
10+
import java.util.Map;
1011

1112
public class MetricsReference {
1213
private static final Logger logger = LogManager.getLogger(MetricsReference.class);
@@ -168,6 +169,13 @@ private static HashMap<String, Metric> getMqObjectMetricsReference() {
168169
return metrics;
169170
}
170171

172+
public static Map<String, Metric> getAdditionalMqObjectMetricsReference() {
173+
Map<String, Metric> metrics = new HashMap<>();
174+
metrics.put("destructive MQGET persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
175+
metrics.put("destructive MQGET non-persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
176+
metrics.put("destructive MQGET persistent and non-persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
177+
return metrics;
178+
}
171179
/**
172180
* Method is used to initialize ChannelStatuses.
173181
*
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package ru.cinimex.exporter.prometheus.metrics;
2+
3+
import io.prometheus.client.CollectorRegistry;
4+
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.Enumeration;
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Objects;
12+
import java.util.Set;
13+
import java.util.function.Consumer;
14+
import java.util.function.Function;
15+
import java.util.stream.Collectors;
16+
17+
import static java.util.stream.Collectors.toMap;
18+
19+
/**
20+
* Util class is used to update metrics.
21+
*/
22+
public class UpdateMetricUtils {
23+
private UpdateMetricUtils() {
24+
}
25+
26+
27+
/**
28+
* @return - list of metric names, than will be updated
29+
*/
30+
public static List<String> getUpdatedMetricNames() {
31+
List<String> updatedMetricNames = new ArrayList<>();
32+
updatedMetricNames.add("mqobject_get_average_destructive_mqget_persistent_message_size_bytes");
33+
updatedMetricNames.add("mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes");
34+
updatedMetricNames.add("mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes");
35+
return updatedMetricNames;
36+
}
37+
38+
39+
/**
40+
* @param updatedMetricName - metric name, than will be updated
41+
* @return list of metric names, than will be used to update metric with name updatedMetricName
42+
*/
43+
public static List<String> getMetricsNamesUsedToUpdate(String updatedMetricName) {
44+
List<String> listWithNames = new ArrayList<>();
45+
switch (updatedMetricName) {
46+
case "mqobject_get_average_destructive_mqget_persistent_message_size_bytes":
47+
listWithNames.add("mqobject_get_destructive_mqget_persistent_byte_count_totalbytes");
48+
listWithNames.add("mqobject_get_destructive_mqget_persistent_message_count_totalmessages");
49+
break;
50+
case "mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes":
51+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_byte_count_totalbytes");
52+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_message_count_totalmessages");
53+
break;
54+
case "mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes":
55+
listWithNames.add("mqobject_get_destructive_mqget_persistent_byte_count_totalbytes");
56+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_byte_count_totalbytes");
57+
listWithNames.add("mqobject_get_destructive_mqget_persistent_message_count_totalmessages");
58+
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_message_count_totalmessages");
59+
break;
60+
default:
61+
break;
62+
}
63+
return listWithNames;
64+
}
65+
66+
/**
67+
* @param updatedMetricName - metric name, than will be updated
68+
* @return function, that will be used for updating metric value
69+
*/
70+
public static Function<List<Double>, Double> getConversionFunction(String updatedMetricName) {
71+
switch (updatedMetricName) {
72+
case "mqobject_get_average_destructive_mqget_persistent_message_size_bytes":
73+
case "mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes":
74+
return UpdateMetricUtils::division;
75+
case "mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes":
76+
return UpdateMetricUtils::averageSum;
77+
default:
78+
return UpdateMetricUtils::defaultConversion;
79+
}
80+
}
81+
82+
private static Double averageSum(List<Double> params) {
83+
Objects.requireNonNull(params);
84+
if (params.size() != 4) throw new IllegalArgumentException();
85+
return (params.get(2) + params.get(3)) == 0.0 ? 0.0 : (params.get(0) + params.get(1)) / (params.get(2) + params.get(3));
86+
}
87+
88+
private static Double division(List<Double> params) {
89+
Objects.requireNonNull(params);
90+
if (params.size() != 2) throw new IllegalArgumentException();
91+
return params.get(1) == 0.0 ? 0.0 : params.get(0) / params.get(1);
92+
}
93+
94+
private static Double defaultConversion(List<Double> params) {
95+
return Objects.requireNonNull(params).get(0);
96+
}
97+
98+
/**
99+
* @param parsedQuery - parameter, needed for getting metric in special family metrics
100+
* @param updatedMetricName - metric name, than will be updated
101+
* @return map with label list as key and double parameter list used for conversion function as value
102+
*/
103+
public static Map<List<String>, List<Double>> getMetricsUsedToUpdate(Set<String> parsedQuery, String updatedMetricName) {
104+
List<String> metricsNamesUsedToUpdate = getMetricsNamesUsedToUpdate(updatedMetricName);
105+
Map<String, Map<List<String>, Double>> mapWithValues = new HashMap<>();
106+
forEachRemaining(CollectorRegistry.defaultRegistry.filteredMetricFamilySamples(parsedQuery), metricFamilySamples -> {
107+
Map<List<String>, Double> arrayListDoubleMap =
108+
metricFamilySamples.samples.stream()
109+
.filter(sample -> metricsNamesUsedToUpdate.contains(sample.name))
110+
.collect(toMap(sample -> sample.labelValues, sample -> sample.value, (a, b) -> b));
111+
if (!arrayListDoubleMap.isEmpty()) mapWithValues.put(metricFamilySamples.name, arrayListDoubleMap);
112+
});
113+
114+
List<Map<List<String>, Double>> listWithValues = metricsNamesUsedToUpdate.stream()
115+
.map(mapWithValues::get)
116+
.collect(Collectors.toList());
117+
Map<List<String>, List<Double>> params = new HashMap<>();
118+
listWithValues.forEach(l -> l.forEach((k, v) -> {
119+
if (!params.containsKey(k)) params.put(k, new ArrayList<>(Collections.singletonList(v)));
120+
else {
121+
List<Double> paramList = params.get(k);
122+
paramList.add(v);
123+
params.replace(k, paramList);
124+
}
125+
}
126+
));
127+
return params;
128+
}
129+
130+
private static <T> void forEachRemaining(Enumeration<T> e, Consumer<? super T> c) {
131+
while (e.hasMoreElements()) c.accept(e.nextElement());
132+
}
133+
}

0 commit comments

Comments
 (0)