Skip to content

Commit 1290431

Browse files
committed
implement parallel queries
Signed-off-by: Markus Blaschke <mblaschke82@gmail.com>
1 parent 594495c commit 1290431

File tree

6 files changed

+157
-71
lines changed

6 files changed

+157
-71
lines changed

config/opts.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type (
2121

2222
Loganalytics struct {
2323
Workspace []string `long:"loganalytics.workspace" env:"LOGANALYTICS_WORKSPACE" env-delim:" " description:"Loganalytics workspace IDs" required:"true"`
24+
Parallel int `long:"loganalytics.parallel" env:"LOGANALYTICS_PARALLEL" description:"Specifies how many workspaces should be queried in parallel" default:"5"`
2425
}
2526

2627
// config

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/patrickmn/go-cache v2.1.0+incompatible
1313
github.com/prometheus/client_golang v1.10.0
1414
github.com/prometheus/common v0.23.0 // indirect
15+
github.com/remeh/sizedwaitgroup v1.0.0
1516
github.com/sirupsen/logrus v1.8.1
1617
github.com/webdevops/azure-resourcegraph-exporter v0.0.0-20210505184535-837efac736c6
1718
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
281281
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
282282
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
283283
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
284+
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
285+
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
284286
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
285287
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
286288
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=

loganalytics.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package main
2+
3+
import (
4+
"context"
5+
operationalinsights "github.com/Azure/azure-sdk-for-go/services/operationalinsights/v1/operationalinsights"
6+
"github.com/Azure/go-autorest/autorest"
7+
"github.com/Azure/go-autorest/autorest/to"
8+
log "github.com/sirupsen/logrus"
9+
"github.com/webdevops/azure-resourcegraph-exporter/kusto"
10+
"net/http"
11+
)
12+
13+
const (
14+
OPINSIGHTS_URL_SUFFIX = "/v1"
15+
)
16+
17+
type (
18+
probeResult struct {
19+
Name string
20+
Metrics []kusto.MetricRow
21+
}
22+
)
23+
24+
func NewLoganalyticsQueryClient() operationalinsights.QueryClient {
25+
// Create and authorize a operationalinsights client
26+
client := operationalinsights.NewQueryClientWithBaseURI(AzureEnvironment.ResourceIdentifiers.OperationalInsights + OPINSIGHTS_URL_SUFFIX)
27+
client.Authorizer = OpInsightsAuthorizer
28+
client.ResponseInspector = respondDecorator()
29+
return client
30+
}
31+
32+
func SendQueryToLoganalyticsWorkspace(ctx context.Context, logger *log.Entry, workspaceId string, queryClient operationalinsights.QueryClient, queryConfig kusto.ConfigQuery, result chan<- probeResult) {
33+
workspaceLogger := logger.WithField("workspaceId", workspaceId)
34+
35+
// Set options
36+
workspaces := []string{}
37+
queryBody := operationalinsights.QueryBody{
38+
Query: &queryConfig.Query,
39+
Timespan: queryConfig.Timespan,
40+
Workspaces: &workspaces,
41+
}
42+
43+
workspaceLogger.WithField("query", queryConfig.Query).Debug("send query to loganaltyics workspace")
44+
var results, queryErr = queryClient.Execute(ctx, workspaceId, queryBody)
45+
46+
if queryErr == nil {
47+
workspaceLogger.Debug("fetched query result")
48+
resultTables := *results.Tables
49+
50+
if len(resultTables) >= 1 {
51+
for _, table := range resultTables {
52+
if table.Rows == nil || table.Columns == nil {
53+
// no results found, skip table
54+
continue
55+
}
56+
57+
for _, v := range *table.Rows {
58+
resultRow := map[string]interface{}{}
59+
60+
for colNum, colName := range *resultTables[0].Columns {
61+
resultRow[to.String(colName.Name)] = v[colNum]
62+
}
63+
64+
for metricName, metric := range kusto.BuildPrometheusMetricList(queryConfig.Metric, queryConfig.MetricConfig, resultRow) {
65+
// inject workspaceId
66+
for num := range metric {
67+
metric[num].Labels["workspaceTable"] = to.String(table.Name)
68+
metric[num].Labels["workspaceID"] = workspaceId
69+
}
70+
71+
result <- probeResult{
72+
Name: metricName,
73+
Metrics: metric,
74+
}
75+
}
76+
}
77+
}
78+
}
79+
80+
workspaceLogger.Debug("metrics parsed")
81+
} else {
82+
workspaceLogger.Panic(queryErr.Error())
83+
}
84+
}
85+
86+
func respondDecorator() autorest.RespondDecorator {
87+
return func(p autorest.Responder) autorest.Responder {
88+
return autorest.ResponderFunc(func(r *http.Response) error {
89+
return nil
90+
})
91+
}
92+
}

misc.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package main
2+
3+
import (
4+
"github.com/remeh/sizedwaitgroup"
5+
"net/http"
6+
"sync"
7+
)
8+
9+
func NewWaitGroup() sync.WaitGroup {
10+
return sync.WaitGroup{}
11+
}
12+
13+
func NewWaitGroupWithSize(r *http.Request) sizedwaitgroup.SizedWaitGroup {
14+
size := opts.Loganalytics.Parallel
15+
return sizedwaitgroup.New(size)
16+
}

probe.go

Lines changed: 45 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
operationalinsights "github.com/Azure/azure-sdk-for-go/services/operationalinsights/v1/operationalinsights"
8-
"github.com/Azure/go-autorest/autorest"
9-
"github.com/Azure/go-autorest/autorest/to"
107
"github.com/prometheus/client_golang/prometheus"
118
"github.com/prometheus/client_golang/prometheus/promhttp"
129
log "github.com/sirupsen/logrus"
@@ -15,11 +12,15 @@ import (
1512
"time"
1613
)
1714

18-
const (
19-
OPINSIGHTS_URL_SUFFIX = "/v1"
20-
)
21-
2215
func handleProbeRequest(w http.ResponseWriter, r *http.Request) {
16+
defer func() {
17+
if r := recover(); r != nil {
18+
log.Errorf(fmt.Sprintf("%v", r))
19+
http.Error(w, fmt.Sprintf("%v", r), http.StatusBadRequest)
20+
return
21+
}
22+
}()
23+
2324
registry := prometheus.NewRegistry()
2425

2526
requestTime := time.Now()
@@ -42,11 +43,7 @@ func handleProbeRequest(w http.ResponseWriter, r *http.Request) {
4243
}
4344

4445
ctx := context.Background()
45-
46-
// Create and authorize a operationalinsights client
47-
queryClient := operationalinsights.NewQueryClientWithBaseURI(AzureEnvironment.ResourceIdentifiers.OperationalInsights + OPINSIGHTS_URL_SUFFIX)
48-
queryClient.Authorizer = OpInsightsAuthorizer
49-
queryClient.ResponseInspector = respondDecorator()
46+
queryClient := NewLoganalyticsQueryClient()
5047

5148
metricList := kusto.MetricList{}
5249
metricList.Init()
@@ -69,7 +66,9 @@ func handleProbeRequest(w http.ResponseWriter, r *http.Request) {
6966

7067
if executeQuery {
7168
w.Header().Add("X-metrics-cached", "false")
72-
for _, queryConfig := range Config.Queries {
69+
for _, queryRow := range Config.Queries {
70+
queryConfig := queryRow
71+
7372
// check if query matches module name
7473
if queryConfig.Module != moduleName {
7574
continue
@@ -88,63 +87,46 @@ func handleProbeRequest(w http.ResponseWriter, r *http.Request) {
8887
contextLogger.Debug("starting query")
8988

9089
resultTotalRecords := 0
91-
for _, workspaceId := range opts.Loganalytics.Workspace {
92-
workspaceLogger := contextLogger.WithField("workspaceId", workspaceId)
93-
94-
// Set options
95-
workspaces := []string{}
96-
queryBody := operationalinsights.QueryBody{
97-
Query: &queryConfig.Query,
98-
Timespan: queryConfig.Timespan,
99-
Workspaces: &workspaces,
90+
91+
resultChannel := make(chan probeResult)
92+
wgProbes := NewWaitGroupWithSize(r)
93+
wgProcess := NewWaitGroup()
94+
95+
// collect metrics
96+
wgProcess.Add(1)
97+
go func() {
98+
defer wgProcess.Done()
99+
for result := range resultChannel {
100+
resultTotalRecords++
101+
metricList.Add(result.Name, result.Metrics...)
100102
}
103+
}()
101104

105+
// query workspaces
106+
for _, row := range opts.Loganalytics.Workspace {
107+
workspaceId := row
102108
// Run the query and get the results
103109
prometheusQueryRequests.With(prometheus.Labels{"workspace": workspaceId, "module": moduleName, "metric": queryConfig.Metric}).Inc()
104110

105-
var results, queryErr = queryClient.Execute(ctx, workspaceId, queryBody)
106-
resultTotalRecords = 1
107-
108-
if queryErr == nil {
109-
contextLogger.Debug("parsing result")
110-
resultTables := *results.Tables
111-
112-
if len(resultTables) >= 1 {
113-
for _, table := range resultTables {
114-
if table.Rows == nil || table.Columns == nil {
115-
// no results found, skip table
116-
continue
117-
}
118-
119-
for _, v := range *table.Rows {
120-
resultTotalRecords++
121-
resultRow := map[string]interface{}{}
122-
123-
for colNum, colName := range *resultTables[0].Columns {
124-
resultRow[to.String(colName.Name)] = v[colNum]
125-
}
126-
127-
for metricName, metric := range kusto.BuildPrometheusMetricList(queryConfig.Metric, queryConfig.MetricConfig, resultRow) {
128-
// inject workspaceId
129-
for num := range metric {
130-
metric[num].Labels["workspaceTable"] = to.String(table.Name)
131-
metric[num].Labels["workspaceID"] = workspaceId
132-
}
133-
134-
metricList.Add(metricName, metric...)
135-
}
136-
}
137-
}
138-
}
139-
140-
workspaceLogger.Debug("metrics parsed")
141-
} else {
142-
workspaceLogger.Errorln(queryErr.Error())
143-
http.Error(w, queryErr.Error(), http.StatusBadRequest)
144-
return
145-
}
111+
wgProbes.Add()
112+
go func() {
113+
defer wgProbes.Done()
114+
SendQueryToLoganalyticsWorkspace(
115+
ctx,
116+
contextLogger,
117+
workspaceId,
118+
queryClient,
119+
queryConfig,
120+
resultChannel,
121+
)
122+
}()
146123
}
147124

125+
// wait until queries are done for closing channel and waiting for result process
126+
wgProbes.Wait()
127+
close(resultChannel)
128+
wgProcess.Wait()
129+
148130
elapsedTime := time.Since(startTime)
149131
contextLogger.WithField("results", resultTotalRecords).Debugf("fetched %v results", resultTotalRecords)
150132
prometheusQueryTime.With(prometheus.Labels{"module": moduleName, "metric": queryConfig.Metric}).Observe(elapsedTime.Seconds())
@@ -186,11 +168,3 @@ func handleProbeRequest(w http.ResponseWriter, r *http.Request) {
186168
h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
187169
h.ServeHTTP(w, r)
188170
}
189-
190-
func respondDecorator() autorest.RespondDecorator {
191-
return func(p autorest.Responder) autorest.Responder {
192-
return autorest.ResponderFunc(func(r *http.Response) error {
193-
return nil
194-
})
195-
}
196-
}

0 commit comments

Comments
 (0)