diff --git a/collector/cluster_info.go b/collector/cluster_info.go
index 5dafb9b9..8a6a3a42 100644
--- a/collector/cluster_info.go
+++ b/collector/cluster_info.go
@@ -23,6 +23,8 @@ import (
 
     "github.com/blang/semver/v4"
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func init() {
@@ -35,7 +37,7 @@ type ClusterInfoCollector struct {
     hc     *http.Client
 }
 
-func NewClusterInfo(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+func NewClusterInfo(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
     return &ClusterInfoCollector{
         logger: logger,
         u:      u,
diff --git a/collector/cluster_info_test.go b/collector/cluster_info_test.go
index d7d12bc2..77ee59bb 100644
--- a/collector/cluster_info_test.go
+++ b/collector/cluster_info_test.go
@@ -21,9 +21,12 @@ import (
     "os"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestClusterInfo(t *testing.T) {
@@ -80,7 +83,9 @@ func TestClusterInfo(t *testing.T) {
         t.Fatal(err)
     }
 
-    c, err := NewClusterInfo(promslog.NewNopLogger(), u, http.DefaultClient)
+    logger := promslog.NewNopLogger()
+    ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+    c, err := NewClusterInfo(logger, u, http.DefaultClient, ci)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/collector/cluster_settings.go b/collector/cluster_settings.go
index 6e7cf174..fc33486e 100644
--- a/collector/cluster_settings.go
+++ b/collector/cluster_settings.go
@@ -26,6 +26,8 @@ import (
 
     "github.com/imdario/mergo"
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func init() {
@@ -38,7 +40,7 @@ type ClusterSettingsCollector struct {
     hc     *http.Client
 }
 
-func NewClusterSettings(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+func NewClusterSettings(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
     return &ClusterSettingsCollector{
         logger: logger,
         u:      u,
diff --git a/collector/cluster_settings_test.go b/collector/cluster_settings_test.go
index 52c41a1e..eb95621a 100644
--- a/collector/cluster_settings_test.go
+++ b/collector/cluster_settings_test.go
@@ -21,9 +21,12 @@ import (
     "os"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestClusterSettingsStats(t *testing.T) {
@@ -136,7 +139,9 @@ elasticsearch_clustersettings_allocation_watermark_low_bytes 5.24288e+07
         t.Fatal(err)
     }
 
-        c, err := NewClusterSettings(promslog.NewNopLogger(), u, http.DefaultClient)
+        logger := promslog.NewNopLogger()
+        ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+        c, err := NewClusterSettings(logger, u, http.DefaultClient, ci)
         if err != nil {
             t.Fatal(err)
         }
diff --git a/collector/collector.go b/collector/collector.go
index 38de96ce..2b044757 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -26,6 +26,8 @@ import (
 
     "github.com/alecthomas/kingpin/v2"
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 const (
@@ -36,7 +38,7 @@ const (
     defaultDisabled = false
 )
 
-type factoryFunc func(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error)
+type factoryFunc func(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error)
 
 var (
     factories = make(map[string]factoryFunc)
@@ -97,7 +99,7 @@ type ElasticsearchCollector struct {
 type Option func(*ElasticsearchCollector) error
 
 // NewElasticsearchCollector creates a new ElasticsearchCollector
-func NewElasticsearchCollector(logger *slog.Logger, filters []string, options ...Option) (*ElasticsearchCollector, error) {
+func NewElasticsearchCollector(logger *slog.Logger, filters []string, clusterInfoRetriever *clusterinfo.Retriever, options ...Option) (*ElasticsearchCollector, error) {
     e := &ElasticsearchCollector{logger: logger}
     // Apply options to customize the collector
     for _, o := range options {
@@ -127,7 +129,7 @@ func NewElasticsearchCollector(logger *slog.Logger, filters []string, options ..
         if collector, ok := initiatedCollectors[key]; ok {
             collectors[key] = collector
         } else {
-            collector, err := factories[key](logger.With("collector", key), e.esURL, e.httpClient)
+            collector, err := factories[key](logger.With("collector", key), e.esURL, e.httpClient, clusterInfoRetriever)
             if err != nil {
                 return nil, err
             }
diff --git a/collector/data_stream.go b/collector/data_stream.go
index 8672c5a1..49981383 100644
--- a/collector/data_stream.go
+++ b/collector/data_stream.go
@@ -22,6 +22,8 @@ import (
     "net/url"
 
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 var (
@@ -51,7 +53,7 @@ type DataStream struct {
 }
 
 // NewDataStream defines DataStream Prometheus metrics
-func NewDataStream(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+func NewDataStream(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
     return &DataStream{
         logger: logger,
         hc:     hc,
diff --git a/collector/data_stream_test.go b/collector/data_stream_test.go
index 71324d5b..3ac9a7c3 100644
--- a/collector/data_stream_test.go
+++ b/collector/data_stream_test.go
@@ -21,9 +21,12 @@ import (
     "os"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestDataStream(t *testing.T) {
@@ -64,7 +67,9 @@ func TestDataStream(t *testing.T) {
         t.Fatal(err)
     }
 
-        c, err := NewDataStream(promslog.NewNopLogger(), u, http.DefaultClient)
+        logger := promslog.NewNopLogger()
+        ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+        c, err := NewDataStream(logger, u, http.DefaultClient, ci)
         if err != nil {
             t.Fatal(err)
         }
diff --git a/collector/health_report.go b/collector/health_report.go
index 4933d98c..5e5dd548 100644
--- a/collector/health_report.go
+++ b/collector/health_report.go
@@ -21,6 +21,8 @@ import (
     "net/url"
 
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 var (
@@ -166,7 +168,7 @@ type HealthReport struct {
     url    *url.URL
 }
 
-func NewHealthReport(logger *slog.Logger, url *url.URL, client *http.Client) (Collector, error) {
+func NewHealthReport(logger *slog.Logger, url *url.URL, client *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
     return &HealthReport{
         logger: logger,
         client: client,
diff --git a/collector/health_report_test.go b/collector/health_report_test.go
index 012afbfd..9c034a7c 100644
--- a/collector/health_report_test.go
+++ b/collector/health_report_test.go
@@ -21,9 +21,12 @@ import (
     "os"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestHealthReport(t *testing.T) {
@@ -156,7 +159,9 @@ func TestHealthReport(t *testing.T) {
         t.Fatal(err)
     }
 
-    c, err := NewHealthReport(promslog.NewNopLogger(), u, http.DefaultClient)
+    logger := promslog.NewNopLogger()
+    ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+    c, err := NewHealthReport(logger, u, http.DefaultClient, ci)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/collector/ilm.go b/collector/ilm.go
index 7891e5d0..55923dca 100644
--- a/collector/ilm.go
+++ b/collector/ilm.go
@@ -22,6 +22,8 @@ import (
     "net/url"
 
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 var (
@@ -49,7 +51,7 @@ type ILM struct {
     u      *url.URL
 }
 
-func NewILM(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+func NewILM(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
     return &ILM{
         logger: logger,
         hc:     hc,
diff --git a/collector/ilm_test.go b/collector/ilm_test.go
index 11292aff..e4e3a7c8 100644
--- a/collector/ilm_test.go
+++ b/collector/ilm_test.go
@@ -22,9 +22,12 @@ import (
     "path"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestILM(t *testing.T) {
@@ -81,7 +84,9 @@ func TestILM(t *testing.T) {
         t.Fatal(err)
     }
 
-        c, err := NewILM(promslog.NewNopLogger(), u, http.DefaultClient)
+        logger := promslog.NewNopLogger()
+        ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+        c, err := NewILM(logger, u, http.DefaultClient, ci)
         if err != nil {
             t.Fatal(err)
         }
diff --git a/collector/indices.go b/collector/indices.go
index fcb3b771..0fa13981 100644
--- a/collector/indices.go
+++ b/collector/indices.go
@@ -461,17 +461,6 @@ func NewIndices(logger *slog.Logger, client *http.Client, url *url.URL, shards b
         },
     }
 
-    // start go routine to fetch clusterinfo updates and save them to lastClusterinfo
-    go func() {
-        logger.Debug("starting cluster info receive loop")
-        for ci := range indices.clusterInfoCh {
-            if ci != nil {
-                logger.Debug("received cluster info update", "cluster", ci.ClusterName)
-                indices.lastClusterInfo = ci
-            }
-        }
-        logger.Debug("exiting cluster info receive loop")
-    }()
     return indices
 }
 
@@ -481,6 +470,10 @@ func (i *Indices) ClusterLabelUpdates() *chan *clusterinfo.Response {
     return &i.clusterInfoCh
 }
 
+func (i *Indices) SetClusterInfo(r *clusterinfo.Response) {
+    i.lastClusterInfo = r
+}
+
 // String implements the stringer interface. It is part of the clusterinfo.consumer interface
 func (i *Indices) String() string {
     return namespace + "indices"
diff --git a/collector/shards.go b/collector/shards.go
index d4483e38..32a8881b 100644
--- a/collector/shards.go
+++ b/collector/shards.go
@@ -52,6 +52,10 @@ func (s *Shards) ClusterLabelUpdates() *chan *clusterinfo.Response {
     return &s.clusterInfoCh
 }
 
+func (s *Shards) SetClusterInfo(r *clusterinfo.Response) {
+    s.lastClusterInfo = r
+}
+
 // String implements the stringer interface. It is part of the clusterinfo.consumer interface
 func (s *Shards) String() string {
     return namespace + "shards"
@@ -111,18 +115,6 @@ func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards {
         }),
     }
 
-    // start go routine to fetch clusterinfo updates and save them to lastClusterinfo
-    go func() {
-        logger.Debug("starting cluster info receive loop")
-        for ci := range shards.clusterInfoCh {
-            if ci != nil {
-                logger.Debug("received cluster info update", "cluster", ci.ClusterName)
-                shards.lastClusterInfo = ci
-            }
-        }
-        logger.Debug("exiting cluster info receive loop")
-    }()
-
     return shards
 }
diff --git a/collector/slm.go b/collector/slm.go
index 3ac51f22..9c87a416 100644
--- a/collector/slm.go
+++ b/collector/slm.go
@@ -21,6 +21,8 @@ import (
     "net/url"
 
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 var statuses = []string{"RUNNING", "STOPPING", "STOPPED"}
@@ -29,69 +31,81 @@ var (
     slmRetentionRunsTotal = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "retention_runs_total"),
         "Total retention runs",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmRetentionFailedTotal = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "retention_failed_total"),
         "Total failed retention runs",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmRetentionTimedOutTotal = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "retention_timed_out_total"),
        "Total timed out retention runs",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmRetentionDeletionTimeSeconds = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "retention_deletion_time_seconds"),
         "Retention run deletion time",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmTotalSnapshotsTaken = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "total_snapshots_taken_total"),
         "Total snapshots taken",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmTotalSnapshotsFailed = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "total_snapshots_failed_total"),
         "Total snapshots failed",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmTotalSnapshotsDeleted = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "total_snapshots_deleted_total"),
         "Total snapshots deleted",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmTotalSnapshotsDeleteFailed = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "total_snapshot_deletion_failures_total"),
         "Total snapshot deletion failures",
-        nil, nil,
+        []string{"cluster"}, nil,
     )
     slmOperationMode = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "operation_mode"),
         "Operating status of SLM",
-        []string{"operation_mode"}, nil,
+        []string{"cluster", "operation_mode"}, nil,
     )
     slmSnapshotsTaken = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "snapshots_taken_total"),
         "Total snapshots taken",
-        []string{"policy"}, nil,
+        []string{
+            "policy",
+            "cluster",
+        }, nil,
     )
     slmSnapshotsFailed = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "snapshots_failed_total"),
         "Total snapshots failed",
-        []string{"policy"}, nil,
+        []string{
+            "policy",
+            "cluster",
+        }, nil,
     )
     slmSnapshotsDeleted = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "snapshots_deleted_total"),
         "Total snapshots deleted",
-        []string{"policy"}, nil,
+        []string{
+            "policy",
+            "cluster",
+        }, nil,
     )
     slmSnapshotsDeletionFailure = prometheus.NewDesc(
         prometheus.BuildFQName(namespace, "slm_stats", "snapshot_deletion_failures_total"),
         "Total snapshot deletion failures",
-        []string{"policy"}, nil,
+        []string{
+            "policy",
+            "cluster",
+        }, nil,
     )
 )
@@ -101,18 +115,59 @@ func init() {
 
 // SLM information struct
 type SLM struct {
-    logger *slog.Logger
-    hc     *http.Client
-    u      *url.URL
+    logger          *slog.Logger
+    hc              *http.Client
+    u               *url.URL
+    clusterInfoCh   chan *clusterinfo.Response
+    lastClusterInfo *clusterinfo.Response
 }
 
 // NewSLM defines SLM Prometheus metrics
-func NewSLM(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
-    return &SLM{
-        logger: logger,
-        hc:     hc,
-        u:      u,
-    }, nil
+func NewSLM(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
+    slm := &SLM{
+        logger:        logger,
+        hc:            hc,
+        u:             u,
+        clusterInfoCh: make(chan *clusterinfo.Response),
+        lastClusterInfo: &clusterinfo.Response{
+            ClusterName: "unknown_cluster",
+        },
+    }
+
+    err := ci.RegisterConsumer(slm)
+    if err != nil {
+        return slm, err
+    }
+
+    return slm, nil
+}
+
+func (s *SLM) Describe(ch chan<- *prometheus.Desc) {
+    ch <- slmRetentionRunsTotal
+    ch <- slmRetentionFailedTotal
+    ch <- slmRetentionTimedOutTotal
+    ch <- slmRetentionDeletionTimeSeconds
+    ch <- slmTotalSnapshotsTaken
+    ch <- slmTotalSnapshotsFailed
+    ch <- slmTotalSnapshotsDeleted
+    ch <- slmTotalSnapshotsDeleteFailed
+    ch <- slmOperationMode
+    ch <- slmSnapshotsTaken
+    ch <- slmSnapshotsFailed
+    ch <- slmSnapshotsDeleted
+    ch <- slmSnapshotsDeletionFailure
+}
+
+func (s *SLM) ClusterLabelUpdates() *chan *clusterinfo.Response {
+    return &s.clusterInfoCh
+}
+
+func (s *SLM) SetClusterInfo(r *clusterinfo.Response) {
+    s.lastClusterInfo = r
+}
+
+func (s *SLM) String() string {
+    return namespace + "slm"
 }
 
 // SLMStatsResponse is a representation of the SLM stats
@@ -179,6 +234,7 @@ func (s *SLM) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
             slmOperationMode,
             prometheus.GaugeValue,
             value,
+            s.lastClusterInfo.ClusterName,
             status,
         )
     }
@@ -187,43 +243,51 @@ ch <- prometheus.MustNewConstMetric(
         slmRetentionRunsTotal,
         prometheus.CounterValue,
         float64(slmStatsResp.RetentionRuns),
+        s.lastClusterInfo.ClusterName,
     )
 
     ch <- prometheus.MustNewConstMetric(
         slmRetentionFailedTotal,
         prometheus.CounterValue,
         float64(slmStatsResp.RetentionFailed),
+        s.lastClusterInfo.ClusterName,
     )
 
     ch <- prometheus.MustNewConstMetric(
         slmRetentionTimedOutTotal,
         prometheus.CounterValue,
         float64(slmStatsResp.RetentionTimedOut),
+        s.lastClusterInfo.ClusterName,
     )
 
     ch <- prometheus.MustNewConstMetric(
         slmRetentionDeletionTimeSeconds,
         prometheus.GaugeValue,
         float64(slmStatsResp.RetentionDeletionTimeMillis)/1000,
+        s.lastClusterInfo.ClusterName,
     )
 
     ch <- prometheus.MustNewConstMetric(
         slmTotalSnapshotsTaken,
         prometheus.CounterValue,
         float64(slmStatsResp.TotalSnapshotsTaken),
+        s.lastClusterInfo.ClusterName,
     )
 
     ch <- prometheus.MustNewConstMetric(
         slmTotalSnapshotsFailed,
         prometheus.CounterValue,
         float64(slmStatsResp.TotalSnapshotsFailed),
+        s.lastClusterInfo.ClusterName,
     )
 
     ch <- prometheus.MustNewConstMetric(
         slmTotalSnapshotsDeleted,
         prometheus.CounterValue,
         float64(slmStatsResp.TotalSnapshotsDeleted),
+        s.lastClusterInfo.ClusterName,
     )
 
     ch <- prometheus.MustNewConstMetric(
         slmTotalSnapshotsDeleteFailed,
         prometheus.CounterValue,
         float64(slmStatsResp.TotalSnapshotDeletionFailures),
+        s.lastClusterInfo.ClusterName,
     )
 
     for _, policy := range slmStatsResp.PolicyStats {
@@ -232,24 +296,28 @@
             prometheus.CounterValue,
             float64(policy.SnapshotsTaken),
             policy.Policy,
+            s.lastClusterInfo.ClusterName,
         )
 
         ch <- prometheus.MustNewConstMetric(
             slmSnapshotsFailed,
             prometheus.CounterValue,
             float64(policy.SnapshotsFailed),
             policy.Policy,
+            s.lastClusterInfo.ClusterName,
         )
 
         ch <- prometheus.MustNewConstMetric(
             slmSnapshotsDeleted,
             prometheus.CounterValue,
             float64(policy.SnapshotsDeleted),
             policy.Policy,
+            s.lastClusterInfo.ClusterName,
         )
 
         ch <- prometheus.MustNewConstMetric(
             slmSnapshotsDeletionFailure,
             prometheus.CounterValue,
             float64(policy.SnapshotDeletionFailures),
             policy.Policy,
+            s.lastClusterInfo.ClusterName,
         )
     }
diff --git a/collector/slm_test.go b/collector/slm_test.go
index 9c4834a9..fe20cdce 100644
--- a/collector/slm_test.go
+++ b/collector/slm_test.go
@@ -22,9 +22,12 @@ import (
     "path"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestSLM(t *testing.T) {
@@ -45,45 +48,45 @@ func TestSLM(t *testing.T) {
             file: "7.15.0.json",
             want: `# HELP elasticsearch_slm_stats_operation_mode Operating status of SLM
 # TYPE elasticsearch_slm_stats_operation_mode gauge
-elasticsearch_slm_stats_operation_mode{operation_mode="RUNNING"} 0
-elasticsearch_slm_stats_operation_mode{operation_mode="STOPPED"} 0
-elasticsearch_slm_stats_operation_mode{operation_mode="STOPPING"} 0
+elasticsearch_slm_stats_operation_mode{cluster="unknown_cluster",operation_mode="RUNNING"} 0
+elasticsearch_slm_stats_operation_mode{cluster="unknown_cluster",operation_mode="STOPPED"} 0
+elasticsearch_slm_stats_operation_mode{cluster="unknown_cluster",operation_mode="STOPPING"} 0
 # HELP elasticsearch_slm_stats_retention_deletion_time_seconds Retention run deletion time
 # TYPE elasticsearch_slm_stats_retention_deletion_time_seconds gauge
-elasticsearch_slm_stats_retention_deletion_time_seconds 72.491
+elasticsearch_slm_stats_retention_deletion_time_seconds{cluster="unknown_cluster"} 72.491
 # HELP elasticsearch_slm_stats_retention_failed_total Total failed retention runs
 # TYPE elasticsearch_slm_stats_retention_failed_total counter
-elasticsearch_slm_stats_retention_failed_total 0
+elasticsearch_slm_stats_retention_failed_total{cluster="unknown_cluster"} 0
 # HELP elasticsearch_slm_stats_retention_runs_total Total retention runs
 # TYPE elasticsearch_slm_stats_retention_runs_total counter
-elasticsearch_slm_stats_retention_runs_total 9
+elasticsearch_slm_stats_retention_runs_total{cluster="unknown_cluster"} 9
 # HELP elasticsearch_slm_stats_retention_timed_out_total Total timed out retention runs
 # TYPE elasticsearch_slm_stats_retention_timed_out_total counter
-elasticsearch_slm_stats_retention_timed_out_total 0
+elasticsearch_slm_stats_retention_timed_out_total{cluster="unknown_cluster"} 0
 # HELP elasticsearch_slm_stats_snapshot_deletion_failures_total Total snapshot deletion failures
 # TYPE elasticsearch_slm_stats_snapshot_deletion_failures_total counter
-elasticsearch_slm_stats_snapshot_deletion_failures_total{policy="everything"} 0
+elasticsearch_slm_stats_snapshot_deletion_failures_total{cluster="unknown_cluster",policy="everything"} 0
 # HELP elasticsearch_slm_stats_snapshots_deleted_total Total snapshots deleted
 # TYPE elasticsearch_slm_stats_snapshots_deleted_total counter
-elasticsearch_slm_stats_snapshots_deleted_total{policy="everything"} 20
+elasticsearch_slm_stats_snapshots_deleted_total{cluster="unknown_cluster",policy="everything"} 20
 # HELP elasticsearch_slm_stats_snapshots_failed_total Total snapshots failed
 # TYPE elasticsearch_slm_stats_snapshots_failed_total counter
-elasticsearch_slm_stats_snapshots_failed_total{policy="everything"} 2
+elasticsearch_slm_stats_snapshots_failed_total{cluster="unknown_cluster",policy="everything"} 2
 # HELP elasticsearch_slm_stats_snapshots_taken_total Total snapshots taken
 # TYPE elasticsearch_slm_stats_snapshots_taken_total counter
-elasticsearch_slm_stats_snapshots_taken_total{policy="everything"} 50
+elasticsearch_slm_stats_snapshots_taken_total{cluster="unknown_cluster",policy="everything"} 50
 # HELP elasticsearch_slm_stats_total_snapshot_deletion_failures_total Total snapshot deletion failures
 # TYPE elasticsearch_slm_stats_total_snapshot_deletion_failures_total counter
-elasticsearch_slm_stats_total_snapshot_deletion_failures_total 0
+elasticsearch_slm_stats_total_snapshot_deletion_failures_total{cluster="unknown_cluster"} 0
 # HELP elasticsearch_slm_stats_total_snapshots_deleted_total Total snapshots deleted
 # TYPE elasticsearch_slm_stats_total_snapshots_deleted_total counter
-elasticsearch_slm_stats_total_snapshots_deleted_total 20
+elasticsearch_slm_stats_total_snapshots_deleted_total{cluster="unknown_cluster"} 20
 # HELP elasticsearch_slm_stats_total_snapshots_failed_total Total snapshots failed
 # TYPE elasticsearch_slm_stats_total_snapshots_failed_total counter
-elasticsearch_slm_stats_total_snapshots_failed_total 2
+elasticsearch_slm_stats_total_snapshots_failed_total{cluster="unknown_cluster"} 2
 # HELP elasticsearch_slm_stats_total_snapshots_taken_total Total snapshots taken
 # TYPE elasticsearch_slm_stats_total_snapshots_taken_total counter
-elasticsearch_slm_stats_total_snapshots_taken_total 103
+elasticsearch_slm_stats_total_snapshots_taken_total{cluster="unknown_cluster"} 103
 `,
         },
     }
@@ -123,7 +126,10 @@ func TestSLM(t *testing.T) {
         t.Fatalf("Failed to parse URL: %s", err)
     }
 
-        s, err := NewSLM(promslog.NewNopLogger(), u, http.DefaultClient)
+        logger := promslog.NewNopLogger()
+        ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+
+        s, err := NewSLM(logger, u, http.DefaultClient, ci)
         if err != nil {
             t.Fatal(err)
         }
diff --git a/collector/snapshots.go b/collector/snapshots.go
index 9482dbde..455e5059 100644
--- a/collector/snapshots.go
+++ b/collector/snapshots.go
@@ -23,6 +23,8 @@ import (
     "path"
 
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 var (
@@ -102,7 +104,7 @@ type Snapshots struct {
 }
 
 // NewSnapshots defines Snapshots Prometheus metrics
-func NewSnapshots(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+func NewSnapshots(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
     return &Snapshots{
         logger: logger,
         u:      u,
diff --git a/collector/snapshots_test.go b/collector/snapshots_test.go
index cf4b2d6a..4d23d492 100644
--- a/collector/snapshots_test.go
+++ b/collector/snapshots_test.go
@@ -22,9 +22,12 @@ import (
     "os"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestSnapshots(t *testing.T) {
@@ -209,7 +212,9 @@ func TestSnapshots(t *testing.T) {
         t.Fatal(err)
     }
 
-        c, err := NewSnapshots(promslog.NewNopLogger(), u, http.DefaultClient)
+        logger := promslog.NewNopLogger()
+        ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+        c, err := NewSnapshots(logger, u, http.DefaultClient, ci)
         if err != nil {
             t.Fatal(err)
         }
diff --git a/collector/tasks.go b/collector/tasks.go
index faaef2da..b2829986 100644
--- a/collector/tasks.go
+++ b/collector/tasks.go
@@ -24,6 +24,8 @@ import (
 
     "github.com/alecthomas/kingpin/v2"
     "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 // filterByTask global required because collector interface doesn't expose any way to take
@@ -50,7 +52,7 @@ type TaskCollector struct {
 }
 
 // NewTaskCollector defines Task Prometheus metrics
-func NewTaskCollector(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+func NewTaskCollector(logger *slog.Logger, u *url.URL, hc *http.Client, ci *clusterinfo.Retriever) (Collector, error) {
     logger.Info("task collector created",
         "actionFilter", actionFilter,
     )
diff --git a/collector/tasks_test.go b/collector/tasks_test.go
index e471eba4..fddfd2a9 100644
--- a/collector/tasks_test.go
+++ b/collector/tasks_test.go
@@ -20,9 +20,12 @@ import (
     "net/url"
     "strings"
     "testing"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus/testutil"
     "github.com/prometheus/common/promslog"
+
+    "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
 )
 
 func TestTasks(t *testing.T) {
@@ -65,7 +68,9 @@ elasticsearch_task_stats_action{action="indices:data/write/index"} 1
     t.Fatalf("Failed to parse URL: %s", err)
     }
 
-    c, err := NewTaskCollector(promslog.NewNopLogger(), u, ts.Client())
+    logger := promslog.NewNopLogger()
+    ci := clusterinfo.New(logger, http.DefaultClient, u, time.Duration(300000000000))
+    c, err := NewTaskCollector(promslog.NewNopLogger(), u, ts.Client(), ci)
     if err != nil {
         t.Fatalf("Failed to create collector: %v", err)
     }
diff --git a/main.go b/main.go
index 9ab67e2c..3266847d 100644
--- a/main.go
+++ b/main.go
@@ -177,10 +177,15 @@ func main() {
     // version metric
     prometheus.MustRegister(versioncollector.NewCollector(name))
 
+    // TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors.
+    // cluster info retriever
+    clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
+
     // create the exporter
     exporter, err := collector.NewElasticsearchCollector(
         logger,
         []string{},
+        clusterInfoRetriever,
         collector.WithElasticsearchURL(esURL),
         collector.WithHTTPClient(httpClient),
     )
@@ -190,10 +195,6 @@ func main() {
     }
     prometheus.MustRegister(exporter)
 
-    // TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors.
-    // cluster info retriever
-    clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
-
     prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
     prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
diff --git a/pkg/clusterinfo/clusterinfo.go b/pkg/clusterinfo/clusterinfo.go
index f47659c3..665ff197 100644
--- a/pkg/clusterinfo/clusterinfo.go
+++ b/pkg/clusterinfo/clusterinfo.go
@@ -46,6 +46,7 @@ type consumer interface {
     ClusterLabelUpdates() *chan *Response
     // String implements the stringer interface
     String() string
+    SetClusterInfo(*Response)
 }
 
 // Retriever periodically gets the cluster info from the / endpoint and
@@ -160,6 +161,18 @@ func (r *Retriever) RegisterConsumer(c consumer) error {
         return ErrConsumerAlreadyRegistered
     }
     r.consumerChannels[c.String()] = c.ClusterLabelUpdates()
+
+    // start go routine to fetch clusterinfo updates and save them to the consumer
+    go func() {
+        r.logger.Debug("starting cluster info receive loop", "consumer", c.String())
+        for ci := range *c.ClusterLabelUpdates() {
+            if ci != nil {
+                r.logger.Debug("received cluster info update", "consumer", c.String(), "cluster", ci.ClusterName)
+                c.SetClusterInfo(ci)
+            }
+        }
+        r.logger.Debug("exiting cluster info receive loop", "consumer", c.String())
+    }()
     return nil
 }
diff --git a/pkg/clusterinfo/clusterinfo_test.go b/pkg/clusterinfo/clusterinfo_test.go
index 0593a5ed..6dc1507b 100644
--- a/pkg/clusterinfo/clusterinfo_test.go
+++ b/pkg/clusterinfo/clusterinfo_test.go
@@ -113,6 +113,12 @@ func (mc *mockConsumer) ClusterLabelUpdates() *chan *Response {
     return &mc.ch
 }
 
+func (mc *mockConsumer) SetClusterInfo(r *Response) {
+    mc.mu.Lock()
+    mc.data = r
+    mc.mu.Unlock()
+}
+
 func TestNew(t *testing.T) {
     u, err := url.Parse("http://localhost:9200")
     if err != nil {
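
Note on the consumer contract introduced above: the per-collector receive goroutines removed from NewIndices and NewShards now live in Retriever.RegisterConsumer, which drains each consumer's channel and applies every update through the new SetClusterInfo(*Response) method. A collector therefore only needs to expose a channel, a name, and SetClusterInfo, and register itself with the *clusterinfo.Retriever its factory now receives. The sketch below illustrates that shape; exampleConsumer and newExampleConsumer are hypothetical names, while the clusterinfo identifiers and the namespace constant come from this diff and the existing collector package.

```go
package collector

import (
	"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
)

// exampleConsumer mirrors the shape used by Indices, Shards, and SLM in this change.
type exampleConsumer struct {
	clusterInfoCh   chan *clusterinfo.Response
	lastClusterInfo *clusterinfo.Response
}

// ClusterLabelUpdates exposes the channel the Retriever pushes updates into.
func (c *exampleConsumer) ClusterLabelUpdates() *chan *clusterinfo.Response {
	return &c.clusterInfoCh
}

// String names the consumer for the Retriever's registration bookkeeping.
func (c *exampleConsumer) String() string {
	return namespace + "example"
}

// SetClusterInfo is called from the Retriever-owned receive loop on every update,
// replacing the goroutines each collector previously started for itself.
func (c *exampleConsumer) SetClusterInfo(r *clusterinfo.Response) {
	c.lastClusterInfo = r
}

// newExampleConsumer shows the registration step the updated factories perform
// with the *clusterinfo.Retriever they now receive.
func newExampleConsumer(ci *clusterinfo.Retriever) (*exampleConsumer, error) {
	ec := &exampleConsumer{
		clusterInfoCh: make(chan *clusterinfo.Response),
		lastClusterInfo: &clusterinfo.Response{
			ClusterName: "unknown_cluster",
		},
	}
	if err := ci.RegisterConsumer(ec); err != nil {
		return nil, err
	}
	return ec, nil
}
```

Until the first update arrives, metrics carry the cluster="unknown_cluster" placeholder, which is why the expected output in slm_test.go uses that label value; the time.Duration(300000000000) in the tests is simply a 300-second (5-minute) refresh interval expressed in nanoseconds.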