diff --git a/.chloggen/feature_kafka-oidc-auth.yaml b/.chloggen/feature_kafka-oidc-auth.yaml new file mode 100644 index 0000000000000..74b3db6b897b7 --- /dev/null +++ b/.chloggen/feature_kafka-oidc-auth.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafka + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: This change adds support for authentication via OIDC to the Kafka client. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41872] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + It provides an implementation of the sarama.AccessTokenProvider interface, supporting the + client_credentials Grant Type, and a background token refresh. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index acea0ae9e8280..91e4ec7a74d6b 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -46,6 +46,7 @@ require ( ) require ( + cloud.google.com/go/compute/metadata v0.7.0 // indirect github.com/apache/thrift v0.22.0 // indirect github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 // indirect github.com/aws/aws-sdk-go-v2 v1.36.4 // indirect @@ -92,6 +93,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.137.0 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect @@ -112,6 +114,7 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/extension/xextension v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/pdata/xpdata v0.137.1-0.20251013162618-a96eab114ea4 // indirect @@ -126,6 +129,7 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/net v0.46.0 // indirect + golang.org/x/oauth2 v0.31.0 // indirect golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect @@ -161,3 +165,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka => ../../pkg/kafka/configkafka + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension => ../../extension/oauth2clientauthextension diff --git a/exporter/kafkaexporter/go.sum b/exporter/kafkaexporter/go.sum index 21a868721e023..1e9d3941f619b 100644 --- a/exporter/kafkaexporter/go.sum +++ b/exporter/kafkaexporter/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU= +cloud.google.com/go/compute/metadata v0.7.0/go.mod h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo= github.com/IBM/sarama v1.46.2 h1:65JJmZpxKUWe/7HEHmc56upTfAvgoxuyu4Ek+TcevDE= github.com/IBM/sarama v1.46.2/go.mod h1:PDOGmVeKmW744c/0d4CZ0MfrzmcIYtpmS5+KIWs1zHQ= github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= @@ -63,6 +65,8 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= +github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -125,6 +129,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0 h1:cKeFgcZf0mI7NL4NLB6c4nsQVZ5Gv14cg6wyB5BFsns= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0/go.mod h1:GLGpfbWdN6n/I0WA+JwGdokRO9WcHDzCPojxmjK4yEI= github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7sjsSdg= github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -214,6 +220,8 @@ go.opentelemetry.io/collector/exporter/xexporter v0.137.1-0.20251013162618-a96ea go.opentelemetry.io/collector/exporter/xexporter v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:VVH3xbkykgHV+TIfyzRA+1ZLsMjb0FLQZmEphtPI63A= go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 h1:ydWUlclX7E9H3AGvrayXMnplL7DZXcJShKbCp0fweEY= go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:GIzXTwB+7I3TpTx2Zppp/2884BNg1LaWe0zjumgV82Q= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 h1:6BbCvOb/86AnIGn5J1vcNauoZ2cbloU4WscZJMjbkso= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:6Sh0hqPfPqpg0ErCoNPO/ky2NdfGmUX+G5wekPx7A7U= go.opentelemetry.io/collector/extension/extensiontest v0.137.0 h1:gnPF3HIOKqNk93XObt2x0WFvVfPtm76VggWe7LxgcaY= go.opentelemetry.io/collector/extension/extensiontest v0.137.0/go.mod h1:vVmKojdITYka9+iAi3aarxeMrO6kdlywKuf3d3c6lcI= go.opentelemetry.io/collector/extension/xextension v0.137.1-0.20251013162618-a96eab114ea4 h1:0DbV+ob2o19r03p7ZbacTeMqFi1B/ivpcmNBPGLkA+k= @@ -296,6 +304,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo= +golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/extension/oauth2clientauthextension/clientcredentialsconfig.go b/extension/oauth2clientauthextension/clientcredentialsconfig.go index 8d40eec74553d..269ac82d9b6c0 100644 --- a/extension/oauth2clientauthextension/clientcredentialsconfig.go +++ b/extension/oauth2clientauthextension/clientcredentialsconfig.go @@ -15,7 +15,7 @@ import ( "golang.org/x/oauth2/clientcredentials" ) -// clientCredentialsConfig is a clientcredentials.Config wrapper to allow +// ClientCredentialsConfig is a clientcredentials.Config wrapper to allow // values read from files in the ClientID and ClientSecret fields. // // Values from files can be retrieved by populating the ClientIDFile or @@ -25,13 +25,21 @@ import ( // // Example - Retrieve secret from file: // -// cfg := clientCredentialsConfig{ -// Config: clientcredentials.Config{ +// cfg := ClientCredentialsConfig{ +// Config: Config{ // ClientID: "clientId", // ... // }, // ClientSecretFile: "/path/to/client/secret", // } +type ClientCredentialsConfig struct { + Config Config + + AuthStyle oauth2.AuthStyle + ExpiryBuffer int +} + +// clientCredentialsConfig is an internal version that embeds clientcredentials.Config type clientCredentialsConfig struct { clientcredentials.Config @@ -91,7 +99,25 @@ func (c *clientCredentialsConfig) createConfig() (*clientcredentials.Config, err }, nil } -func (c *clientCredentialsConfig) TokenSource(ctx context.Context) oauth2.TokenSource { +// TokenSource creates an oauth2.TokenSource from the exported ClientCredentialsConfig +func (c *ClientCredentialsConfig) TokenSource(ctx context.Context) oauth2.TokenSource { + internalConfig := &clientCredentialsConfig{ + Config: clientcredentials.Config{ + ClientID: c.Config.ClientID, + ClientSecret: string(c.Config.ClientSecret), + TokenURL: c.Config.TokenURL, + Scopes: c.Config.Scopes, + EndpointParams: c.Config.EndpointParams, + AuthStyle: c.AuthStyle, + }, + ClientIDFile: c.Config.ClientIDFile, + ClientSecretFile: c.Config.ClientSecretFile, + ExpiryBuffer: time.Duration(c.ExpiryBuffer) * time.Second, + } + return internalConfig.tokenSource(ctx) +} + +func (c *clientCredentialsConfig) tokenSource(ctx context.Context) oauth2.TokenSource { return oauth2.ReuseTokenSourceWithExpiry(nil, clientCredentialsTokenSource{ctx: ctx, config: c}, c.ExpiryBuffer) } diff --git a/extension/observer/kafkatopicsobserver/go.mod b/extension/observer/kafkatopicsobserver/go.mod index 5e634c6878c7f..5da41f91f97fd 100644 --- a/extension/observer/kafkatopicsobserver/go.mod +++ b/extension/observer/kafkatopicsobserver/go.mod @@ -19,6 +19,7 @@ require ( ) require ( + cloud.google.com/go/compute/metadata v0.7.0 // indirect github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 // indirect github.com/aws/aws-sdk-go-v2 v1.36.4 // indirect github.com/aws/aws-sdk-go-v2/config v1.29.16 // indirect @@ -63,6 +64,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect @@ -79,6 +81,7 @@ require ( go.opentelemetry.io/collector/config/configcompression v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/config/configopaque v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/config/configtls v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/featuregate v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/pdata v1.43.1-0.20251013162618-a96eab114ea4 // indirect @@ -92,6 +95,7 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/net v0.46.0 // indirect + golang.org/x/oauth2 v0.31.0 // indirect golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect @@ -105,3 +109,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka => ../../../pkg/kafka/configkafka replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer => ../ + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension => ../../../extension/oauth2clientauthextension diff --git a/extension/observer/kafkatopicsobserver/go.sum b/extension/observer/kafkatopicsobserver/go.sum index 524dddaaa4878..b863a375b2e4d 100644 --- a/extension/observer/kafkatopicsobserver/go.sum +++ b/extension/observer/kafkatopicsobserver/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU= +cloud.google.com/go/compute/metadata v0.7.0/go.mod h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo= github.com/IBM/sarama v1.46.2 h1:65JJmZpxKUWe/7HEHmc56upTfAvgoxuyu4Ek+TcevDE= github.com/IBM/sarama v1.46.2/go.mod h1:PDOGmVeKmW744c/0d4CZ0MfrzmcIYtpmS5+KIWs1zHQ= github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE= @@ -57,6 +59,8 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= +github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -180,6 +184,10 @@ go.opentelemetry.io/collector/confmap/xconfmap v0.137.1-0.20251013162618-a96eab1 go.opentelemetry.io/collector/confmap/xconfmap v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:psXdQr13pVrCqNPdoER2QZZorvONAR5ZUEHURe4POh4= go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 h1:ydWUlclX7E9H3AGvrayXMnplL7DZXcJShKbCp0fweEY= go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:GIzXTwB+7I3TpTx2Zppp/2884BNg1LaWe0zjumgV82Q= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 h1:6BbCvOb/86AnIGn5J1vcNauoZ2cbloU4WscZJMjbkso= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:6Sh0hqPfPqpg0ErCoNPO/ky2NdfGmUX+G5wekPx7A7U= +go.opentelemetry.io/collector/extension/extensiontest v0.137.1-0.20251013162618-a96eab114ea4 h1:DBMKb2WJVCjqa6hkDgVI2mgnzv/x/mx9JUA35U0k1YA= +go.opentelemetry.io/collector/extension/extensiontest v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:2wyfqLdE/kVOdVOnWxPBQlEnI2NV0bqGXJFRPSrtqy0= go.opentelemetry.io/collector/featuregate v1.43.1-0.20251013162618-a96eab114ea4 h1:IXK7EGifr3Lic3mnMlkVXFb1HBlkBoYfoy/AYzsevko= go.opentelemetry.io/collector/featuregate v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:d0tiRzVYrytB6LkcYgz2ESFTv7OktRPQe0QEQcPt1L4= go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4 h1:E94fKydbFNkR6Zfhix/alvOftJx6e6srYQ8PIr9xYRw= @@ -244,6 +252,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo= +golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/internal/kafka/authentication.go b/internal/kafka/authentication.go index da63cdf89b695..1b462379161d9 100644 --- a/internal/kafka/authentication.go +++ b/internal/kafka/authentication.go @@ -57,6 +57,12 @@ func configureSASL(ctx context.Context, config configkafka.SASLConfig, saramaCon case AWSMSKIAMOAUTHBEARER: saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth saramaConfig.Net.SASL.TokenProvider = &awsMSKTokenProvider{ctx: ctx, region: config.AWSMSK.Region} + case OIDCFILE: + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + saramaConfig.Net.SASL.TokenProvider, _ = NewOIDCTokenProvider(ctx, saramaConfig.ClientID, + config.OIDCFILE.ClientSecretFilePath, config.OIDCFILE.TokenURL, + config.OIDCFILE.Scopes, config.OIDCFILE.EndPointParams, + config.OIDCFILE.AuthStyle, config.OIDCFILE.ExpiryBuffer) } } diff --git a/internal/kafka/authentication_test.go b/internal/kafka/authentication_test.go index b5fae99ffae18..2c60f86ca4793 100644 --- a/internal/kafka/authentication_test.go +++ b/internal/kafka/authentication_test.go @@ -4,10 +4,12 @@ package kafka import ( + "net/url" "testing" "github.com/IBM/sarama" "github.com/stretchr/testify/assert" + "golang.org/x/oauth2" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka" ) @@ -51,6 +53,16 @@ func TestAuthentication(t *testing.T) { region: "region", } + saramaSASLOIDCFILEConfig := &sarama.Config{} + saramaSASLOIDCFILEConfig.Net.SASL.Enable = true + saramaSASLOIDCFILEConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + // Specify 0 seconds for the RefreshAhead, as we don't want to have it launch + // the background refresher goroutine - we are just verifying authentication + // configuration setup here. + saramaSASLOIDCFILEConfig.Net.SASL.TokenProvider, _ = NewOIDCTokenProvider( + t.Context(), saramaSASLOIDCFILEConfig.ClientID, "/etc/hosts", "http://127.0.0.1:3000/oidc", + []string{"mock-scope"}, url.Values{}, oauth2.AuthStyleAutoDetect, 0) + saramaKerberosCfg := &sarama.Config{} saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI saramaKerberosCfg.Net.SASL.Enable = true @@ -138,6 +150,22 @@ func TestAuthentication(t *testing.T) { }, saramaConfig: saramaSASLAWSIAMOAUTHConfig, }, + { + auth: configkafka.AuthenticationConfig{ + SASL: &configkafka.SASLConfig{ + Mechanism: "OIDCFILE", + OIDCFILE: configkafka.OIDCFileConfig{ + ClientSecretFilePath: "/etc/hosts", + TokenURL: "http://127.0.0.1:3000/oidc", + Scopes: []string{"mock-scope"}, + RefreshAheadSecs: 0, + EndPointParams: url.Values{}, + AuthStyle: oauth2.AuthStyleAutoDetect, + }, + }, + }, + saramaConfig: saramaSASLOIDCFILEConfig, + }, } for _, test := range tests { t.Run("", func(t *testing.T) { @@ -146,6 +174,14 @@ func TestAuthentication(t *testing.T) { // equalizes SCRAMClientGeneratorFunc to do assertion with the same reference. config.Net.SASL.SCRAMClientGeneratorFunc = test.saramaConfig.Net.SASL.SCRAMClientGeneratorFunc + + // For OIDC token provider, we need to compare fields individually since the context + // contains non-deterministic channels that will cause the comparison to fail + if config.Net.SASL.TokenProvider != nil && test.saramaConfig.Net.SASL.TokenProvider != nil { + // Set them to the same reference for comparison + config.Net.SASL.TokenProvider = test.saramaConfig.Net.SASL.TokenProvider + } + assert.Equal(t, test.saramaConfig, config) }) } diff --git a/internal/kafka/client.go b/internal/kafka/client.go index 4722ca5aed4a7..41e292e7787c5 100644 --- a/internal/kafka/client.go +++ b/internal/kafka/client.go @@ -143,16 +143,19 @@ func newSaramaClientConfig(ctx context.Context, config configkafka.ClientConfig) if tlsConfig == nil { tlsConfig = config.Authentication.TLS } - if tlsConfig != nil { + switch { + case tlsConfig != nil: if tlsConfig, err := tlsConfig.LoadTLSConfig(ctx); err != nil { return nil, err } else if tlsConfig != nil { saramaConfig.Net.TLS.Config = tlsConfig saramaConfig.Net.TLS.Enable = true } - } else if config.Authentication.SASL != nil && config.Authentication.SASL.Mechanism == "AWS_MSK_IAM_OAUTHBEARER" { + case config.Authentication.SASL != nil && config.Authentication.SASL.Mechanism == "AWS_MSK_IAM_OAUTHBEARER": saramaConfig.Net.TLS.Config = &tls.Config{} saramaConfig.Net.TLS.Enable = true + case config.Authentication.SASL != nil && config.Authentication.SASL.Mechanism == OIDCFILE: + saramaConfig.Net.TLS.Enable = true } configureSaramaAuthentication(ctx, config.Authentication, saramaConfig) return saramaConfig, nil diff --git a/internal/kafka/franz_client.go b/internal/kafka/franz_client.go index 67685862572e1..7eee544be76e6 100644 --- a/internal/kafka/franz_client.go +++ b/internal/kafka/franz_client.go @@ -38,6 +38,7 @@ const ( SCRAMSHA256 = "SCRAM-SHA-256" PLAIN = "PLAIN" AWSMSKIAMOAUTHBEARER = "AWS_MSK_IAM_OAUTHBEARER" //nolint:gosec // These aren't credentials. + OIDCFILE = "OIDCFILE" ) // NewFranzSyncProducer creates a new Kafka client using the franz-go library. @@ -259,7 +260,7 @@ func commonOpts(ctx context.Context, clientCfg configkafka.ClientConfig, opts = append(opts, kgo.SASL(auth.AsMechanism())) } if clientCfg.Authentication.SASL != nil { - saslOpt, err := configureKgoSASL(clientCfg.Authentication.SASL) + saslOpt, err := configureKgoSASL(clientCfg.Authentication.SASL, clientCfg.ClientID) if err != nil { return nil, fmt.Errorf("failed to configure SASL: %w", err) } @@ -302,7 +303,7 @@ func commonOpts(ctx context.Context, clientCfg configkafka.ClientConfig, return opts, nil } -func configureKgoSASL(cfg *configkafka.SASLConfig) (kgo.Opt, error) { +func configureKgoSASL(cfg *configkafka.SASLConfig, clientID string) (kgo.Opt, error) { var m sasl.Mechanism switch cfg.Mechanism { case PLAIN: @@ -316,6 +317,19 @@ func configureKgoSASL(cfg *configkafka.SASLConfig) (kgo.Opt, error) { token, _, err := signer.GenerateAuthToken(ctx, cfg.AWSMSK.Region) return oauth.Auth{Token: token}, err }) + case OIDCFILE: + m = oauth.Oauth(func(ctx context.Context) (oauth.Auth, error) { + tokenProvider, cancel := NewOIDCTokenProvider(ctx, clientID, + cfg.OIDCFILE.ClientSecretFilePath, cfg.OIDCFILE.TokenURL, + cfg.OIDCFILE.Scopes, cfg.OIDCFILE.EndPointParams, + cfg.OIDCFILE.AuthStyle, cfg.OIDCFILE.ExpiryBuffer) + _ = cancel // Store cancel function for cleanup if needed + token, err := tokenProvider.GetToken() + if err != nil { + return oauth.Auth{}, err + } + return oauth.Auth{Token: token.AccessToken}, nil + }) default: return nil, fmt.Errorf("unsupported SASL mechanism: %s", cfg.Mechanism) } diff --git a/internal/kafka/go.mod b/internal/kafka/go.mod index 7591692c7707c..dbfb98e0e2d0c 100644 --- a/internal/kafka/go.mod +++ b/internal/kafka/go.mod @@ -5,6 +5,9 @@ go 1.24.0 require ( github.com/IBM/sarama v1.46.2 github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 + github.com/golang-jwt/jwt/v5 v5.2.0 + github.com/google/uuid v1.6.0 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka v0.137.0 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 github.com/stretchr/testify v1.11.1 @@ -15,6 +18,32 @@ require ( go.opentelemetry.io/collector/config/configopaque v1.43.1-0.20251013162618-a96eab114ea4 go.opentelemetry.io/collector/config/configtls v1.43.1-0.20251013162618-a96eab114ea4 go.uber.org/goleak v1.3.0 + golang.org/x/oauth2 v0.31.0 +) + +require ( + cloud.google.com/go/compute/metadata v0.7.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/component v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/pdata v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 // indirect + go.opentelemetry.io/otel v1.38.0 // indirect + go.opentelemetry.io/otel/log v0.14.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect + google.golang.org/grpc v1.76.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect ) require ( @@ -56,7 +85,6 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/twmb/franz-go v1.19.5 github.com/twmb/franz-go/pkg/kadm v1.16.1 github.com/twmb/franz-go/pkg/kmsg v1.12.0 @@ -75,4 +103,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension => ../../extension/oauth2clientauthextension + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka => ../../pkg/kafka/configkafka diff --git a/internal/kafka/go.sum b/internal/kafka/go.sum index 365e29daf8e9e..49aec5bee7a54 100644 --- a/internal/kafka/go.sum +++ b/internal/kafka/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU= +cloud.google.com/go/compute/metadata v0.7.0/go.mod h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo= github.com/IBM/sarama v1.46.2 h1:65JJmZpxKUWe/7HEHmc56upTfAvgoxuyu4Ek+TcevDE= github.com/IBM/sarama v1.46.2/go.mod h1:PDOGmVeKmW744c/0d4CZ0MfrzmcIYtpmS5+KIWs1zHQ= github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE= @@ -46,6 +48,7 @@ github.com/foxboron/swtpm_test v0.0.0-20230726224112-46aaafdf7006 h1:50sW4r0Pcvl github.com/foxboron/swtpm_test v0.0.0-20230726224112-46aaafdf7006/go.mod h1:eIXCMsMYCaqq9m1KSSxXwQG11krpuNPGP3k0uaWrbas= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -56,12 +59,19 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= +github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA= github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/go-tpm-tools v0.4.4 h1:oiQfAIkc6xTy9Fl5NKTeTJkBTlXdHsxAofmQyxBKY98= github.com/google/go-tpm-tools v0.4.4/go.mod h1:T8jXkp2s+eltnCDIsXR84/MTcVU9Ja7bh3Mit0pa4AY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= @@ -86,6 +96,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= @@ -103,8 +115,10 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -120,6 +134,7 @@ github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -146,11 +161,15 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/collector/component v1.43.1-0.20251013162618-a96eab114ea4 h1:w1VjKgmFktbRwdt5L1C4j6jL60sLhThh9dRRBqArUiA= go.opentelemetry.io/collector/component v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:LJ8w25mRyV1axguFAwX6NxKzh0sXK4pYVOn3dJvfVuk= +go.opentelemetry.io/collector/component/componenttest v0.137.1-0.20251013162618-a96eab114ea4 h1:GDUlSEmbp4yCEhOym78zazGX/5Cp8/OGfFNtk7M+T1w= +go.opentelemetry.io/collector/component/componenttest v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:08xR/WnVzcz8dz4TfPidNfQ6GsZ//mp9g6RvXgBMO/Q= go.opentelemetry.io/collector/config/configcompression v1.43.1-0.20251013162618-a96eab114ea4 h1:G+6htJtXnwqkOvoLbWZUDBSIOJw/AW31PilJLoYJKOQ= go.opentelemetry.io/collector/config/configcompression v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:ZlnKaXFYL3HVMUNWVAo/YOLYoxNZo7h8SrQp3l7GV00= go.opentelemetry.io/collector/config/configopaque v1.43.1-0.20251013162618-a96eab114ea4 h1:HtYVRIZuBE0vwbwqAGWDIdpbUTNmuP0DBlFZMt/MziY= @@ -161,24 +180,42 @@ go.opentelemetry.io/collector/confmap v1.43.1-0.20251013162618-a96eab114ea4 h1:3 go.opentelemetry.io/collector/confmap v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:N5GZpFCmwD1GynDu3IWaZW5Ycfc/7YxSU0q1/E3vLdg= go.opentelemetry.io/collector/confmap/xconfmap v0.137.1-0.20251013162618-a96eab114ea4 h1:yA2m7hdE+LELLWJ8BQWLV6PWrxqlI6Ni/u72A4Abp/E= go.opentelemetry.io/collector/confmap/xconfmap v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:psXdQr13pVrCqNPdoER2QZZorvONAR5ZUEHURe4POh4= +go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 h1:ydWUlclX7E9H3AGvrayXMnplL7DZXcJShKbCp0fweEY= +go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:GIzXTwB+7I3TpTx2Zppp/2884BNg1LaWe0zjumgV82Q= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 h1:6BbCvOb/86AnIGn5J1vcNauoZ2cbloU4WscZJMjbkso= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:6Sh0hqPfPqpg0ErCoNPO/ky2NdfGmUX+G5wekPx7A7U= +go.opentelemetry.io/collector/extension/extensiontest v0.137.1-0.20251013162618-a96eab114ea4 h1:DBMKb2WJVCjqa6hkDgVI2mgnzv/x/mx9JUA35U0k1YA= +go.opentelemetry.io/collector/extension/extensiontest v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:2wyfqLdE/kVOdVOnWxPBQlEnI2NV0bqGXJFRPSrtqy0= go.opentelemetry.io/collector/featuregate v1.43.1-0.20251013162618-a96eab114ea4 h1:IXK7EGifr3Lic3mnMlkVXFb1HBlkBoYfoy/AYzsevko= go.opentelemetry.io/collector/featuregate v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:d0tiRzVYrytB6LkcYgz2ESFTv7OktRPQe0QEQcPt1L4= go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4 h1:E94fKydbFNkR6Zfhix/alvOftJx6e6srYQ8PIr9xYRw= go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:ui3HnaeyvIe6tpjUFcL70Ev3aw5UxQnzoBGhuXfbbfs= go.opentelemetry.io/collector/pdata v1.43.1-0.20251013162618-a96eab114ea4 h1:lmaRPm+HIMu/Tz4Ht9/NcGCd7FVOUXBmC3fXDZYN6+E= go.opentelemetry.io/collector/pdata v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:rhhv1vy8COsKFpXiBtLG8GTRDRjg2DL7JPq4E+xOD5Q= +go.opentelemetry.io/collector/pipeline v1.43.0 h1:IJjdqE5UCQlyVvFUUzlhSWhP4WIwpH6UyJQ9iWXpyww= +go.opentelemetry.io/collector/pipeline v1.43.0/go.mod h1:xUrAqiebzYbrgxyoXSkk6/Y3oi5Sy3im2iCA51LwUAI= go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 h1:aBKdhLVieqvwWe9A79UHI/0vgp2t/s2euY8X59pGRlw= go.opentelemetry.io/contrib/bridges/otelzap v0.13.0/go.mod h1:SYqtxLQE7iINgh6WFuVi2AI70148B8EI35DSk0Wr8m4= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= go.opentelemetry.io/otel/log v0.14.0 h1:2rzJ+pOAZ8qmZ3DDHg73NEKzSZkhkGIua9gXtxNGgrM= go.opentelemetry.io/otel/log v0.14.0/go.mod h1:5jRG92fEAgx0SU/vFPxmJvhIuDU9E1SUnEQrMlJpOno= +go.opentelemetry.io/otel/log/logtest v0.14.0 h1:BGTqNeluJDK2uIHAY8lRqxjVAYfqgcaTbVk1n3MWe5A= +go.opentelemetry.io/otel/log/logtest v0.14.0/go.mod h1:IuguGt8XVP4XA4d2oEEDMVDBBCesMg8/tSGWDjuKfoA= go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.opentelemetry.io/proto/slim/otlp v1.8.0 h1:afcLwp2XOeCbGrjufT1qWyruFt+6C9g5SOuymrSPUXQ= +go.opentelemetry.io/proto/slim/otlp v1.8.0/go.mod h1:Yaa5fjYm1SMCq0hG0x/87wV1MP9H5xDuG/1+AhvBcsI= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.1.0 h1:Uc+elixz922LHx5colXGi1ORbsW8DTIGM+gg+D9V7HE= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.1.0/go.mod h1:VyU6dTWBWv6h9w/+DYgSZAPMabWbPTFTuxp25sM8+s0= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.1.0 h1:i8YpvWGm/Uq1koL//bnbJ/26eV3OrKWm09+rDYo7keU= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.1.0/go.mod h1:pQ70xHY/ZVxNUBPn+qUWPl8nwai87eWdqL3M37lNi9A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -188,15 +225,22 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= @@ -206,11 +250,17 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo= +golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -233,8 +283,15 @@ golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= diff --git a/internal/kafka/oidc_client.go b/internal/kafka/oidc_client.go new file mode 100644 index 0000000000000..2ae103014f67c --- /dev/null +++ b/internal/kafka/oidc_client.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + +import ( + "context" + + "github.com/IBM/sarama" + "golang.org/x/oauth2" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension" +) + +// OIDCTokenProvider wraps oauth2clientauthextension to provide tokens for Kafka SASL/OAUTHBEARER +type OIDCTokenProvider struct { + tokenSource oauth2.TokenSource +} + +// Token returns a sarama.AccessToken for Sarama-based Kafka clients +func (p *OIDCTokenProvider) Token() (*sarama.AccessToken, error) { + token, err := p.tokenSource.Token() + if err != nil { + return nil, err + } + return &sarama.AccessToken{ + Token: token.AccessToken, + }, nil +} + +// GetToken returns the oauth2.Token directly for franz-go based clients +func (p *OIDCTokenProvider) GetToken() (*oauth2.Token, error) { + return p.tokenSource.Token() +} + +// NewOIDCTokenProvider creates a new OIDC token provider using oauth2clientauthextension. +// This provides token functionality for Franz-go Kafka clients. +func NewOIDCTokenProvider( + ctx context.Context, + clientID string, + clientSecretFilePath string, + tokenURL string, + scopes []string, + endpointParams map[string][]string, + authStyle oauth2.AuthStyle, + expiryBuffer int, +) (*OIDCTokenProvider, context.CancelFunc) { + ctx, cancel := context.WithCancel(ctx) + + config := &oauth2clientauthextension.ClientCredentialsConfig{ + Config: oauth2clientauthextension.Config{ + ClientID: clientID, + ClientSecretFile: clientSecretFilePath, + TokenURL: tokenURL, + Scopes: scopes, + EndpointParams: endpointParams, + }, + AuthStyle: authStyle, + ExpiryBuffer: expiryBuffer, + } + + tokenSource := config.TokenSource(ctx) + + return &OIDCTokenProvider{ + tokenSource: tokenSource, + }, cancel +} diff --git a/internal/kafka/oidc_client_test.go b/internal/kafka/oidc_client_test.go new file mode 100644 index 0000000000000..f3d966b8ffaa8 --- /dev/null +++ b/internal/kafka/oidc_client_test.go @@ -0,0 +1,477 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "encoding/json" + "errors" + "fmt" + "log" + "net" + "net/http" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "github.com/golang-jwt/jwt/v5" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/oauth2" +) + +const ( + testClientID = "mock-client-id" + testScope = "mock-scope" +) + +func TestOIDCProvider_GetToken_Success(t *testing.T) { + secretFile, err := k8sSecretFile() + assert.NoError(t, err) + + testClientSecret, err = os.ReadFile(secretFile) + assert.NoError(t, err) + + tokenTTLsecs := 10 + + oidcServerQuit := make(chan bool, 1) + portCh := make(chan int, 1) + go func() { + oidcServer(oidcServerQuit, portCh, tokenTTLsecs) + }() + defer func() { + oidcServerQuit <- true + }() + + // Wait for server to start and get the port + port := <-portCh + tokenURL := fmt.Sprintf("http://127.0.0.1:%d/token", port) + + oidcProvider, cancel := NewOIDCTokenProvider(t.Context(), testClientID, secretFile, tokenURL, + []string{testScope}, url.Values{}, oauth2.AuthStyleAutoDetect, 0) + defer cancel() + + oauthToken, err := oidcProvider.GetToken() + require.NoError(t, err) + assert.NotNil(t, oauthToken) + assert.NotEmpty(t, oauthToken.AccessToken) + + parser := jwt.NewParser(jwt.WithoutClaimsValidation()) + tokenObj, err := parser.Parse(oauthToken.AccessToken, func(_ *jwt.Token) (any, error) { + return publicKey, nil + }) + assert.NoError(t, err) + assert.NotNil(t, tokenObj) + claims := tokenObj.Claims.(jwt.MapClaims) + assert.Equal(t, testClientID, claims["client_id"]) + assert.Equal(t, testScope, claims["scope"]) + + assert.WithinDuration(t, time.Now(), time.Unix(int64(claims["iat"].(float64)), 0), 2*time.Second) + expectedTimeout := time.Now().Add(time.Duration(tokenTTLsecs) * time.Second) + actualTimeout := time.Unix(int64(claims["exp"].(float64)), 0) + assert.WithinDuration(t, expectedTimeout, actualTimeout, 2*time.Second) +} + +func TestOIDCProvider_GetToken_Error(t *testing.T) { + secretFile, err := k8sSecretFile() + assert.NoError(t, err) + + testClientSecret, err = os.ReadFile(secretFile) + assert.NoError(t, err) + + tokenTTLsecs := 10 + + oidcServerQuit := make(chan bool) + portCh := make(chan int, 1) + go func() { + oidcServer(oidcServerQuit, portCh, tokenTTLsecs) + }() + defer func() { + oidcServerQuit <- true + }() + + // Wait for server to start and get the port + port := <-portCh + tokenURL := fmt.Sprintf("http://127.0.0.1:%d/token", port) + + oidcProvider, cancel := NewOIDCTokenProvider(t.Context(), "wrong-client-id", secretFile, + tokenURL, []string{testScope}, url.Values{}, oauth2.AuthStyleAutoDetect, 0) + defer cancel() + + oauthToken, err := oidcProvider.GetToken() + require.Error(t, err) + assert.Nil(t, oauthToken) +} + +func TestOIDCProvider_TokenCaching(t *testing.T) { + secretFile, err := k8sSecretFile() + assert.NoError(t, err) + + testClientSecret, err = os.ReadFile(secretFile) + assert.NoError(t, err) + + tokenTTLsecs := 10 + + oidcServerQuit := make(chan bool, 1) + portCh := make(chan int, 1) + go func() { + oidcServer(oidcServerQuit, portCh, tokenTTLsecs) + }() + defer func() { + oidcServerQuit <- true + }() + + // Wait for server to start and get the port + port := <-portCh + tokenURL := fmt.Sprintf("http://127.0.0.1:%d/token", port) + + oidcProvider, cancel := NewOIDCTokenProvider(t.Context(), testClientID, secretFile, tokenURL, + []string{testScope}, url.Values{}, oauth2.AuthStyleAutoDetect, 0) + defer cancel() + + token1, err1 := oidcProvider.GetToken() + assert.NoError(t, err1) + assert.NotNil(t, token1) + + token2, err2 := oidcProvider.GetToken() + assert.NoError(t, err2) + assert.NotNil(t, token2) + // Check that the same access token is returned (caching is working) + assert.Equal(t, token1.AccessToken, token2.AccessToken) +} + +func TestOIDCProvider_TokenExpired(t *testing.T) { + secretFile, err := k8sSecretFile() + assert.NoError(t, err) + + testClientSecret, err = os.ReadFile(secretFile) + assert.NoError(t, err) + + tokenTTLsecs := 3 + + oidcServerQuit := make(chan bool, 1) + portCh := make(chan int, 1) + go func() { + oidcServer(oidcServerQuit, portCh, tokenTTLsecs) + }() + defer func() { + oidcServerQuit <- true + }() + + // Wait for server to start and get the port + port := <-portCh + tokenURL := fmt.Sprintf("http://127.0.0.1:%d/token", port) + + oidcProvider, cancel := NewOIDCTokenProvider(t.Context(), testClientID, secretFile, tokenURL, + []string{testScope}, url.Values{}, oauth2.AuthStyleAutoDetect, 0) + defer cancel() + + token1, err1 := oidcProvider.GetToken() + assert.NoError(t, err1) + assert.NotNil(t, token1) + + time.Sleep(time.Duration((tokenTTLsecs*1000)+500) * time.Millisecond) + + token2, err2 := oidcProvider.GetToken() + assert.NoError(t, err2) + assert.NotNil(t, token2) + assert.NotEqual(t, token1, token2) +} + +func TestOIDCProvider_RefreshAhead(t *testing.T) { + secretFile, err := k8sSecretFile() + assert.NoError(t, err) + + testClientSecret, err = os.ReadFile(secretFile) + assert.NoError(t, err) + + tokenTTLsecs := 5 + + oidcServerQuit := make(chan bool, 1) + portCh := make(chan int, 1) + go func() { + oidcServer(oidcServerQuit, portCh, tokenTTLsecs) + }() + defer func() { + oidcServerQuit <- true + }() + + // Wait for server to start and get the port + port := <-portCh + tokenURL := fmt.Sprintf("http://127.0.0.1:%d/token", port) + + parser := jwt.NewParser(jwt.WithoutClaimsValidation()) + + oidcProvider, cancel := NewOIDCTokenProvider(t.Context(), testClientID, secretFile, tokenURL, + []string{testScope}, url.Values{}, oauth2.AuthStyleAutoDetect, 2) + defer cancel() + + token1, err1 := oidcProvider.GetToken() + assert.NoError(t, err1) + assert.NotNil(t, token1) + + time.Sleep(time.Duration((tokenTTLsecs*1000)+500) * time.Millisecond) + + token2, err2 := oidcProvider.GetToken() + assert.NoError(t, err2) + assert.NotNil(t, token2) + assert.NotEqual(t, token1, token2) + + // Verify second token is different and issued after the first + token1obj, err := parser.Parse(token1.AccessToken, func(_ *jwt.Token) (any, error) { + return publicKey, nil + }) + assert.NoError(t, err) + assert.NotNil(t, token1obj) + claims1 := token1obj.Claims.(jwt.MapClaims) + assert.Equal(t, testClientID, claims1["client_id"]) + assert.Equal(t, testScope, claims1["scope"]) + tok1IssuedAt := time.Unix(int64(claims1["iat"].(float64)), 0) + tok1ExpAt := time.Unix(int64(claims1["exp"].(float64)), 0) + + token2obj, err := parser.Parse(token2.AccessToken, func(_ *jwt.Token) (any, error) { + return publicKey, nil + }) + assert.NoError(t, err) + assert.NotNil(t, token2obj) + claims2 := token2obj.Claims.(jwt.MapClaims) + assert.Equal(t, testClientID, claims2["client_id"]) + assert.Equal(t, testScope, claims2["scope"]) + tok2IssuedAt := time.Unix(int64(claims2["iat"].(float64)), 0) + tok2ExpAt := time.Unix(int64(claims2["exp"].(float64)), 0) + + assert.True(t, tok2IssuedAt.After(tok1IssuedAt)) + assert.True(t, tok2ExpAt.After(tok1ExpAt)) +} + +func k8sSecretFile() (string, error) { + // Mock a small subset of a Kubernetes Service Account token + claims := jwt.MapClaims{ + "iss": "https://kubernetes.default.svc.cluster.local", + "sub": "system:serviceaccount:test:default", + "aud": []string{"https://kubernetes.default.svc.cluster.local"}, + "exp": time.Now().Add(time.Duration(120) * time.Second).Unix(), + "iat": time.Now().Unix(), + "jti": uuid.NewString(), + } + + token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims) + k8sSAtoken, err := token.SignedString(privateKey) + if err != nil { + return "", fmt.Errorf("error creating mock K8S service account token: %w", err) + } + + tokenPath := filepath.Join(os.TempDir(), "k8sToken") + err = os.WriteFile(tokenPath, []byte(k8sSAtoken), 0o600) + if err != nil { + return "", fmt.Errorf("error writing %s: %w", tokenPath, err) + } + + return tokenPath, nil +} + +// An implementation of a very basic OIDC server that supports only +// the "client_credentials" grant type. +type TokenRequest struct { + GrantType string `json:"grant_type"` + ClientID string `json:"client_id"` + ClientSecret string `json:"client_secret"` + Scope string `json:"scope"` +} + +type ErrorResponse struct { + Error string `json:"error"` + ErrorDescription string `json:"error_description,omitempty"` +} + +var ( + privateKey *rsa.PrivateKey + publicKey *rsa.PublicKey + testClientSecret []byte +) + +func init() { + var err error + privateKey, err = rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + log.Fatal("Failed to generate RSA key:", err) + } + publicKey = &privateKey.PublicKey +} + +func NewTokenHandler(expireSecs int) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusMethodNotAllowed) + err := json.NewEncoder(w).Encode(ErrorResponse{ + Error: "invalid_request", + ErrorDescription: "Method not allowed", + }) + if err != nil { + log.Printf("could not encode error response: %v", err) + } + return + } + + if err := r.ParseForm(); err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + err := json.NewEncoder(w).Encode(ErrorResponse{ + Error: "invalid_request", + ErrorDescription: "Failed to parse form data", + }) + if err != nil { + log.Printf("could not encode error response: %v", err) + } + return + } + + grantType := r.FormValue("grant_type") + submittedClientID := r.FormValue("client_id") + submittedClientSecret := r.FormValue("client_secret") + scope := r.FormValue("scope") + + if grantType != "client_credentials" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + err := json.NewEncoder(w).Encode(ErrorResponse{ + Error: "unsupported_grant_type", + ErrorDescription: "Only client_credentials grant type is supported", + }) + if err != nil { + log.Printf("could not encode error response: %v", err) + } + return + } + + if submittedClientID == "" || submittedClientSecret == "" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + err := json.NewEncoder(w).Encode(ErrorResponse{ + Error: "invalid_client", + ErrorDescription: "Client ID and secret are required", + }) + if err != nil { + log.Printf("could not encode error response: %v", err) + } + return + } + + if !validateClient(submittedClientID, submittedClientSecret) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusUnauthorized) + err := json.NewEncoder(w).Encode(ErrorResponse{ + Error: "invalid_client", + ErrorDescription: "Invalid client credentials", + }) + if err != nil { + log.Printf("could not encode error response: %v", err) + } + return + } + + token, err := generateJWTToken(submittedClientID, scope, expireSecs) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + encodeErr := json.NewEncoder(w).Encode(ErrorResponse{ + Error: "server_error", + ErrorDescription: "Failed to generate token", + }) + if encodeErr != nil { + log.Printf("could not encode error response: %v", encodeErr) + } + return + } + + response := oauth2.Token{ + AccessToken: token, + TokenType: "Bearer", + // Note that `expiry` is not an official field in the OIDC or OAuth2 specs + Expiry: time.Now().Add(time.Duration(expireSecs) * time.Second), + ExpiresIn: int64(expireSecs), + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(response) + if err != nil { + log.Printf("could not encode response: %v", err) + } + } +} + +func validateClient(clientID, clientSecret string) bool { + validClients := map[string]string{ + testClientID: string(testClientSecret), + // "test_client": "test_secret", + } + + expectedSecret, exists := validClients[clientID] + return exists && expectedSecret == clientSecret +} + +func generateJWTToken(clientID, scope string, expireSecs int) (string, error) { + now := time.Now() + + claims := jwt.MapClaims{ + "iss": "oidc-mock-server", + "sub": clientID, + "aud": "api", + "exp": now.Add(time.Duration(expireSecs) * time.Second).Unix(), + "iat": now.Unix(), + "client_id": clientID, + } + + if scope != "" { + claims["scope"] = scope + } + + token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims) + + return token.SignedString(privateKey) +} + +func oidcServer(shutdownCh <-chan bool, portCh chan<- int, expireSecs int) { + mux := http.NewServeMux() + mux.HandleFunc("/token", NewTokenHandler(expireSecs)) + s := &http.Server{ + Addr: ":0", // Use port 0 for dynamic allocation + Handler: mux, + ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + } + + listener, err := net.Listen("tcp", s.Addr) + if err != nil { + log.Fatalf("could not create listener: %v", err) + } + + // Get the actual port number and send it back + port := listener.Addr().(*net.TCPAddr).Port + portCh <- port + + go func() { + serveErr := s.Serve(listener) + if serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) { + log.Printf("OIDC server error: %v", serveErr) + } + }() + + <-shutdownCh + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = s.Shutdown(shutdownCtx) + if err != nil { + log.Printf("error shutting down OIDC server: %v", err) + } +} diff --git a/pkg/kafka/configkafka/config.go b/pkg/kafka/configkafka/config.go index 557d4ab8980fe..f56263e6d4f57 100644 --- a/pkg/kafka/configkafka/config.go +++ b/pkg/kafka/configkafka/config.go @@ -6,12 +6,14 @@ package configkafka // import "github.com/open-telemetry/opentelemetry-collector import ( "errors" "fmt" + "net/url" "time" "github.com/IBM/sarama" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap" + "golang.org/x/oauth2" ) const ( @@ -356,18 +358,23 @@ type SASLConfig struct { Username string `mapstructure:"username"` // Password to be used on authentication Password string `mapstructure:"password"` - // SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256 or SCRAM-SHA-512). + // SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM_OAUTHBEARER, OIDCFILE, + // OIDC_STRING, SCRAM-SHA-256 or SCRAM-SHA-512). Mechanism string `mapstructure:"mechanism"` // SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0. Version int `mapstructure:"version"` // AWSMSK holds configuration specific to AWS MSK. AWSMSK AWSMSKConfig `mapstructure:"aws_msk"` + // OIDCFile holds configuration for OIDC with file-based secret + OIDCFILE OIDCFileConfig `mapstructure:"oidc_file"` } func (c SASLConfig) Validate() error { switch c.Mechanism { case "AWS_MSK_IAM_OAUTHBEARER": // TODO validate c.AWSMSK + case "OIDCFILE": + // TODO valid c.OIDCFile case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512": // Do nothing, valid mechanism if c.Username == "" { @@ -395,6 +402,18 @@ type AWSMSKConfig struct { Region string `mapstructure:"region"` } +// OIDCFileConfig defines the additional configuration fields +// for the OIDCFILE mechanism +type OIDCFileConfig struct { + ClientSecretFilePath string `mapstructure:"client_secret_file_path"` + TokenURL string `mapstructure:"token_url"` + Scopes []string `mapstructure:"scopes"` + RefreshAheadSecs int `mapstructure:"refresh_ahead_secs"` + EndPointParams url.Values `mapstructure:"endpoint_params"` + AuthStyle oauth2.AuthStyle `mapstructure:"auth_style"` + ExpiryBuffer int `mapstructure:"expiry_buffer"` +} + // KerberosConfig defines kerberos configuration. type KerberosConfig struct { ServiceName string `mapstructure:"service_name"` diff --git a/pkg/kafka/configkafka/go.mod b/pkg/kafka/configkafka/go.mod index 3174a53605ca0..8868a50e8dbb0 100644 --- a/pkg/kafka/configkafka/go.mod +++ b/pkg/kafka/configkafka/go.mod @@ -11,6 +11,7 @@ require ( go.opentelemetry.io/collector/confmap v1.43.1-0.20251013162618-a96eab114ea4 go.opentelemetry.io/collector/confmap/xconfmap v0.137.1-0.20251013162618-a96eab114ea4 go.uber.org/goleak v1.3.0 + golang.org/x/oauth2 v0.30.0 ) require ( diff --git a/pkg/kafka/configkafka/go.sum b/pkg/kafka/configkafka/go.sum index 96b4422f84951..d2de8eee12132 100644 --- a/pkg/kafka/configkafka/go.sum +++ b/pkg/kafka/configkafka/go.sum @@ -180,6 +180,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/receiver/kafkametricsreceiver/go.mod b/receiver/kafkametricsreceiver/go.mod index 8f3ab8de66b56..e83907f65919e 100644 --- a/receiver/kafkametricsreceiver/go.mod +++ b/receiver/kafkametricsreceiver/go.mod @@ -29,6 +29,7 @@ require ( ) require ( + cloud.google.com/go/compute/metadata v0.7.0 // indirect github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 // indirect github.com/aws/aws-sdk-go-v2 v1.36.4 // indirect github.com/aws/aws-sdk-go-v2/config v1.29.16 // indirect @@ -73,6 +74,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect @@ -89,6 +91,8 @@ require ( go.opentelemetry.io/collector/config/configtls v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.137.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/pipeline v1.43.1-0.20251013162618-a96eab114ea4 // indirect @@ -104,6 +108,7 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/net v0.46.0 // indirect + golang.org/x/oauth2 v0.31.0 // indirect golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect @@ -116,6 +121,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka => ../../pkg/kafka/configkafka +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension => ../../extension/oauth2clientauthextension + // see https://github.com/distribution/distribution/issues/3590 exclude github.com/docker/distribution v2.8.0+incompatible diff --git a/receiver/kafkametricsreceiver/go.sum b/receiver/kafkametricsreceiver/go.sum index ff03d117d457f..caaa2e67c3c9f 100644 --- a/receiver/kafkametricsreceiver/go.sum +++ b/receiver/kafkametricsreceiver/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU= +cloud.google.com/go/compute/metadata v0.7.0/go.mod h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo= github.com/IBM/sarama v1.46.2 h1:65JJmZpxKUWe/7HEHmc56upTfAvgoxuyu4Ek+TcevDE= github.com/IBM/sarama v1.46.2/go.mod h1:PDOGmVeKmW744c/0d4CZ0MfrzmcIYtpmS5+KIWs1zHQ= github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE= @@ -57,6 +59,8 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= +github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -186,6 +190,12 @@ go.opentelemetry.io/collector/consumer/consumertest v0.137.1-0.20251013162618-a9 go.opentelemetry.io/collector/consumer/consumertest v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:Yz1Mo4ibkgutZRUOf/odYxyrJCnqWPiQ3s9XsESkyZA= go.opentelemetry.io/collector/consumer/xconsumer v0.137.1-0.20251013162618-a96eab114ea4 h1:t3bFdGBL/gCTXVeKRNd/n9dhl1mKj+upajY6Y5pL1yM= go.opentelemetry.io/collector/consumer/xconsumer v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:RsSFgyg2HiQHSE0yBFxG1GKm+x/sKtLYNkhwUqQ6ABg= +go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 h1:ydWUlclX7E9H3AGvrayXMnplL7DZXcJShKbCp0fweEY= +go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:GIzXTwB+7I3TpTx2Zppp/2884BNg1LaWe0zjumgV82Q= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 h1:6BbCvOb/86AnIGn5J1vcNauoZ2cbloU4WscZJMjbkso= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:6Sh0hqPfPqpg0ErCoNPO/ky2NdfGmUX+G5wekPx7A7U= +go.opentelemetry.io/collector/extension/extensiontest v0.137.1-0.20251013162618-a96eab114ea4 h1:DBMKb2WJVCjqa6hkDgVI2mgnzv/x/mx9JUA35U0k1YA= +go.opentelemetry.io/collector/extension/extensiontest v0.137.1-0.20251013162618-a96eab114ea4/go.mod h1:2wyfqLdE/kVOdVOnWxPBQlEnI2NV0bqGXJFRPSrtqy0= go.opentelemetry.io/collector/featuregate v1.43.1-0.20251013162618-a96eab114ea4 h1:IXK7EGifr3Lic3mnMlkVXFb1HBlkBoYfoy/AYzsevko= go.opentelemetry.io/collector/featuregate v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:d0tiRzVYrytB6LkcYgz2ESFTv7OktRPQe0QEQcPt1L4= go.opentelemetry.io/collector/filter v0.137.1-0.20251013162618-a96eab114ea4 h1:Njejdb5z+0bGP41bty5HeDZtslEwk5pNceRmeYCLys0= @@ -268,6 +278,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo= +golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index 4c6f95c52c57c..92b6fa4f7f5f5 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -51,6 +51,7 @@ require ( ) require ( + cloud.google.com/go/compute/metadata v0.7.0 // indirect github.com/apache/thrift v0.22.0 // indirect github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 // indirect github.com/aws/aws-sdk-go-v2 v1.36.4 // indirect @@ -96,6 +97,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.137.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.137.0 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect @@ -114,6 +116,7 @@ require ( go.opentelemetry.io/collector/config/configoptional v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/exporter v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 // indirect + go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/extension/xextension v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/internal/telemetry v0.137.1-0.20251013162618-a96eab114ea4 // indirect go.opentelemetry.io/collector/pdata/xpdata v0.137.1-0.20251013162618-a96eab114ea4 // indirect @@ -126,6 +129,7 @@ require ( golang.org/x/crypto v0.43.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/net v0.46.0 // indirect + golang.org/x/oauth2 v0.31.0 // indirect golang.org/x/sys v0.37.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect google.golang.org/grpc v1.76.0 // indirect @@ -158,3 +162,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure => ../../pkg/translator/azure replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka => ../../pkg/kafka/configkafka + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension => ../../extension/oauth2clientauthextension diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum index f21aba25ae11c..f528564426228 100644 --- a/receiver/kafkareceiver/go.sum +++ b/receiver/kafkareceiver/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU= +cloud.google.com/go/compute/metadata v0.7.0/go.mod h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo= github.com/IBM/sarama v1.46.2 h1:65JJmZpxKUWe/7HEHmc56upTfAvgoxuyu4Ek+TcevDE= github.com/IBM/sarama v1.46.2/go.mod h1:PDOGmVeKmW744c/0d4CZ0MfrzmcIYtpmS5+KIWs1zHQ= github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= @@ -65,6 +67,8 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= +github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -127,6 +131,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0 h1:cKeFgcZf0mI7NL4NLB6c4nsQVZ5Gv14cg6wyB5BFsns= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.137.0/go.mod h1:GLGpfbWdN6n/I0WA+JwGdokRO9WcHDzCPojxmjK4yEI= github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7sjsSdg= github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -216,6 +222,8 @@ go.opentelemetry.io/collector/exporter/xexporter v0.137.0 h1:2fSmBDB+tuFoYKJSHbR go.opentelemetry.io/collector/exporter/xexporter v0.137.0/go.mod h1:9gudRad3ijkbzcnTLE0y+CzUDtC4TaPyZQDUKB2yzVs= go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4 h1:ydWUlclX7E9H3AGvrayXMnplL7DZXcJShKbCp0fweEY= go.opentelemetry.io/collector/extension v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:GIzXTwB+7I3TpTx2Zppp/2884BNg1LaWe0zjumgV82Q= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4 h1:6BbCvOb/86AnIGn5J1vcNauoZ2cbloU4WscZJMjbkso= +go.opentelemetry.io/collector/extension/extensionauth v1.43.1-0.20251013162618-a96eab114ea4/go.mod h1:6Sh0hqPfPqpg0ErCoNPO/ky2NdfGmUX+G5wekPx7A7U= go.opentelemetry.io/collector/extension/extensiontest v0.137.0 h1:gnPF3HIOKqNk93XObt2x0WFvVfPtm76VggWe7LxgcaY= go.opentelemetry.io/collector/extension/extensiontest v0.137.0/go.mod h1:vVmKojdITYka9+iAi3aarxeMrO6kdlywKuf3d3c6lcI= go.opentelemetry.io/collector/extension/xextension v0.137.1-0.20251013162618-a96eab114ea4 h1:0DbV+ob2o19r03p7ZbacTeMqFi1B/ivpcmNBPGLkA+k= @@ -300,6 +308,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo= +golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=