Skip to content

Add cert watcher for target allocator TLS config #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 13, 2025
Merged
81 changes: 73 additions & 8 deletions receiver/prometheusreceiver/targetallocator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sort"
"time"

"github.com/fsnotify/fsnotify"
commonconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
promconfig "github.com/prometheus/prometheus/config"
Expand All @@ -36,6 +37,8 @@ type Manager struct {
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
enableNativeHistograms bool
watcher *fsnotify.Watcher
host component.Host
}

func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config, enableNativeHistograms bool) *Manager {
Expand All @@ -52,6 +55,8 @@ func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config,
func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Manager, dm *discovery.Manager) error {
m.scrapeManager = sm
m.discoveryManager = dm
m.host = host

err := m.applyCfg()
if err != nil {
m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
Expand All @@ -61,23 +66,24 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man
// the target allocator is disabled
return nil
}
httpClient, err := m.cfg.ClientConfig.ToClient(ctx, host, m.settings.TelemetrySettings)
if err != nil {
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
return err
}
m.settings.Logger.Info("Starting target allocator discovery")
// immediately sync jobs, not waiting for the first tick
savedHash, err := m.sync(uint64(0), httpClient)
savedHash, err := m.sync(ctx, uint64(0))
if err != nil {
m.settings.Logger.Error("Failed to sync target allocator", zap.Error(err))
}

// Setup fsnotify watchers for TLS files
if err := m.setupTLSWatchers(ctx); err != nil {
m.settings.Logger.Error("Error setting up TLS watchers", zap.Error(err))
}

go func() {
targetAllocatorIntervalTicker := time.NewTicker(m.cfg.Interval)
for {
select {
case <-targetAllocatorIntervalTicker.C:
hash, newErr := m.sync(savedHash, httpClient)
hash, newErr := m.sync(ctx, savedHash)
if newErr != nil {
m.settings.Logger.Error(newErr.Error())
continue
Expand All @@ -95,14 +101,73 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man

// Shutdown signals the manager's background goroutines to stop by closing
// m.shutdown, then closes the TLS file watcher if one was created.
func (m *Manager) Shutdown() {
	close(m.shutdown)

	// The watcher is only set by setupTLSWatchers. It stays nil when Start
	// returned early (target allocator disabled) or when creating the
	// fsnotify watcher failed, so guard against a nil dereference here.
	if m.watcher == nil {
		return
	}
	if err := m.watcher.Close(); err != nil {
		m.settings.Logger.Warn("Error closing fsnotify watcher", zap.Error(err))
	}
}

// setupTLSWatchers creates one fsnotify watcher, registers the configured
// CAFile, CertFile and KeyFile with it, and starts a goroutine that re-syncs
// with the target allocator whenever one of those files changes.
//
// A failure to watch an individual file is logged and does not abort setup;
// an error is returned only when the watcher itself cannot be created. The
// goroutine exits when the watcher's channels are closed or when m.shutdown
// is closed.
func (m *Manager) setupTLSWatchers(ctx context.Context) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("failed to create fsnotify watcher: %w", err)
	}
	m.watcher = watcher

	tlsFiles := []string{
		m.cfg.TLSSetting.CAFile,
		m.cfg.TLSSetting.CertFile,
		m.cfg.TLSSetting.KeyFile,
	}
	for _, path := range tlsFiles {
		// Unset TLS options are simply not watched.
		if path == "" {
			continue
		}
		if addErr := watcher.Add(path); addErr != nil {
			m.settings.Logger.Error("Failed to watch TLS file", zap.String("file", path), zap.Error(addErr))
		}
	}

	const (
		contentOps = fsnotify.Write | fsnotify.Create | fsnotify.Rename | fsnotify.Remove
		replaceOps = fsnotify.Rename | fsnotify.Remove
	)

	go func() {
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}
				if event.Op&contentOps == 0 {
					// e.g. Chmod: the file contents did not change.
					continue
				}

				// Atomic replacement (write temp file + rename, or a swapped
				// symlink target) drops the watch on the old file. Re-add the
				// path so subsequent rotations are still observed. This is
				// best effort: the path may not exist again yet.
				if event.Op&replaceOps != 0 {
					if addErr := watcher.Add(event.Name); addErr != nil {
						m.settings.Logger.Warn("Failed to re-watch TLS file after it was replaced",
							zap.String("file", event.Name), zap.Error(addErr))
					}
				}

				m.settings.Logger.Info("TLS file changed; re-syncing",
					zap.String("file", event.Name), zap.String("op", event.Op.String()))
				// A zero compare hash is passed so the result is never
				// compared against a previously saved hash.
				if _, syncErr := m.sync(ctx, uint64(0)); syncErr != nil {
					m.settings.Logger.Error("Failed to sync after TLS file change", zap.Error(syncErr))
				}
			case watchErr, ok := <-watcher.Errors:
				if !ok {
					return
				}
				m.settings.Logger.Error("fsnotify watcher error", zap.Error(watchErr))
			case <-m.shutdown:
				return
			}
		}
	}()

	return nil
}

// sync requests jobs from the target allocator and updates the underlying receiver if the response does not match the provided compareHash.
// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs.
func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, error) {
func (m *Manager) sync(ctx context.Context, compareHash uint64) (uint64, error) {
m.settings.Logger.Debug("Syncing target allocator jobs")
m.settings.Logger.Debug("endpoint", zap.String("endpoint", m.cfg.Endpoint))

httpClient, err := m.cfg.ClientConfig.ToClient(ctx, m.host, m.settings.TelemetrySettings)
if err != nil {
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
return 0, err
}

scrapeConfigsResponse, err := getScrapeConfigsResponse(httpClient, m.cfg.Endpoint)
if err != nil {
m.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
Expand Down
Loading