Skip to content

Add ability to refresh service account token to AWS Container Insights Kueue Receiver #284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion receiver/awscontainerinsightskueuereceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ func createMetricsReceiver(
consumer consumer.Metrics,
) (receiver.Metrics, error) {
rCfg := baseCfg.(*Config)
return newAWSContainerInsightReceiver(params.TelemetrySettings, rCfg, consumer)
return newAWSContainerInsightsKueueReceiver(params.TelemetrySettings, rCfg, consumer)
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type KueuePrometheusScraper struct {
host component.Host
clusterName string
prometheusReceiver receiver.Metrics
scrapeConfig *config.ScrapeConfig
running bool
}

Expand All @@ -83,6 +84,31 @@ func NewKueuePrometheusScraper(opts KueuePrometheusScraperOpts) (*KueuePrometheu
return nil, errors.New("cluster name cannot be empty")
}

scrapeConfig := GetScrapeConfig(
opts.ClusterName,
opts.BearerToken,
opts.TelemetrySettings.Logger,
)

scraper := &KueuePrometheusScraper{
ctx: opts.Ctx,
settings: opts.TelemetrySettings,
host: opts.Host,
clusterName: opts.ClusterName,
scrapeConfig: scrapeConfig,
}

promReceiver, err := scraper.createPrometheusReceiver(opts.Consumer)
if err != nil {
return nil, err
}

scraper.prometheusReceiver = promReceiver

return scraper, nil
}

func GetScrapeConfig(clusterName string, bearerToken string, logger *zap.Logger) *config.ScrapeConfig {
scrapeConfig := &config.ScrapeConfig{
HTTPClientConfig: configutil.HTTPClientConfig{
TLSConfig: configutil.TLSConfig{
Expand Down Expand Up @@ -111,39 +137,16 @@ func NewKueuePrometheusScraper(opts KueuePrometheusScraperOpts) (*KueuePrometheu
},
},
},
MetricRelabelConfigs: GetKueueRelabelConfigs(opts.ClusterName),
MetricRelabelConfigs: GetKueueRelabelConfigs(clusterName),
}

if opts.BearerToken != "" {
scrapeConfig.HTTPClientConfig.BearerToken = configutil.Secret(opts.BearerToken)
if bearerToken != "" {
scrapeConfig.HTTPClientConfig.BearerToken = configutil.Secret(bearerToken)
} else {
opts.TelemetrySettings.Logger.Warn("bearer token is not set, kueue metrics will not be published")
}

promConfig := prometheusreceiver.Config{
PrometheusConfig: &prometheusreceiver.PromConfig{
ScrapeConfigs: []*config.ScrapeConfig{scrapeConfig},
},
}

params := receiver.Settings{
ID: component.MustNewID(kmJobName),
TelemetrySettings: opts.TelemetrySettings,
}

promFactory := prometheusreceiver.NewFactory()
promReceiver, err := promFactory.CreateMetrics(opts.Ctx, params, &promConfig, opts.Consumer)
if err != nil {
return nil, fmt.Errorf("failed to create prometheus receiver for kueue metrics: %w", err)
logger.Warn("bearer token is not set, kueue metrics will not be published")
}

return &KueuePrometheusScraper{
ctx: opts.Ctx,
settings: opts.TelemetrySettings,
host: opts.Host,
clusterName: opts.ClusterName,
prometheusReceiver: promReceiver,
}, nil
return scrapeConfig
}

func GetKueueRelabelConfigs(clusterName string) []*relabel.Config {
Expand Down Expand Up @@ -193,6 +196,27 @@ func GetKueueRelabelConfigs(clusterName string) []*relabel.Config {
return relabelConfigs
}

func (kps *KueuePrometheusScraper) createPrometheusReceiver(consumer consumer.Metrics) (receiver.Metrics, error) {
promConfig := prometheusreceiver.Config{
PrometheusConfig: &prometheusreceiver.PromConfig{
ScrapeConfigs: []*config.ScrapeConfig{kps.scrapeConfig},
},
}

params := receiver.Settings{
ID: component.MustNewID(kmJobName),
TelemetrySettings: kps.settings,
}

promFactory := prometheusreceiver.NewFactory()
promReceiver, err := promFactory.CreateMetrics(kps.ctx, params, &promConfig, consumer)
if err != nil {
return nil, fmt.Errorf("failed to create prometheus receiver for kueue metrics: %w", err)
}

return promReceiver, nil
}

func (kps *KueuePrometheusScraper) GetMetrics() []pmetric.Metrics {
// This method will never return metrics because the metrics are collected by the scraper.

Expand All @@ -209,6 +233,7 @@ func (kps *KueuePrometheusScraper) GetMetrics() []pmetric.Metrics {

func (kps *KueuePrometheusScraper) Shutdown() {
if kps.running {
kps.settings.Logger.Info("Shutting down the Kueue metrics scraper")
err := kps.prometheusReceiver.Shutdown(kps.ctx)
if err != nil {
kps.settings.Logger.Error("Unable to shutdown Kueue PrometheusReceiver", zap.Error(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func TestNewKueuePrometheusScraperEndToEnd(t *testing.T) {
Consumer: mConsumer,
Host: componenttest.NewNopHost(),
ClusterName: "DummyCluster",
BearerToken: "",
},
)
assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package tokenprovider // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver/internal/tokenprovider"

import (
"fmt"
"os"
"strings"
)

const (
serviceAccountTokenDefaultPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
)

type BearerTokenProvider struct {
tokenGeneration int
loadedToken string
RetrieveToken func() (string, error)
}

func NewBearerTokenProvider() *BearerTokenProvider {
var provider *BearerTokenProvider = &BearerTokenProvider{
tokenGeneration: 0,
loadedToken: "",
RetrieveToken: defaultRetrieveToken,
}

return provider
}

func (provider *BearerTokenProvider) GetToken() (string, error) {
newToken, err := provider.RetrieveToken()
if err != nil {
return "", err
}
if newToken != provider.loadedToken {
provider.tokenGeneration += 1
provider.loadedToken = newToken
}
return provider.loadedToken, nil
}

func (provider *BearerTokenProvider) TokenGeneration() int {
return provider.tokenGeneration
}

func defaultRetrieveToken() (string, error) {
return retrieveTokenFromFileSystem(serviceAccountTokenDefaultPath)
}

func retrieveTokenFromFileSystem(tokenPath string) (string, error) {
tokenBytes, err := os.ReadFile(tokenPath)
if err != nil {
return "", fmt.Errorf("failed to read bearer token: %w", err)
}
return strings.TrimSpace(string(tokenBytes)), nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package tokenprovider // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightskueuereceiver/internal/tokenprovider"

import (
"fmt"
"os"
"path/filepath"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewBearerTokenProvider(t *testing.T) {
testCases := []struct {
caseName string
}{
{
caseName: "Success Case",
},
}
for _, testCase := range testCases {
t.Run(testCase.caseName, func(t *testing.T) {
var testProvider *BearerTokenProvider = NewBearerTokenProvider()

assert.NotNil(t, testProvider)
assert.Equal(t, 0, testProvider.tokenGeneration)
assert.Empty(t, testProvider.loadedToken)

expectedPtr := reflect.ValueOf(defaultRetrieveToken).Pointer()
actualPtr := reflect.ValueOf(testProvider.RetrieveToken).Pointer()
assert.Equal(t, expectedPtr, actualPtr)
})
}
}

func TestRetrieveToken(t *testing.T) {
var mockToken string = "dummy-token"
var mockError error = fmt.Errorf("dummy error")

testCases := []struct {
caseName string
initialToken string
mockProvider func() (string, error)
expectedError error
expectedToken string
expectedGeneration int
}{
{
caseName: "New Token Case",
initialToken: "",
mockProvider: func() (string, error) {
return mockToken, nil
},
expectedError: nil,
expectedToken: mockToken,
expectedGeneration: 1,
},
{
caseName: "Same Token Case",
initialToken: mockToken,
mockProvider: func() (string, error) {
return mockToken, nil
},
expectedError: nil,
expectedToken: mockToken,
expectedGeneration: 0,
},
{
caseName: "Error Case",
initialToken: "",
mockProvider: func() (string, error) {
return "", mockError
},
expectedError: mockError,
expectedToken: "",
expectedGeneration: 0,
},
}

for _, testCase := range testCases {
t.Run(testCase.caseName, func(t *testing.T) {
var testProvider *BearerTokenProvider = &BearerTokenProvider{
tokenGeneration: 0,
loadedToken: testCase.initialToken,
RetrieveToken: testCase.mockProvider,
}

assert.Equal(t, 0, testProvider.tokenGeneration)
assert.Equal(t, testCase.initialToken, testProvider.loadedToken)

yieldedToken, err := testProvider.GetToken()

assert.Equal(t, testCase.expectedGeneration, testProvider.tokenGeneration)
assert.Equal(t, testCase.expectedToken, yieldedToken)
assert.Equal(t, testCase.expectedToken, testProvider.loadedToken)
assert.Equal(t, testCase.expectedGeneration, testProvider.tokenGeneration)
assert.Equal(t, testCase.expectedError, err)
})
}
}

func TestTokenGeneration(t *testing.T) {
provider := &BearerTokenProvider{
tokenGeneration: 42,
}

if gen := provider.TokenGeneration(); gen != 42 {
t.Errorf("Expected token generation 42, got %d", gen)
}
}

func TestRetrieveTokenFromFileSystem2(t *testing.T) {
var tmpDir string = t.TempDir()
var dummyToken string = "dummy-token-content"
var dummyTokenPath string = filepath.Join(tmpDir, "test-token")

err := os.WriteFile(dummyTokenPath, []byte(dummyToken+"\n"), 0600)
if err != nil {
t.Fatalf("Failed to create test token file: %v", err)
}

testCases := []struct {
caseName string
tokenPath string
expectedToken string
errorExpected bool
}{
{
caseName: "Success Case",
tokenPath: dummyTokenPath,
expectedToken: dummyToken,
errorExpected: false,
},
{
caseName: "Nonexistent File Case",
tokenPath: filepath.Join(tmpDir, "nonexistent-token"),
expectedToken: "",
errorExpected: true,
},
}

for _, testCase := range testCases {
t.Run(testCase.caseName, func(t *testing.T) {
token, err := retrieveTokenFromFileSystem(testCase.tokenPath)
assert.Equal(t, testCase.expectedToken, token)
if testCase.errorExpected {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
}
})
}
}
Loading
Loading