Skip to content

Commit 8bd7304

Browse files
committed
Create a new cluster info retriever
This is an alternative implementation of the previous retriever in pkg/clusterinfo. The previous implementation required registering collectors and receiving updates over a channel. This InfoProvider instead provides a public method to get the cluster info with a cache to reduce network calls. The Collector.Update signature was also updated to take an UpdateContext. This UpdateContext can provide with an easy way to extend the context available during the Update calls on Collectors. Signed-off-by: Joe Adams <github@joeadams.io>
1 parent 00dfe05 commit 8bd7304

14 files changed

+308
-13
lines changed

cluster/provider.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package cluster
15+
16+
import (
17+
"context"
18+
"encoding/json"
19+
"fmt"
20+
"io"
21+
"log/slog"
22+
"net/http"
23+
"net/url"
24+
"path"
25+
"sync"
26+
"time"
27+
)
28+
29+
type Info struct {
30+
ClusterName string `json:"cluster_name"`
31+
}
32+
33+
type InfoProvider struct {
34+
logger *slog.Logger
35+
client *http.Client
36+
url *url.URL
37+
interval time.Duration
38+
lastClusterInfo Info
39+
lastError error
40+
cachedAt time.Time // Time when the last cluster info was fetched
41+
mu sync.RWMutex // Protects lastClusterInfo, lastError, and cachedAt
42+
}
43+
44+
// New creates a new Retriever.
45+
func NewInfoProvider(logger *slog.Logger, client *http.Client, u *url.URL, interval time.Duration) *InfoProvider {
46+
return &InfoProvider{
47+
logger: logger,
48+
client: client,
49+
url: u,
50+
interval: interval,
51+
}
52+
}
53+
54+
func (i *InfoProvider) GetInfo(ctx context.Context) (Info, error) {
55+
i.mu.RLock()
56+
info := i.lastClusterInfo
57+
err := i.lastError
58+
cachedAt := i.cachedAt
59+
60+
i.mu.RUnlock()
61+
62+
// If the cached info is recent enough, return it.
63+
if !cachedAt.IsZero() && time.Since(cachedAt) < i.interval {
64+
65+
if err != nil {
66+
return Info{}, err
67+
}
68+
69+
if info.ClusterName != "" {
70+
return info, nil
71+
}
72+
}
73+
74+
i.mu.Lock()
75+
defer i.mu.Unlock()
76+
77+
// If we reach here, we need to fetch the cluster info. The cache is either empty or stale.
78+
u := *i.url
79+
u.Path = path.Join(u.Path, "/")
80+
81+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
82+
if err != nil {
83+
return Info{}, err
84+
}
85+
86+
resp, err := i.client.Do(req)
87+
if err != nil {
88+
i.logger.Error("failed to fetch cluster info", "err", err)
89+
return Info{}, err
90+
}
91+
defer resp.Body.Close()
92+
93+
if resp.StatusCode != http.StatusOK {
94+
err = fmt.Errorf("unexpected status code: %d", resp.StatusCode)
95+
i.lastError = err
96+
return Info{}, err
97+
}
98+
99+
var infoResponse Info
100+
body, err := io.ReadAll(resp.Body)
101+
if err != nil {
102+
i.lastError = err
103+
return Info{}, err
104+
}
105+
106+
if err := json.Unmarshal(body, &infoResponse); err != nil {
107+
i.lastError = err
108+
return Info{}, fmt.Errorf("failed to unmarshal cluster info: %w", err)
109+
}
110+
111+
info = Info{ClusterName: infoResponse.ClusterName}
112+
i.lastClusterInfo = info
113+
i.lastError = nil
114+
i.cachedAt = time.Now()
115+
116+
return info, nil
117+
}

cluster/provider_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package cluster
15+
16+
import (
17+
"context"
18+
"net/http"
19+
"net/http/httptest"
20+
"net/url"
21+
"os"
22+
"reflect"
23+
"testing"
24+
"time"
25+
26+
"github.com/prometheus/common/promslog"
27+
)
28+
29+
func TestInfoProvider_GetInfo(t *testing.T) {
30+
timesURLCalled := 0
31+
expectedInfo := Info{
32+
ClusterName: "test-cluster-1",
33+
}
34+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
35+
if r.URL.Path != "/" {
36+
http.NotFound(w, r)
37+
return
38+
}
39+
w.Header().Set("Content-Type", "application/json")
40+
timesURLCalled++
41+
_, _ = w.Write([]byte(`{
42+
"name": "test-node-abcd",
43+
"cluster_name": "test-cluster-1",
44+
"cluster_uuid": "r1bT9sBrR7S9-CamE41Qqg",
45+
"version": {
46+
"number": "5.6.9",
47+
"build_hash": "877a590",
48+
"build_date": "2018-04-12T16:25:14.838Z",
49+
"build_snapshot": false,
50+
"lucene_version": "6.6.1"
51+
}
52+
}`))
53+
}))
54+
tsURL, err := url.Parse(ts.URL)
55+
if err != nil {
56+
t.Fatalf("failed to parse test server URL: %v", err)
57+
}
58+
defer ts.Close()
59+
60+
i := NewInfoProvider(promslog.New(&promslog.Config{Writer: os.Stdout}), http.DefaultClient, tsURL, time.Second)
61+
62+
if timesURLCalled != 0 {
63+
t.Errorf("expected no initial URL calls, got %d", timesURLCalled)
64+
}
65+
66+
got, err := i.GetInfo(context.Background())
67+
if err != nil {
68+
t.Errorf("InfoProvider.GetInfo() error = %v, wantErr %v", err, false)
69+
return
70+
}
71+
72+
if !reflect.DeepEqual(got, expectedInfo) {
73+
t.Errorf("InfoProvider.GetInfo() = %v, want %v", got, expectedInfo)
74+
}
75+
76+
if timesURLCalled != 1 {
77+
t.Errorf("expected URL to be called once, got %d", timesURLCalled)
78+
}
79+
80+
// Call again to ensure cached value is returned
81+
got, err = i.GetInfo(context.Background())
82+
if err != nil {
83+
t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false)
84+
return
85+
}
86+
if !reflect.DeepEqual(got, expectedInfo) {
87+
t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo)
88+
}
89+
if timesURLCalled != 1 {
90+
t.Errorf("expected URL to be called only once, got %d", timesURLCalled)
91+
}
92+
93+
// Call again after delay to ensure we refresh the cache
94+
time.Sleep(2 * time.Second)
95+
got, err = i.GetInfo(context.Background())
96+
if err != nil {
97+
t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false)
98+
return
99+
}
100+
if !reflect.DeepEqual(got, expectedInfo) {
101+
t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo)
102+
}
103+
if timesURLCalled != 2 {
104+
t.Errorf("expected URL to be called only once, got %d", timesURLCalled)
105+
}
106+
107+
}

collector/cluster_info.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ type VersionInfo struct {
7777
LuceneVersion semver.Version `json:"lucene_version"`
7878
}
7979

80-
func (c *ClusterInfoCollector) Update(_ context.Context, ch chan<- prometheus.Metric) error {
80+
func (c *ClusterInfoCollector) Update(_ context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
8181
resp, err := c.hc.Get(c.u.String())
8282
if err != nil {
8383
return err

collector/cluster_settings.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ type clusterSettingsWatermark struct {
145145
Low string `json:"low"`
146146
}
147147

148-
func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
148+
func (c *ClusterSettingsCollector) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
149149
u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"})
150150
q := u.Query()
151151
q.Set("include_defaults", "true")

collector/collector.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/alecthomas/kingpin/v2"
28+
"github.com/prometheus-community/elasticsearch_exporter/cluster"
2829
"github.com/prometheus/client_golang/prometheus"
2930
)
3031

@@ -64,7 +65,7 @@ var (
6465
// Collector is the interface a collector has to implement.
6566
type Collector interface {
6667
// Get new metrics and expose them via prometheus registry.
67-
Update(context.Context, chan<- prometheus.Metric) error
68+
Update(context.Context, UpdateContext, chan<- prometheus.Metric) error
6869
}
6970

7071
func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) {
@@ -92,6 +93,7 @@ type ElasticsearchCollector struct {
9293
logger *slog.Logger
9394
esURL *url.URL
9495
httpClient *http.Client
96+
cluserInfo *cluster.InfoProvider
9597
}
9698

9799
type Option func(*ElasticsearchCollector) error
@@ -106,6 +108,10 @@ func NewElasticsearchCollector(logger *slog.Logger, filters []string, options ..
106108
}
107109
}
108110

111+
if e.cluserInfo == nil {
112+
return nil, fmt.Errorf("cluster info provider is not set")
113+
}
114+
109115
f := make(map[string]bool)
110116
for _, filter := range filters {
111117
enabled, exist := collectorState[filter]
@@ -155,6 +161,13 @@ func WithHTTPClient(hc *http.Client) Option {
155161
}
156162
}
157163

164+
func WithClusterInfoProvider(cl *cluster.InfoProvider) Option {
165+
return func(e *ElasticsearchCollector) error {
166+
e.cluserInfo = cl
167+
return nil
168+
}
169+
}
170+
158171
// Describe implements the prometheus.Collector interface.
159172
func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) {
160173
ch <- scrapeDurationDesc
@@ -163,21 +176,22 @@ func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) {
163176

164177
// Collect implements the prometheus.Collector interface.
165178
func (e ElasticsearchCollector) Collect(ch chan<- prometheus.Metric) {
179+
uc := NewDefaultUpdateContext(e.cluserInfo)
166180
wg := sync.WaitGroup{}
167181
ctx := context.TODO()
168182
wg.Add(len(e.Collectors))
169183
for name, c := range e.Collectors {
170184
go func(name string, c Collector) {
171-
execute(ctx, name, c, ch, e.logger)
185+
execute(ctx, name, c, ch, e.logger, uc)
172186
wg.Done()
173187
}(name, c)
174188
}
175189
wg.Wait()
176190
}
177191

178-
func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger) {
192+
func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger, uc UpdateContext) {
179193
begin := time.Now()
180-
err := c.Update(ctx, ch)
194+
err := c.Update(ctx, uc, ch)
181195
duration := time.Since(begin)
182196
var success float64
183197

collector/collector_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package collector
1616
import (
1717
"context"
1818

19+
"github.com/prometheus-community/elasticsearch_exporter/cluster"
1920
"github.com/prometheus/client_golang/prometheus"
2021
)
2122

@@ -32,5 +33,11 @@ func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) {
3233
}
3334

3435
func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
35-
w.c.Update(context.Background(), ch)
36+
w.c.Update(context.Background(), &mockUpdateContext{}, ch)
37+
}
38+
39+
type mockUpdateContext struct{}
40+
41+
func (m *mockUpdateContext) GetClusterInfo(_ context.Context) (cluster.Info, error) {
42+
return cluster.Info{}, nil
3643
}

collector/data_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type DataStreamStatsDataStream struct {
8383
MaximumTimestamp int64 `json:"maximum_timestamp"`
8484
}
8585

86-
func (ds *DataStream) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
86+
func (ds *DataStream) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
8787
var dsr DataStreamStatsResponse
8888

8989
u := ds.u.ResolveReference(&url.URL{Path: "/_data_stream/*/_stats"})

collector/health_report.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func statusValue(value string, color string) float64 {
300300
return 0
301301
}
302302

303-
func (c *HealthReport) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
303+
func (c *HealthReport) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
304304
u := c.url.ResolveReference(&url.URL{Path: "/_health_report"})
305305
var healthReportResponse HealthReportResponse
306306

collector/ilm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type IlmStatusResponse struct {
7474
OperationMode string `json:"operation_mode"`
7575
}
7676

77-
func (i *ILM) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
77+
func (i *ILM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
7878
var ir IlmResponse
7979

8080
indexURL := i.u.ResolveReference(&url.URL{Path: "/_all/_ilm/explain"})

collector/slm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ type SLMStatusResponse struct {
143143
OperationMode string `json:"operation_mode"`
144144
}
145145

146-
func (s *SLM) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
146+
func (s *SLM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
147147
u := s.u.ResolveReference(&url.URL{Path: "/_slm/status"})
148148
var slmStatusResp SLMStatusResponse
149149

0 commit comments

Comments
 (0)