Skip to content

Commit 33b6077

Browse files
authored
Refactor cluster settings collector (#656)
* Refactor cluster settings collector Refactor cluster settings collector into new Collector interface. Update tests to use prometheus/testutil to validate the output instead of checking the parsed elasticsearch response. Signed-off-by: Joe Adams <github@joeadams.io> * Fix rebase Signed-off-by: Joe Adams <github@joeadams.io> --------- Signed-off-by: Joe Adams <github@joeadams.io>
1 parent 687e75e commit 33b6077

File tree

8 files changed

+186
-223
lines changed

8 files changed

+186
-223
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## Unreleased
2+
* [BREAKING] Rename --es.cluster_settings to --collector.clustersettings
3+
14
## 1.5.0 / 2022-07-28
25

36
* [FEATURE] Add metrics collection for data stream statistics #592

collector/cluster_settings.go

Lines changed: 99 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -14,170 +14,151 @@
1414
package collector
1515

1616
import (
17+
"context"
1718
"encoding/json"
18-
"fmt"
1919
"io/ioutil"
2020
"net/http"
2121
"net/url"
22-
"path"
2322
"strconv"
2423

2524
"github.com/go-kit/log"
26-
"github.com/go-kit/log/level"
2725
"github.com/imdario/mergo"
2826
"github.com/prometheus/client_golang/prometheus"
2927
)
3028

31-
// ClusterSettings information struct
32-
type ClusterSettings struct {
33-
logger log.Logger
34-
client *http.Client
35-
url *url.URL
29+
func init() {
30+
registerCollector("clustersettings", defaultDisabled, NewClusterSettings)
31+
}
3632

37-
up prometheus.Gauge
38-
shardAllocationEnabled prometheus.Gauge
39-
maxShardsPerNode prometheus.Gauge
40-
totalScrapes, jsonParseFailures prometheus.Counter
33+
type ClusterSettingsCollector struct {
34+
logger log.Logger
35+
u *url.URL
36+
hc *http.Client
4137
}
4238

43-
// NewClusterSettings defines Cluster Settings Prometheus metrics
44-
func NewClusterSettings(logger log.Logger, client *http.Client, url *url.URL) *ClusterSettings {
45-
return &ClusterSettings{
39+
func NewClusterSettings(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
40+
return &ClusterSettingsCollector{
4641
logger: logger,
47-
client: client,
48-
url: url,
49-
50-
up: prometheus.NewGauge(prometheus.GaugeOpts{
51-
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "up"),
52-
Help: "Was the last scrape of the Elasticsearch cluster settings endpoint successful.",
53-
}),
54-
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
55-
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "total_scrapes"),
56-
Help: "Current total Elasticsearch cluster settings scrapes.",
57-
}),
58-
shardAllocationEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
59-
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
60-
Help: "Current mode of cluster wide shard routing allocation settings.",
61-
}),
62-
maxShardsPerNode: prometheus.NewGauge(prometheus.GaugeOpts{
63-
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
64-
Help: "Current maximum number of shards per node setting.",
65-
}),
66-
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
67-
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "json_parse_failures"),
68-
Help: "Number of errors while parsing JSON.",
69-
}),
70-
}
42+
u: u,
43+
hc: hc,
44+
}, nil
7145
}
7246

73-
// Describe add Snapshots metrics descriptions
74-
func (cs *ClusterSettings) Describe(ch chan<- *prometheus.Desc) {
75-
ch <- cs.up.Desc()
76-
ch <- cs.totalScrapes.Desc()
77-
ch <- cs.shardAllocationEnabled.Desc()
78-
ch <- cs.maxShardsPerNode.Desc()
79-
ch <- cs.jsonParseFailures.Desc()
47+
var clusterSettingsDesc = map[string]*prometheus.Desc{
48+
"shardAllocationEnabled": prometheus.NewDesc(
49+
prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
50+
"Current mode of cluster wide shard routing allocation settings.",
51+
nil, nil,
52+
),
53+
54+
"maxShardsPerNode": prometheus.NewDesc(
55+
prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
56+
"Current maximum number of shards per node setting.",
57+
nil, nil,
58+
),
8059
}
8160

82-
func (cs *ClusterSettings) getAndParseURL(u *url.URL, data interface{}) error {
83-
res, err := cs.client.Get(u.String())
84-
if err != nil {
85-
return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
86-
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
87-
}
88-
89-
defer func() {
90-
err = res.Body.Close()
91-
if err != nil {
92-
_ = level.Warn(cs.logger).Log(
93-
"msg", "failed to close http.Client",
94-
"err", err,
95-
)
96-
}
97-
}()
61+
// clusterSettingsResponse is a representation of an Elasticsearch Cluster Settings response
62+
type clusterSettingsResponse struct {
63+
Defaults clusterSettingsSection `json:"defaults"`
64+
Persistent clusterSettingsSection `json:"persistent"`
65+
Transient clusterSettingsSection `json:"transient"`
66+
}
9867

99-
if res.StatusCode != http.StatusOK {
100-
return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
101-
}
68+
// clusterSettingsSection is a representation of one section (defaults, persistent or transient) of an Elasticsearch Cluster Settings response
69+
type clusterSettingsSection struct {
70+
Cluster clusterSettingsCluster `json:"cluster"`
71+
}
10272

103-
bts, err := ioutil.ReadAll(res.Body)
104-
if err != nil {
105-
cs.jsonParseFailures.Inc()
106-
return err
107-
}
73+
// clusterSettingsCluster is a representation of the "cluster" object of an Elasticsearch Cluster Settings section
74+
type clusterSettingsCluster struct {
75+
Routing clusterSettingsRouting `json:"routing"`
76+
// This can be either a JSON object (which does not contain the value we are interested in) or a string
77+
MaxShardsPerNode interface{} `json:"max_shards_per_node"`
78+
}
10879

109-
if err := json.Unmarshal(bts, data); err != nil {
110-
cs.jsonParseFailures.Inc()
111-
return err
112-
}
80+
// clusterSettingsRouting is a representation of an Elasticsearch Cluster shard routing configuration
81+
type clusterSettingsRouting struct {
82+
Allocation clusterSettingsAllocation `json:"allocation"`
83+
}
11384

114-
return nil
85+
// clusterSettingsAllocation is a representation of the Elasticsearch Cluster shard routing allocation settings
86+
type clusterSettingsAllocation struct {
87+
Enabled string `json:"enable"`
11588
}
11689

117-
func (cs *ClusterSettings) fetchAndDecodeClusterSettingsStats() (ClusterSettingsResponse, error) {
90+
// ClusterSettings information struct
91+
type ClusterSettings struct {
92+
logger log.Logger
93+
client *http.Client
94+
url *url.URL
11895

119-
u := *cs.url
120-
u.Path = path.Join(u.Path, "/_cluster/settings")
96+
maxShardsPerNode prometheus.Gauge
97+
}
98+
99+
func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
100+
u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"})
121101
q := u.Query()
122102
q.Set("include_defaults", "true")
123103
u.RawQuery = q.Encode()
124-
u.RawPath = q.Encode()
125-
var csfr ClusterSettingsFullResponse
126-
var csr ClusterSettingsResponse
127-
err := cs.getAndParseURL(&u, &csfr)
104+
105+
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
106+
if err != nil {
107+
return err
108+
}
109+
110+
resp, err := c.hc.Do(req)
128111
if err != nil {
129-
return csr, err
112+
return err
130113
}
131-
err = mergo.Merge(&csr, csfr.Defaults, mergo.WithOverride)
114+
defer resp.Body.Close()
115+
b, err := ioutil.ReadAll(resp.Body)
132116
if err != nil {
133-
return csr, err
117+
return err
134118
}
135-
err = mergo.Merge(&csr, csfr.Persistent, mergo.WithOverride)
119+
var data clusterSettingsResponse
120+
err = json.Unmarshal(b, &data)
136121
if err != nil {
137-
return csr, err
122+
return err
138123
}
139-
err = mergo.Merge(&csr, csfr.Transient, mergo.WithOverride)
140124

141-
return csr, err
142-
}
125+
// Merge all settings into one struct
126+
merged := data.Defaults
143127

144-
// Collect gets cluster settings metric values
145-
func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) {
146-
147-
cs.totalScrapes.Inc()
148-
defer func() {
149-
ch <- cs.up
150-
ch <- cs.totalScrapes
151-
ch <- cs.jsonParseFailures
152-
ch <- cs.shardAllocationEnabled
153-
ch <- cs.maxShardsPerNode
154-
}()
155-
156-
csr, err := cs.fetchAndDecodeClusterSettingsStats()
128+
err = mergo.Merge(&merged, data.Persistent, mergo.WithOverride)
129+
if err != nil {
130+
return err
131+
}
132+
err = mergo.Merge(&merged, data.Transient, mergo.WithOverride)
157133
if err != nil {
158-
cs.shardAllocationEnabled.Set(0)
159-
cs.up.Set(0)
160-
_ = level.Warn(cs.logger).Log(
161-
"msg", "failed to fetch and decode cluster settings stats",
162-
"err", err,
163-
)
164-
return
134+
return err
135+
}
136+
137+
// Max shards per node
138+
if maxShardsPerNodeString, ok := merged.Cluster.MaxShardsPerNode.(string); ok {
139+
maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
140+
if err == nil {
141+
ch <- prometheus.MustNewConstMetric(
142+
clusterSettingsDesc["maxShardsPerNode"],
143+
prometheus.GaugeValue,
144+
float64(maxShardsPerNode),
145+
)
146+
}
165147
}
166-
cs.up.Set(1)
167148

149+
// Shard allocation enabled
168150
shardAllocationMap := map[string]int{
169151
"all": 0,
170152
"primaries": 1,
171153
"new_primaries": 2,
172154
"none": 3,
173155
}
174156

175-
cs.shardAllocationEnabled.Set(float64(shardAllocationMap[csr.Cluster.Routing.Allocation.Enabled]))
157+
ch <- prometheus.MustNewConstMetric(
158+
clusterSettingsDesc["shardAllocationEnabled"],
159+
prometheus.GaugeValue,
160+
float64(shardAllocationMap[merged.Cluster.Routing.Allocation.Enabled]),
161+
)
176162

177-
if maxShardsPerNodeString, ok := csr.Cluster.MaxShardsPerNode.(string); ok {
178-
maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
179-
if err == nil {
180-
cs.maxShardsPerNode.Set(float64(maxShardsPerNode))
181-
}
182-
}
163+
return nil
183164
}

collector/cluster_settings_response.go

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)