Skip to content

Add cert watcher for target allocator TLS config #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet
go 1.22.0

require (
github.com/fsnotify/fsnotify v1.8.0
github.com/go-kit/log v0.2.1
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
Expand Down Expand Up @@ -72,7 +73,6 @@ require (
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
89 changes: 82 additions & 7 deletions receiver/prometheusreceiver/targetallocator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sort"
"time"

"github.com/fsnotify/fsnotify"
commonconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
promconfig "github.com/prometheus/prometheus/config"
Expand All @@ -36,6 +37,9 @@ type Manager struct {
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
enableNativeHistograms bool
watcher *fsnotify.Watcher
host component.Host
httpClient *http.Client
}

func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config, enableNativeHistograms bool) *Manager {
Expand All @@ -52,6 +56,8 @@ func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config,
func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Manager, dm *discovery.Manager) error {
m.scrapeManager = sm
m.discoveryManager = dm
m.host = host

err := m.applyCfg()
if err != nil {
m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
Expand All @@ -61,23 +67,27 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man
// the target allocator is disabled
return nil
}
httpClient, err := m.cfg.ClientConfig.ToClient(ctx, host, m.settings.TelemetrySettings)
if err != nil {
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
if err = m.setHTTPClient(ctx); err != nil {
return err
}
m.settings.Logger.Info("Starting target allocator discovery")
// immediately sync jobs, not waiting for the first tick
savedHash, err := m.sync(uint64(0), httpClient)
savedHash, err := m.sync(ctx, uint64(0))
if err != nil {
m.settings.Logger.Error("Failed to sync target allocator", zap.Error(err))
}

// Setup fsnotify watchers for TLS files
if err := m.setupTLSWatchers(ctx); err != nil {
m.settings.Logger.Error("Error setting up TLS watchers", zap.Error(err))
}

go func() {
targetAllocatorIntervalTicker := time.NewTicker(m.cfg.Interval)
for {
select {
case <-targetAllocatorIntervalTicker.C:
hash, newErr := m.sync(savedHash, httpClient)
hash, newErr := m.sync(ctx, savedHash)
if newErr != nil {
m.settings.Logger.Error(newErr.Error())
continue
Expand All @@ -95,15 +105,80 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man

// Shutdown closes the manager's shutdown channel and then, if an fsnotify
// watcher was created, closes it as well. A failure to close the watcher is
// logged as a warning and otherwise ignored.
func (m *Manager) Shutdown() {
	close(m.shutdown)

	// No watcher was ever set up, so there is nothing further to release.
	if m.watcher == nil {
		return
	}

	closeErr := m.watcher.Close()
	if closeErr != nil {
		m.settings.Logger.Warn("Error closing fsnotify watcher", zap.Error(closeErr))
	}
}

// setupTLSWatchers creates one fsnotify watcher, adds the configured CAFile,
// CertFile and KeyFile to it, and starts a goroutine that rebuilds the HTTP
// client (via setHTTPClient) whenever one of those files changes.
//
// Empty paths are skipped. A failure to watch an individual file is logged and
// does not abort setup. The goroutine returns when either watcher channel is
// closed or when m.shutdown is closed.
//
// It returns an error only when the fsnotify watcher itself cannot be created.
func (m *Manager) setupTLSWatchers(ctx context.Context) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("failed to create fsnotify watcher: %w", err)
	}
	m.watcher = watcher

	// addFile registers path with the watcher; unset (empty) paths are ignored.
	addFile := func(path string) {
		if path == "" {
			return
		}
		if err := watcher.Add(path); err != nil {
			m.settings.Logger.Error("Failed to watch TLS file", zap.String("file", path), zap.Error(err))
		}
	}

	addFile(m.cfg.TLSSetting.CAFile)
	addFile(m.cfg.TLSSetting.CertFile)
	addFile(m.cfg.TLSSetting.KeyFile)

	go func() {
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}
				if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename|fsnotify.Remove) == 0 {
					// Not a content-affecting operation; nothing to reload.
					continue
				}
				// A watch on an individual file is lost once that file is
				// removed or atomically replaced (the usual way certificates
				// are rotated). Re-register the path so later rotations are
				// still observed.
				if event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 {
					addFile(event.Name)
				}
				m.settings.Logger.Info("TLS file changed; re-syncing",
					zap.String("file", event.Name), zap.String("op", event.Op.String()))
				if err := m.setHTTPClient(ctx); err != nil {
					m.settings.Logger.Error("Failed to sync after TLS file change", zap.Error(err))
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				m.settings.Logger.Error("fsnotify watcher error", zap.Error(err))
			case <-m.shutdown:
				return
			}
		}
	}()

	return nil
}

// setHTTPClient builds an HTTP client from m.cfg.ClientConfig and stores it in
// m.httpClient, which sync passes to getScrapeConfigsResponse.
//
// The stored client is replaced only when construction succeeds. If ToClient
// fails (for example while a TLS file is being rewritten), the error is logged
// and returned, and the previously stored client is left in place so later
// syncs do not run with whatever value the failed call produced.
//
// NOTE(review): m.httpClient is written here and read by sync; both appear to
// run on separate goroutines without synchronization — confirm and guard if so.
func (m *Manager) setHTTPClient(ctx context.Context) error {
	client, err := m.cfg.ClientConfig.ToClient(ctx, m.host, m.settings.TelemetrySettings)
	if err != nil {
		m.settings.Logger.Error("Failed to create http client", zap.Error(err))
		return err
	}
	m.httpClient = client
	return nil
}

// sync request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash.
// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, error) {
func (m *Manager) sync(ctx context.Context, compareHash uint64) (uint64, error) {
m.settings.Logger.Debug("Syncing target allocator jobs")
m.settings.Logger.Debug("endpoint", zap.String("endpoint", m.cfg.Endpoint))

scrapeConfigsResponse, err := getScrapeConfigsResponse(httpClient, m.cfg.Endpoint)
scrapeConfigsResponse, err := getScrapeConfigsResponse(m.httpClient, m.cfg.Endpoint)
if err != nil {
m.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
return 0, err
Expand Down
Loading