Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/celian-garcia_39417.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/azuremonitor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: parallelize calls by subscriptions in Batch API mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [39417]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
117 changes: 117 additions & 0 deletions receiver/azuremonitorreceiver/concurrency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver"

import (
"sync"

cmap "github.com/orcaman/concurrent-map/v2"
)

type concurrentMetricsBuilderMap[V any] interface {
Get(string) (V, bool)
Set(string, V)
Clear()
Range(func(string, V))
}

// Implementation with concurrent-map (generic API)
type concurrentMapImpl[V any] struct {
m cmap.ConcurrentMap[string, V]
}

func newConcurrentMapImpl[V any]() concurrentMetricsBuilderMap[V] {
return &concurrentMapImpl[V]{m: cmap.New[V]()}
}

func (c *concurrentMapImpl[V]) Get(key string) (V, bool) {
return c.m.Get(key)
}

func (c *concurrentMapImpl[V]) Set(key string, value V) {
c.m.Set(key, value)
}

func (c *concurrentMapImpl[V]) Clear() {
c.m.Clear()
}

func (c *concurrentMapImpl[V]) Range(f func(string, V)) {
c.m.IterCb(f)
}

// Implementation with sync.Map

type syncMapImpl[V any] struct {
m sync.Map
}

func NewSyncMapImpl[V any]() concurrentMetricsBuilderMap[V] {
return &syncMapImpl[V]{}
}

func (s *syncMapImpl[V]) Get(key string) (V, bool) {
v, ok := s.m.Load(key)
if !ok {
var zero V
return zero, false
}
return v.(V), true
}

func (s *syncMapImpl[V]) Set(key string, value V) {
s.m.Store(key, value)
}

func (s *syncMapImpl[V]) Clear() {
s.m.Range(func(k, _ any) bool {
s.m.Delete(k)
return true
})
}

func (s *syncMapImpl[V]) Range(f func(string, V)) {
s.m.Range(func(k, v any) bool {
f(k.(string), v.(V))
return true
})
}

// Implementation with classic map and mutex

type mutexMapImpl[V any] struct {
m map[string]V
mutex sync.RWMutex
}

func NewMutexMapImpl[V any]() concurrentMetricsBuilderMap[V] {
return &mutexMapImpl[V]{m: make(map[string]V)}
}

func (mm *mutexMapImpl[V]) Get(key string) (V, bool) {
mm.mutex.RLock()
defer mm.mutex.RUnlock()
v, ok := mm.m[key]
return v, ok
}

func (mm *mutexMapImpl[V]) Set(key string, value V) {
mm.mutex.Lock()
defer mm.mutex.Unlock()
mm.m[key] = value
}

func (mm *mutexMapImpl[V]) Clear() {
mm.mutex.Lock()
defer mm.mutex.Unlock()
mm.m = make(map[string]V)
}

func (mm *mutexMapImpl[V]) Range(f func(string, V)) {
mm.mutex.RLock()
defer mm.mutex.RUnlock()
for k, v := range mm.m {
f(k, v)
}
}
37 changes: 37 additions & 0 deletions receiver/azuremonitorreceiver/concurrency_bench_report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Concurrency Map Benchmark Report

## Context
This benchmark compares three concurrent map implementations in Go:
- **concurrentMapImpl**: Based on github.com/orcaman/concurrent-map (generic API)
- **syncMapImpl**: Based on Go's built-in sync.Map
- **mutexMapImpl**: Classic map protected by sync.RWMutex

Benchmarks were run with both small and large datasets (1 million pre-filled entries), using parallel Set/Get operations and multiple CPU counts (1, 2, 4, 8).

## Results Summary

### Small Dataset (Random keys)
- **concurrentMapImpl**: Fastest, minimal memory usage, scales well with CPU count.
- **syncMapImpl**: Slowest, highest memory allocation, scales with CPU but remains less efficient.
- **mutexMapImpl**: Intermediate performance, low memory usage, slightly less scalable with more CPUs.

### Large Dataset (1 million entries)
- **concurrentMapImpl**: Remains fastest, especially with 8 CPUs. Memory usage stays low (32–54 B/op, 1 alloc/op).
- **syncMapImpl**: Still slowest, with high memory allocation (107–110 B/op, 4 allocs/op).
- **mutexMapImpl**: Good for moderate concurrency, memory usage low, but performance drops with more CPUs.

## Recommendations
- For high concurrency and large datasets, **concurrentMapImpl** is the best choice.
- For simple or low-concurrency use cases, **mutexMapImpl** is efficient and easy to maintain.
- **syncMapImpl** is not recommended for performance-critical scenarios due to its overhead.

## Example Benchmark Output
```
BenchmarkConcurrentMapImplLarge-8 341.9 ns/op 32 B/op 1 allocs/op
BenchmarkSyncMapImplLarge-8 342.1 ns/op 107 B/op 4 allocs/op
BenchmarkMutexMapImplLarge-8 748.2 ns/op 31 B/op 1 allocs/op
```

## Conclusion
The generic concurrent-map implementation offers the best performance and scalability for concurrent workloads in Go. The classic mutex-protected map is a good fallback for simpler cases. Avoid sync.Map for intensive workloads.

73 changes: 73 additions & 0 deletions receiver/azuremonitorreceiver/concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver"

import (
"math/rand/v2"
"strconv"
"testing"
)

func benchmarkMapImpl(b *testing.B, m concurrentMetricsBuilderMap[int]) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
key := strconv.Itoa(rand.IntN(100000))
m.Set(key, rand.Int())
_, _ = m.Get(key)
}
})
}

func BenchmarkConcurrentMapImpl(b *testing.B) {
m := newConcurrentMapImpl[int]()
benchmarkMapImpl(b, m)
}

func BenchmarkSyncMapImpl(b *testing.B) {
m := NewSyncMapImpl[int]()
benchmarkMapImpl(b, m)
}

func BenchmarkMutexMapImpl(b *testing.B) {
m := NewMutexMapImpl[int]()
benchmarkMapImpl(b, m)
}

func benchmarkMapImplLarge(b *testing.B, m concurrentMetricsBuilderMap[int]) {
// Pre-fill with 1 million entries
for i := 0; i < 1_000_000; i++ {
key := strconv.Itoa(i)
m.Set(key, i)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Randomly access existing and new keys
if rand.IntN(2) == 0 {
key := strconv.Itoa(rand.IntN(1_000_000)) // existing
m.Set(key, rand.Int())
_, _ = m.Get(key)
} else {
key := strconv.Itoa(rand.IntN(10_000_000)) // possibly new
m.Set(key, rand.Int())
_, _ = m.Get(key)
}
}
})
}

func BenchmarkConcurrentMapImplLarge(b *testing.B) {
m := newConcurrentMapImpl[int]()
benchmarkMapImplLarge(b, m)
}

func BenchmarkSyncMapImplLarge(b *testing.B) {
m := NewSyncMapImpl[int]()
benchmarkMapImplLarge(b, m)
}

func BenchmarkMutexMapImplLarge(b *testing.B) {
m := NewMutexMapImpl[int]()
benchmarkMapImplLarge(b, m)
}
3 changes: 2 additions & 1 deletion receiver/azuremonitorreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0
github.com/google/go-cmp v0.7.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.137.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.137.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.136.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.43.0
go.opentelemetry.io/collector/component/componenttest v0.137.0
Expand Down
2 changes: 2 additions & 0 deletions receiver/azuremonitorreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 21 additions & 11 deletions receiver/azuremonitorreceiver/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ type ClientOptionsResolver interface {
}

type clientOptionsResolver struct {
armOptions *arm.ClientOptions
azmetricsOptions *azmetrics.ClientOptions
cloud cloud.Configuration
}

// newClientOptionsResolver creates a resolver that will always return the same options.
// Unlike in the tests where there will be one option by API mock, here we don't need different options for each client.
// Note the fact that it recreates the options each time. It's because the options are mutable, they can be modified by the client ctor.
func newClientOptionsResolver(cloudStr string) ClientOptionsResolver {
var cloudToUse cloud.Configuration
switch cloudStr {
Expand All @@ -34,25 +34,35 @@ func newClientOptionsResolver(cloudStr string) ClientOptionsResolver {
default:
cloudToUse = cloud.AzurePublic
}
return &clientOptionsResolver{armOptions: &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Cloud: cloudToUse,
},
}}
return &clientOptionsResolver{cloud: cloudToUse}
}

func (r *clientOptionsResolver) getClientOptions() azcore.ClientOptions {
return azcore.ClientOptions{
Cloud: r.cloud,
}
}

func (r *clientOptionsResolver) GetArmResourceClientOptions(_ string) *arm.ClientOptions {
return r.armOptions
return &arm.ClientOptions{
ClientOptions: r.getClientOptions(),
}
}

func (r *clientOptionsResolver) GetArmSubscriptionsClientOptions() *arm.ClientOptions {
return r.armOptions
return &arm.ClientOptions{
ClientOptions: r.getClientOptions(),
}
}

func (r *clientOptionsResolver) GetArmMonitorClientOptions() *arm.ClientOptions {
return r.armOptions
return &arm.ClientOptions{
ClientOptions: r.getClientOptions(),
}
}

func (r *clientOptionsResolver) GetAzMetricsClientOptions() *azmetrics.ClientOptions {
return r.azmetricsOptions
return &azmetrics.ClientOptions{
ClientOptions: r.getClientOptions(),
}
}
4 changes: 4 additions & 0 deletions receiver/azuremonitorreceiver/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func newMockClientOptionsResolver(
}
armResourcesClientOptions[subID] = &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
Transport: armresourcesfake.NewServerTransport(&resourceServer),
},
}
Expand All @@ -123,6 +124,7 @@ func newMockClientOptionsResolver(
}
armSubscriptionsClientOptions := &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
Transport: armsubscriptionsfake.NewServerTransport(&subscriptionsServer),
},
}
Expand All @@ -138,6 +140,7 @@ func newMockClientOptionsResolver(
}
armMonitorClientOptions := &arm.ClientOptions{
ClientOptions: azcore.ClientOptions{
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
Transport: armmonitorfake.NewServerFactoryTransport(&armMonitorServerFactory),
},
}
Expand All @@ -148,6 +151,7 @@ func newMockClientOptionsResolver(
}
azMetricsClientOptions := &azmetrics.ClientOptions{
ClientOptions: azcore.ClientOptions{
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
Transport: azmetricsfake.NewServerTransport(&azMetricsServer),
},
}
Expand Down
Loading