Skip to content

Commit 3ea19c9

Browse files
committed
implement servicediscovery caching and soft fail for workspaces
- add metric azure_loganalytics_status - add metric azure_loganalytics_last_query_successfull Signed-off-by: Markus Blaschke <mblaschke82@gmail.com>
1 parent 4d84c98 commit 3ea19c9

File tree

5 files changed

+161
-47
lines changed

5 files changed

+161
-47
lines changed

README.md

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ Azure LogAnalytics exporter
77

88
Prometheus exporter for Azure LogAnalytics Kusto queries with configurable fields and transformations.
99

10+
`azure-loganalytics-exporter` can query configured workspaces or all workspaces in one or multiple subscriptions.
11+
The exporter can also cache metrics and service discovery information to reduce requests against workspaces and the Azure API.
12+
1013
Usage
1114
-----
1215

@@ -15,18 +18,20 @@ Usage:
1518
azure-loganalytics-exporter [OPTIONS]
1619
1720
Application Options:
18-
--debug debug mode [$DEBUG]
19-
-v, --verbose verbose mode [$VERBOSE]
20-
--log.json Switch log output to json format [$LOG_JSON]
21-
--azure.environment= Azure environment name (default: AZUREPUBLICCLOUD) [$AZURE_ENVIRONMENT]
22-
--loganalytics.workspace= Loganalytics workspace IDs [$LOGANALYTICS_WORKSPACE]
23-
--loganalytics.parallel= Specifies how many workspaces should be queried in parallel (default: 5)
24-
[$LOGANALYTICS_PARALLEL]
25-
-c, --config= Config path [$CONFIG]
26-
--bind= Server address (default: :8080) [$SERVER_BIND]
21+
--debug debug mode [$DEBUG]
22+
-v, --verbose verbose mode [$VERBOSE]
23+
--log.json Switch log output to json format [$LOG_JSON]
24+
--azure.environment= Azure environment name (default: AZUREPUBLICCLOUD) [$AZURE_ENVIRONMENT]
25+
--azure.servicediscovery.cache= Duration for caching Azure ServiceDiscovery of workspaces to reduce API
26+
calls (time.Duration) (default: 30m) [$AZURE_SERVICEDISCOVERY_CACHE]
27+
--loganalytics.workspace= Loganalytics workspace IDs [$LOGANALYTICS_WORKSPACE]
28+
--loganalytics.parallel= Specifies how many workspaces should be queried in parallel (default: 5)
29+
[$LOGANALYTICS_PARALLEL]
30+
-c, --config= Config path [$CONFIG]
31+
--bind= Server address (default: :8080) [$SERVER_BIND]
2732
2833
Help Options:
29-
-h, --help Show this help message
34+
-h, --help Show this help message
3035
```
3136

3237
for Azure API authentication (using ENV vars) see https://github.com/Azure/azure-sdk-for-go#authentication
@@ -50,6 +55,8 @@ HINT: parameters of type `multiple` can be either specified multiple times and/o
5055

5156
#### /probe parameters
5257

58+
Uses the predefined workspace list passed as parameter/environment variable on startup.
59+
5360
| GET parameter | Default | Required | Multiple | Description |
5461
|------------------------|---------------------------|----------|----------|----------------------------------------------------------------------|
5562
| `module` | | no | no | Filter queries by module name |
@@ -58,6 +65,8 @@ HINT: parameters of type `multiple` can be either specified multiple times and/o
5865

5966
#### /probe/workspace parameters
6067

68+
Uses workspaces passed dynamically via HTTP query parameter.
69+
6170
| GET parameter | Default | Required | Multiple | Description |
6271
|------------------------|---------------------------|----------|----------|----------------------------------------------------------------------|
6372
| `module` | | no | no | Filter queries by module name |
@@ -67,6 +76,8 @@ HINT: parameters of type `multiple` can be either specified multiple times and/o
6776

6877
#### /probe/subscription parameters
6978

79+
Uses Azure service discovery to find all workspaces in one or multiple subscriptions.
80+
7081
| GET parameter | Default | Required | Multiple | Description |
7182
|------------------------|---------------------------|----------|----------|----------------------------------------------------------------------|
7283
| `module` | | no | no | Filter queries by module name |
@@ -79,11 +90,13 @@ Global metrics
7990

8091
available on `/metrics`
8192

82-
| Metric | Description |
83-
|--------------------------------------|--------------------------------------------------------------------------------|
84-
| `azure_loganalytics_query_time` | Summary metric about query execution time (incl. all subqueries) |
85-
| `azure_loganalytics_query_results` | Number of results from query |
86-
| `azure_loganalytics_query_requests` | Count of requests (eg paged subqueries) per query |
93+
| Metric | Description |
94+
|---------------------------------------------|--------------------------------------------------------------------------------|
95+
| `azure_loganalytics_status` | Status whether the query was successful (per workspace, module, metric) |
96+
| `azure_loganalytics_last_query_successfull` | Timestamp of last successful query (per workspace, module, metric) |
97+
| `azure_loganalytics_query_time` | Summary metric about query execution time (incl. all subqueries) |
98+
| `azure_loganalytics_query_results` | Number of results from query |
99+
| `azure_loganalytics_query_requests` | Count of requests (eg paged subqueries) per query |
87100

88101

89102
Examples

config/opts.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
"encoding/json"
55
log "github.com/sirupsen/logrus"
6+
"time"
67
)
78

89
type (
@@ -16,7 +17,10 @@ type (
1617

1718
// azure
1819
Azure struct {
19-
Environment *string `long:"azure.environment" env:"AZURE_ENVIRONMENT" description:"Azure environment name" default:"AZUREPUBLICCLOUD"`
20+
Environment *string `long:"azure.environment" env:"AZURE_ENVIRONMENT" description:"Azure environment name" default:"AZUREPUBLICCLOUD"`
21+
ServiceDiscovery struct {
22+
CacheDuration *time.Duration `long:"azure.servicediscovery.cache" env:"AZURE_SERVICEDISCOVERY_CACHE" description:"Duration for caching Azure ServiceDiscovery of workspaces to reduce API calls (time.Duration)" default:"30m"`
23+
}
2024
}
2125

2226
Loganalytics struct {

loganalytics/global-metrics.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package loganalytics
33
import "github.com/prometheus/client_golang/prometheus"
44

55
var (
6-
prometheusQueryTime *prometheus.SummaryVec
7-
prometheusQueryResults *prometheus.GaugeVec
8-
prometheusQueryRequests *prometheus.CounterVec
6+
prometheusQueryTime *prometheus.SummaryVec
7+
prometheusQueryResults *prometheus.GaugeVec
8+
prometheusQueryRequests *prometheus.CounterVec
9+
prometheusQueryStatus *prometheus.GaugeVec
10+
prometheusQueryLastSuccessfull *prometheus.GaugeVec
911
)
1012

1113
func InitGlobalMetrics() {
@@ -39,10 +41,36 @@ func InitGlobalMetrics() {
3941
Help: "Azure loganalytics query request count",
4042
},
4143
[]string{
42-
"workspace",
44+
"workspaceID",
4345
"module",
4446
"metric",
4547
},
4648
)
4749
prometheus.MustRegister(prometheusQueryRequests)
50+
51+
prometheusQueryStatus = prometheus.NewGaugeVec(
52+
prometheus.GaugeOpts{
53+
Name: "azure_loganalytics_status",
54+
Help: "Azure loganalytics workspace status",
55+
},
56+
[]string{
57+
"workspaceID",
58+
"module",
59+
"metric",
60+
},
61+
)
62+
prometheus.MustRegister(prometheusQueryStatus)
63+
64+
prometheusQueryLastSuccessfull = prometheus.NewGaugeVec(
65+
prometheus.GaugeOpts{
66+
Name: "azure_loganalytics_last_query_successfull",
67+
Help: "Azure loganalytics workspace last successfull scrape time",
68+
},
69+
[]string{
70+
"workspaceID",
71+
"module",
72+
"metric",
73+
},
74+
)
75+
prometheus.MustRegister(prometheusQueryLastSuccessfull)
4876
}

loganalytics/prober.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package loganalytics
22

33
import (
44
"context"
5+
"crypto/sha1"
56
"encoding/json"
67
"fmt"
78
"github.com/Azure/azure-sdk-for-go/services/operationalinsights/v1/operationalinsights"
@@ -59,9 +60,10 @@ type (
5960
}
6061

6162
LogAnalyticsProbeResult struct {
62-
Name string
63-
Metrics []kusto.MetricRow
64-
Error error
63+
WorkspaceId string
64+
Name string
65+
Metrics []kusto.MetricRow
66+
Error error
6567
}
6668

6769
LogAnalyticsPanicStop struct {
@@ -103,13 +105,12 @@ func (p *LogAnalyticsProber) Init() {
103105
if cacheTime.Seconds() > 0 {
104106
p.config.cacheEnabled = true
105107
p.config.cacheDuration = &cacheTime
106-
107-
cacheKey := fmt.Sprintf(
108-
"cache:%s",
109-
p.request.RequestURI,
108+
p.config.cacheKey = to.StringPtr(
109+
fmt.Sprintf(
110+
"metrics:%x",
111+
string(sha1.New().Sum([]byte(p.request.RequestURI))),
112+
),
110113
)
111-
p.config.cacheKey = &cacheKey
112-
fmt.Println(*p.config.cacheKey)
113114
}
114115
}
115116

@@ -146,11 +147,11 @@ func (p *LogAnalyticsProber) Run() {
146147
if v, ok := p.cache.Get(*p.config.cacheKey); ok {
147148
if cacheData, ok := v.([]byte); ok {
148149
if err := json.Unmarshal(cacheData, &p.metricList); err == nil {
149-
p.logger.Debug("fetched from cache")
150+
p.logger.Debug("fetched metrics from cache")
150151
p.response.Header().Add("X-metrics-cached", "true")
151152
executeQuery = false
152153
} else {
153-
p.logger.Debug("unable to parse cache data")
154+
p.logger.Debug("unable to parse cached metrics")
154155
}
155156
}
156157
}
@@ -160,7 +161,7 @@ func (p *LogAnalyticsProber) Run() {
160161
p.response.Header().Add("X-metrics-cached", "false")
161162

162163
if p.ServiceDiscovery.enabled {
163-
p.ServiceDiscovery.Find()
164+
p.ServiceDiscovery.ServiceDiscovery()
164165
}
165166

166167
p.executeQueries()
@@ -230,7 +231,7 @@ func (p *LogAnalyticsProber) executeQueries() {
230231
for _, row := range p.workspaceList {
231232
workspaceId := row
232233
// Run the query and get the results
233-
prometheusQueryRequests.With(prometheus.Labels{"workspace": workspaceId, "module": p.config.moduleName, "metric": queryConfig.Metric}).Inc()
234+
prometheusQueryRequests.With(prometheus.Labels{"workspaceID": workspaceId, "module": p.config.moduleName, "metric": queryConfig.Metric}).Inc()
234235

235236
wgProbes.Add()
236237
go func() {
@@ -255,9 +256,26 @@ func (p *LogAnalyticsProber) executeQueries() {
255256
if result.Error == nil {
256257
resultTotalRecords++
257258
p.metricList.Add(result.Name, result.Metrics...)
259+
260+
prometheusQueryStatus.With(prometheus.Labels{
261+
"module": p.config.moduleName,
262+
"metric": queryConfig.Metric,
263+
"workspaceID": result.WorkspaceId,
264+
}).Set(1)
265+
266+
prometheusQueryLastSuccessfull.With(prometheus.Labels{
267+
"module": p.config.moduleName,
268+
"metric": queryConfig.Metric,
269+
"workspaceID": result.WorkspaceId,
270+
}).SetToCurrentTime()
258271
} else {
272+
prometheusQueryStatus.With(prometheus.Labels{
273+
"module": p.config.moduleName,
274+
"metric": queryConfig.Metric,
275+
"workspaceID": result.WorkspaceId,
276+
}).Set(0)
277+
259278
contextLogger.Error(result.Error)
260-
panic(LogAnalyticsPanicStop{Message: result.Error.Error()})
261279
}
262280
}
263281

@@ -308,8 +326,9 @@ func (p *LogAnalyticsProber) sendQueryToWorkspace(logger *log.Entry, workspaceId
308326
}
309327

310328
result <- LogAnalyticsProbeResult{
311-
Name: metricName,
312-
Metrics: metric,
329+
WorkspaceId: workspaceId,
330+
Name: metricName,
331+
Metrics: metric,
313332
}
314333
}
315334
}

loganalytics/servicediscovery.go

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package loganalytics
22

33
import (
4+
"crypto/sha1"
5+
"encoding/json"
6+
"fmt"
47
operationalinsightsProfile "github.com/Azure/azure-sdk-for-go/profiles/latest/operationalinsights/mgmt/operationalinsights"
58
"github.com/Azure/go-autorest/autorest/to"
69
log "github.com/sirupsen/logrus"
10+
"time"
711
)
812

913
type (
@@ -14,9 +18,11 @@ type (
1418
)
1519

1620
func (sd *LogAnalyticsServiceDiscovery) ResourcesClient(subscriptionId string) *operationalinsightsProfile.WorkspacesClient {
17-
client := operationalinsightsProfile.NewWorkspacesClientWithBaseURI(sd.prober.Azure.Environment.ResourceManagerEndpoint, subscriptionId)
18-
client.Authorizer = sd.prober.Azure.AzureAuthorizer
19-
client.ResponseInspector = sd.prober.respondDecorator(&subscriptionId)
21+
prober := sd.prober
22+
23+
client := operationalinsightsProfile.NewWorkspacesClientWithBaseURI(prober.Azure.Environment.ResourceManagerEndpoint, subscriptionId)
24+
client.Authorizer = prober.Azure.AzureAuthorizer
25+
client.ResponseInspector = prober.respondDecorator(&subscriptionId)
2026

2127
return &client
2228
}
@@ -25,37 +31,81 @@ func (sd *LogAnalyticsServiceDiscovery) Use() {
2531
sd.enabled = true
2632
}
2733

28-
func (sd *LogAnalyticsServiceDiscovery) Find() {
29-
contextLogger := sd.prober.logger
34+
func (sd *LogAnalyticsServiceDiscovery) ServiceDiscovery() {
35+
var serviceDiscoveryCacheDuration *time.Duration
36+
cacheKey := ""
37+
prober := sd.prober
3038

31-
contextLogger.Debug("requesting list for workspaces via Azure API")
39+
contextLogger := prober.logger
3240

33-
params := sd.prober.request.URL.Query()
41+
params := prober.request.URL.Query()
3442

3543
subscriptionList, err := ParamsGetListRequired(params, "subscription")
3644
if err != nil {
3745
contextLogger.Error(err)
3846
panic(LogAnalyticsPanicStop{Message: err.Error()})
3947
}
4048

49+
if prober.cache != nil && prober.Conf.Azure.ServiceDiscovery.CacheDuration != nil && prober.Conf.Azure.ServiceDiscovery.CacheDuration.Seconds() > 0 {
50+
serviceDiscoveryCacheDuration = prober.Conf.Azure.ServiceDiscovery.CacheDuration
51+
cacheKey = fmt.Sprintf(
52+
"sd:%x",
53+
string(sha1.New().Sum([]byte(fmt.Sprintf("%v", subscriptionList)))),
54+
)
55+
fmt.Println(cacheKey)
56+
}
57+
58+
// try cache
59+
if serviceDiscoveryCacheDuration != nil {
60+
if v, ok := prober.cache.Get(cacheKey); ok {
61+
if cacheData, ok := v.([]byte); ok {
62+
if err := json.Unmarshal(cacheData, &prober.workspaceList); err == nil {
63+
contextLogger.Debug("fetched servicediscovery from cache")
64+
prober.response.Header().Add("X-servicediscovery-cached", "true")
65+
return
66+
} else {
67+
prober.logger.Debug("unable to parse cached servicediscovery")
68+
}
69+
}
70+
}
71+
}
72+
73+
contextLogger.Debug("requesting list for workspaces via Azure API")
74+
sd.requestWorkspacesFromAzure(contextLogger, subscriptionList)
75+
76+
// store to cache (if enabled)
77+
if serviceDiscoveryCacheDuration != nil {
78+
contextLogger.Debug("saving servicedisccovery to cache")
79+
if cacheData, err := json.Marshal(prober.workspaceList); err == nil {
80+
prober.response.Header().Add("X-servicediscovery-cached-until", time.Now().Add(*serviceDiscoveryCacheDuration).Format(time.RFC3339))
81+
prober.cache.Set(cacheKey, cacheData, *serviceDiscoveryCacheDuration)
82+
contextLogger.Debugf("saved servicediscovery to cache for %s", serviceDiscoveryCacheDuration.String())
83+
}
84+
}
85+
}
86+
87+
func (sd *LogAnalyticsServiceDiscovery) requestWorkspacesFromAzure(logger *log.Entry, subscriptionList []string) {
88+
prober := sd.prober
89+
4190
for _, subscriptionId := range subscriptionList {
42-
subscriptionLogger := contextLogger.WithFields(log.Fields{
91+
subscriptionLogger := logger.WithFields(log.Fields{
4392
"subscription": subscriptionId,
4493
})
4594

46-
list, err := sd.ResourcesClient(subscriptionId).List(sd.prober.ctx)
95+
list, err := sd.ResourcesClient(subscriptionId).List(prober.ctx)
4796
if err != nil {
4897
subscriptionLogger.Error(err)
4998
panic(LogAnalyticsPanicStop{Message: err.Error()})
5099
}
51100

52101
for _, val := range *list.Value {
53102
if val.CustomerID != nil {
54-
sd.prober.workspaceList = append(
55-
sd.prober.workspaceList,
103+
prober.workspaceList = append(
104+
prober.workspaceList,
56105
to.String(val.CustomerID),
57106
)
58107
}
59108
}
60109
}
110+
61111
}

0 commit comments

Comments
 (0)