Skip to content
Draft
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
f8a5c3b
WIP - add OIDC support to Kafka auth
richscott Jul 21, 2025
fa2ffeb
WIP - add OIDC support to Kafka auth
richscott Jul 21, 2025
0b1125a
Move OIDC client and test into sub-package.
richscott Jul 22, 2025
bd6fdcf
Merge branch 'feature/kafa-openid-auth' of github.com:richscott/opent…
richscott Jul 22, 2025
eca5df4
Start new implementation of OIDC server.
richscott Jul 22, 2025
65393c3
golangci-lint fixes; fixes to use new OIDC server
richscott Jul 22, 2025
6a6e112
Get OIDC mock server goroutine working.
richscott Jul 23, 2025
379fb01
Reduce wait for mock server startup to 100ms.
richscott Jul 23, 2025
57f9113
Set our expiresIn from either token's ExpiresIn or Expiry
richscott Jul 25, 2025
360412c
Merge branch 'main' into feature/kafa-openid-auth
richscott Jul 25, 2025
31f0a0f
Simplify mock OIDC server invocation args; remove dead code
richscott Jul 25, 2025
c7f7018
Use dynamic port allocation for mock OIDC server
richscott Jul 28, 2025
f965e0f
Add test to verify token-caching.
richscott Jul 28, 2025
6b14e44
Add test for expired token; lint fixes
richscott Jul 28, 2025
c6c24cd
Merge branch 'main' into feature/kafa-openid-auth
richscott Jul 28, 2025
f0fed8f
Tidy up go.mod and go.sum
richscott Jul 28, 2025
e5a79a5
Remove global expireSecs, pass down into handler and generator
richscott Jul 29, 2025
a1fc704
Start background refresher when OIDC provider is created
richscott Jul 29, 2025
d17757a
Merge branch 'main' into feature/kafa-openid-auth
richscott Jul 29, 2025
ac3f072
Merge branch 'main' into feature/kafa-openid-auth
richscott Jul 30, 2025
462d478
Use a var for token expire; add refreshAhead test
richscott Jul 30, 2025
ed1a135
Add more assertions for refreshAhead test.
richscott Aug 1, 2025
cfb5165
Merge branch 'main' into feature/kafa-openid-auth
richscott Aug 1, 2025
d924d93
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 4, 2025
8a33c50
Add wiring struct and logic for configureSASL()
richscott Aug 6, 2025
5b023a4
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 6, 2025
99ad70e
Remove old unused MockOauthProvider type and member func
richscott Aug 6, 2025
4f29b83
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 6, 2025
811b583
Merge branch 'feature/kafka-oidc-auth' of github.com:richscott/opente…
richscott Aug 6, 2025
5ae0501
Fix missing oauth2 module in kafkametricsreceiver package
richscott Aug 6, 2025
1b30400
Un-camelcase variable name, per golangci-lint.
richscott Aug 7, 2025
16db2ef
Add oauth2 module to Kafka exporter and observer
richscott Aug 7, 2025
a178305
Use separate err var for server shutdown.
richscott Aug 7, 2025
425986a
Add cancellation of background token refresher goroutine
richscott Aug 8, 2025
9dec276
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 8, 2025
9d39786
Add initial draft of change log notes.
richscott Aug 8, 2025
b07483c
Finish change log entries.
richscott Aug 8, 2025
4009f94
Remove debugging messages.
richscott Aug 8, 2025
3aa6ab8
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 8, 2025
8f98841
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 11, 2025
a31a5e3
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 12, 2025
6585885
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 13, 2025
d7f977e
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 15, 2025
2ab594c
Fix formatting warning from golangci-lint.
richscott Aug 15, 2025
bd388ce
Set and use refreshCoolDown on OIDC token provider.
richscott Aug 19, 2025
114f1ea
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 19, 2025
bbd3f72
Fix go.mod for kafkareceiver
richscott Aug 19, 2025
842cf07
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 22, 2025
f05b1c3
Merge branch 'main' into feature/kafka-oidc-auth
richscott Aug 25, 2025
8a4705d
Merge branch 'main' into feature/kafka-oidc-auth
richscott Sep 2, 2025
28ac00d
Merge branch 'main' into feature/kafka-oidc-auth
richscott Sep 8, 2025
694ab48
Rewrite and simplify OIDC token background refresher logic.
richscott Sep 12, 2025
b0686f8
Merge branch 'main' into feature/kafka-oidc-auth
richscott Sep 12, 2025
339869c
Merge branch 'main' into feature/kafka-oidc-auth
richscott Sep 15, 2025
50a4bd7
Use t.Context() in kafka OIDC client tests.
richscott Sep 15, 2025
ac02343
Merge branch 'main' into feature/kafka-oidc-auth
richscott Sep 23, 2025
bb5857e
Merge branch 'main' into feature/kafka-oidc-auth
richscott Sep 29, 2025
3478222
Merge branch 'main' into feature/kafka-oidc-auth
richscott Oct 13, 2025
f13d0a5
Migrate OIDC file-based token auth to use franz client
richscott Oct 14, 2025
a7d360d
Merge branch 'main' into feature/kafka-oidc-auth
richscott Oct 14, 2025
d82cabf
Merge branch 'main' into feature/kafka-oidc-auth
richscott Oct 15, 2025
4063b85
Rework if/else to switch to fix lint error.
richscott Oct 16, 2025
2ede506
Replace logic in internal/kafka/oidc/* with existing oauth2 extension
richscott Oct 16, 2025
e366d03
Update description of NewOIDCTokenProvider func.
richscott Oct 17, 2025
05e3ac5
Merge branch 'main' into feature/kafka-oidc-auth
richscott Oct 17, 2025
7c5a5ae
Move internal/kafka/oidc/* to internal/kafka
richscott Oct 17, 2025
bdd2b46
Merge branch 'main' into feature/kafka-oidc-auth
richscott Oct 17, 2025
d91ca1e
go.mod fixes for each kafka component
richscott Oct 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .chloggen/feature_kafka-oidc-auth.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: This change adds support for authentication via OIDC to the Kafka client.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [41872]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
It provides an implementation of the sarama.AccessTokenProvider interface, supporting the
client_credentials Grant Type, and a background token refresh.
Comment on lines +19 to +20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI franz-go will soon be the default for both receiver and exporter, and when all the kinks are ironed out (in the not too distant future) the Sarama client will be removed.

Copy link
Author

@richscott richscott Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@axw Thanks, and I agree completely with all your points/questions. This whole effort was animated by a request from another team of my employer, and they suggested this implementation to extend the Sarama client mechanism to do OIDC, and notably, to get the client_secret from a file (in usage scenario of using a Kubernetes pod secret file, which is dynamic and rotated frequently). I believe they may be using an older revision that does not have these facilities. I see now that the oauth2clientextension already (very closely) has this support, including client_id_file and client_secret_file. (Full disclosure: I'm new to this project/repo, and working with OIDC as well.) Also, as the franz-go client migration will occur soon(ish), I'll go ahead and change this code to use that in lieu of Sarama. I think I can excise a lot of this, work through the call-chain from the franz_client.go constructors, and then modify configkafka.SASLConfig and its users to use the oauth2clientextension.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @richscott! That sounds good. I'll move this PR back to draft in the mean time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @axw - I have substantially reworked the branch, as you suggested:

  • The OIDC logic has been cut down, the code that maintained a background goroutine to periodically refresh tokens, etc - all that has been ripped out and replaced by references to types/functions in the repo's extension/oauth2clientauthextension package, so the changes are considerably smaller/simpler. I had to change a couple small things in the oauth2clientauthextension package code - make a public ClientCredentialsConfig so it was available to internal/kafka/oidc_client.go, added a TokenSource() constructor, etc, but other than that I have not modified anything in oauth2clientauthextension.
  • The OIDC code remaining in internal/kafka now uses franz-go in lieu of the soon-deprecated sarama.
  • I had to add a replace directive to the go.mod in each kafka-related component package in the repo, so the compiler would find and use the latest in the repo source, rather than downloading, so it would build.
  • I made a few other miscellaneous tweaks to satisfying golangci-lint.

Running the golint and gotest Make targets are successful, and an external toy integration program I wrote to just verify that the client does indeed connect with a OIDC token to a Kafka instance, do a few basic list/delete/create topic operations, sleep for a bit, then repeat infinitely, does work correctly - it refreshes the token as needed, etc.

Probably there will be more work needed on this - I will mark this ready for review again and welcome comment/suggestions from everyone.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richscott thanks for the changes. I'm glad to see most of the logic now lives in oauth2clientauthextension. I'd like to see if we can go even further.

What I have in mind is that users will configure oauth2clientauthextension independently of the Kafka components, and then reference it via a new config attribute in the Kafka components. Something like this:

extensions:
  oauth2client:
    client_id_file: /path/to/client_id_file
    client_secret: /path/to/client_secret_file

exporters:
  kafka:
    auth:
      sasl:
        mechanism: OAUTHBEARER
        oauthbearer_token_source: oauth2client

The "oauthbearer_token_source" config setting would be the ID of an extension that implements a token source interface along these lines:

import "golang.org/x/oauth2"

type TokenSource interface {
    Token(context.Context) (*oauth2.Token, error)
}

This way we're not duplicating any oauth2 config, and would be closer to how the oauth2client extension works with HTTP/gRPC-based exporters -- see https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.29.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
Expand Down
4 changes: 4 additions & 0 deletions exporter/kafkaexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions extension/observer/kafkatopicsobserver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
Expand Down
4 changes: 4 additions & 0 deletions extension/observer/kafkatopicsobserver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions internal/kafka/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"context"
"crypto/sha256"
"crypto/sha512"
"time"

"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/oidc"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka"
)

Expand Down Expand Up @@ -57,6 +59,12 @@ func configureSASL(ctx context.Context, config configkafka.SASLConfig, saramaCon
case AWSMSKIAMOAUTHBEARER:
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaConfig.Net.SASL.TokenProvider = &awsMSKTokenProvider{ctx: ctx, region: config.AWSMSK.Region}
case OIDCFILE:
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaConfig.Net.SASL.TokenProvider, _ = oidc.NewOIDCfileTokenProvider(ctx, saramaConfig.ClientID,
config.OIDCFILE.ClientSecretFilePath, config.OIDCFILE.TokenURL,
config.OIDCFILE.Scopes, time.Duration(config.OIDCFILE.RefreshAheadSecs)*time.Second,
config.OIDCFILE.EndPointParams, config.OIDCFILE.AuthStyle)
}
}

Expand Down
38 changes: 38 additions & 0 deletions internal/kafka/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
package kafka

import (
"net/url"
"testing"
"time"

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"golang.org/x/oauth2"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/oidc"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka"
)

Expand Down Expand Up @@ -51,6 +55,16 @@ func TestAuthentication(t *testing.T) {
region: "region",
}

saramaSASLOIDCFILEConfig := &sarama.Config{}
saramaSASLOIDCFILEConfig.Net.SASL.Enable = true
saramaSASLOIDCFILEConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
// Specify 0 seconds for the RefreshAhead, as we don't want to have it launch
// the background refresher goroutine - we are just verifying authentication
// configuration setup here.
saramaSASLOIDCFILEConfig.Net.SASL.TokenProvider, _ = oidc.NewOIDCfileTokenProvider(
t.Context(), saramaSASLOIDCFILEConfig.ClientID, "/etc/hosts", "http://127.0.0.1:3000/oidc",
[]string{"mock-scope"}, time.Duration(0)*time.Second, url.Values{}, oauth2.AuthStyleAutoDetect)

saramaKerberosCfg := &sarama.Config{}
saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaKerberosCfg.Net.SASL.Enable = true
Expand Down Expand Up @@ -138,6 +152,22 @@ func TestAuthentication(t *testing.T) {
},
saramaConfig: saramaSASLAWSIAMOAUTHConfig,
},
{
auth: configkafka.AuthenticationConfig{
SASL: &configkafka.SASLConfig{
Mechanism: "OIDCFILE",
OIDCFILE: configkafka.OIDCFileConfig{
ClientSecretFilePath: "/etc/hosts",
TokenURL: "http://127.0.0.1:3000/oidc",
Scopes: []string{"mock-scope"},
RefreshAheadSecs: 0,
EndPointParams: url.Values{},
AuthStyle: oauth2.AuthStyleAutoDetect,
},
},
},
saramaConfig: saramaSASLOIDCFILEConfig,
},
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
Expand All @@ -146,6 +176,14 @@ func TestAuthentication(t *testing.T) {

// equalizes SCRAMClientGeneratorFunc to do assertion with the same reference.
config.Net.SASL.SCRAMClientGeneratorFunc = test.saramaConfig.Net.SASL.SCRAMClientGeneratorFunc

// For OIDC token provider, we need to compare fields individually since the context
// contains non-deterministic channels that will cause the comparison to fail
if config.Net.SASL.TokenProvider != nil && test.saramaConfig.Net.SASL.TokenProvider != nil {
// Set them to the same reference for comparison
config.Net.SASL.TokenProvider = test.saramaConfig.Net.SASL.TokenProvider
}

assert.Equal(t, test.saramaConfig, config)
})
}
Expand Down
2 changes: 2 additions & 0 deletions internal/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
if tlsConfig == nil {
tlsConfig = config.Authentication.TLS
}
if tlsConfig != nil {

Check failure on line 146 in internal/kafka/client.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

ifElseChain: rewrite if-else to switch statement (gocritic)

Check failure on line 146 in internal/kafka/client.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, internal)

ifElseChain: rewrite if-else to switch statement (gocritic)

Check failure on line 146 in internal/kafka/client.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, internal)

ifElseChain: rewrite if-else to switch statement (gocritic)
if tlsConfig, err := tlsConfig.LoadTLSConfig(ctx); err != nil {
return nil, err
} else if tlsConfig != nil {
Expand All @@ -153,6 +153,8 @@
} else if config.Authentication.SASL != nil && config.Authentication.SASL.Mechanism == "AWS_MSK_IAM_OAUTHBEARER" {
saramaConfig.Net.TLS.Config = &tls.Config{}
saramaConfig.Net.TLS.Enable = true
} else if config.Authentication.SASL != nil && config.Authentication.SASL.Mechanism == OIDCFILE {
saramaConfig.Net.TLS.Enable = true
}
configureSaramaAuthentication(ctx, config.Authentication, saramaConfig)
return saramaConfig, nil
Expand Down
1 change: 1 addition & 0 deletions internal/kafka/franz_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
SCRAMSHA256 = "SCRAM-SHA-256"
PLAIN = "PLAIN"
AWSMSKIAMOAUTHBEARER = "AWS_MSK_IAM_OAUTHBEARER" //nolint:gosec // These aren't credentials.
OIDCFILE = "OIDCFILE"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a mechanism name understood by anything else? How is this different to SASL/OAUTHBEARER?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a early-development mistake - I'll be definitely pulling this out.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now used, with the conversion of the other logic from sarama to franz-go, so it remains in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question was more about the choice of string, "OIDCFILE". I'm not aware of a SASL mechanism with that name, and couldn't find anything in searches. I think if we go with my other suggestion, we could use OAUTHBEARER.

)

// NewFranzSyncProducer creates a new Kafka client using the franz-go library.
Expand Down
3 changes: 3 additions & 0 deletions internal/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.24.0
require (
github.com/IBM/sarama v1.46.0
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4
github.com/golang-jwt/jwt/v5 v5.2.0
github.com/google/uuid v1.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka v0.135.0
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
github.com/stretchr/testify v1.11.1
Expand All @@ -15,6 +17,7 @@ require (
go.opentelemetry.io/collector/config/configopaque v1.41.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/config/configtls v1.41.1-0.20250911155607-37a3ace6274c
go.uber.org/goleak v1.3.0
golang.org/x/oauth2 v0.30.0
)

require (
Expand Down
4 changes: 4 additions & 0 deletions internal/kafka/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading