Skip to content

Commit a7715ef

Browse files
authored
[release-1.15] Expose OIDC audience of KafkaSink in its status (#4073)
* Expose OIDC audience of KafkaSink in its status (#4067) * Provision audience of KafkaSink * Add e2e test * Run full OIDC e2e test suite * run gofmt and goimports * Fix build issue
1 parent d641892 commit a7715ef

File tree

6 files changed

+183
-5
lines changed

6 files changed

+183
-5
lines changed

control-plane/pkg/contract/contract.pb.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

control-plane/pkg/reconciler/sink/kafka_sink.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
"fmt"
2222
"time"
2323

24+
"knative.dev/eventing/pkg/auth"
25+
"knative.dev/pkg/logging"
26+
2427
"github.com/IBM/sarama"
2528
"go.uber.org/zap"
2629
"k8s.io/apimachinery/pkg/types"
@@ -240,9 +243,9 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
240243

241244
logger.Debug("Updated receiver pod annotation")
242245

243-
transportEncryptionFlags := feature.FromContext(ctx)
246+
features := feature.FromContext(ctx)
244247
var addressableStatus duckv1.AddressStatus
245-
if transportEncryptionFlags.IsPermissiveTransportEncryption() {
248+
if features.IsPermissiveTransportEncryption() {
246249
caCerts, err := r.getCaCerts()
247250
if err != nil {
248251
return err
@@ -257,7 +260,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
257260
// - http address with path-based routing
258261
addressableStatus.Address = &httpAddress
259262
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress}
260-
} else if transportEncryptionFlags.IsStrictTransportEncryption() {
263+
} else if features.IsStrictTransportEncryption() {
261264
// Strict mode: (only https addresses)
262265
// - status.address https address with path-based routing
263266
// - status.addresses:
@@ -296,6 +299,25 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
296299

297300
ks.Status.AddressStatus = addressableStatus
298301

302+
if features.IsOIDCAuthentication() {
303+
audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta)
304+
logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", audience))
305+
ks.Status.Address.Audience = &audience
306+
307+
for i := range ks.Status.Addresses {
308+
ks.Status.Addresses[i].Audience = &audience
309+
}
310+
} else {
311+
logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled")
312+
if ks.Status.Address != nil {
313+
ks.Status.Address.Audience = nil
314+
}
315+
316+
for i := range ks.Status.Addresses {
317+
ks.Status.Addresses[i].Audience = nil
318+
}
319+
}
320+
299321
ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(base.ConditionAddressable)
300322

301323
return nil

control-plane/pkg/reconciler/sink/kafka_sink_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"io"
2323
"testing"
2424

25+
"knative.dev/eventing/pkg/auth"
26+
2527
"k8s.io/utils/pointer"
2628

2729
duckv1 "knative.dev/pkg/apis/duck/v1"
@@ -49,6 +51,7 @@ import (
4951

5052
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing"
5153
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
54+
kafkaeventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
5255
fakeeventingkafkaclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client/fake"
5356
sinkreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/eventing/v1alpha1/kafkasink"
5457
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
@@ -95,6 +98,11 @@ var (
9598
Path: fmt.Sprintf("/%s/%s", SinkNamespace, SinkName),
9699
}
97100

101+
sinkAudience = auth.GetAudience(kafkaeventing.SchemeGroupVersion.WithKind("KafkaSink"), metav1.ObjectMeta{
102+
Name: SinkName,
103+
Namespace: SinkNamespace,
104+
})
105+
98106
errCreateTopic = fmt.Errorf("failed to create topic")
99107

100108
errDeleteTopic = fmt.Errorf("failed to delete topic")
@@ -1283,6 +1291,73 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) {
12831291
),
12841292
},
12851293
},
1294+
}, {
1295+
Name: "Reconciled normal - OIDC enabled - should provision audience",
1296+
Ctx: feature.ToContext(context.Background(), feature.Flags{
1297+
feature.OIDCAuthentication: feature.Enabled,
1298+
}),
1299+
Objects: []runtime.Object{
1300+
NewSink(
1301+
StatusControllerOwnsTopic(sink.ControllerTopicOwner),
1302+
),
1303+
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
1304+
SinkReceiverPod(env.SystemNamespace, map[string]string{
1305+
"annotation_to_preserve": "value_to_preserve",
1306+
}),
1307+
},
1308+
Key: testKey,
1309+
WantEvents: []string{
1310+
finalizerUpdatedEvent,
1311+
},
1312+
WantUpdates: []clientgotesting.UpdateActionImpl{
1313+
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
1314+
Resources: []*contract.Resource{
1315+
{
1316+
Uid: SinkUUID,
1317+
Topics: []string{SinkTopic()},
1318+
Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, Path: receiver.Path(SinkNamespace, SinkName)},
1319+
BootstrapServers: bootstrapServers,
1320+
Reference: SinkReference(),
1321+
},
1322+
},
1323+
Generation: 1,
1324+
}),
1325+
SinkReceiverPodUpdate(env.SystemNamespace, map[string]string{
1326+
base.VolumeGenerationAnnotationKey: "1",
1327+
"annotation_to_preserve": "value_to_preserve",
1328+
}),
1329+
},
1330+
WantPatches: []clientgotesting.PatchActionImpl{
1331+
patchFinalizers(),
1332+
},
1333+
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
1334+
{
1335+
Object: NewSink(
1336+
StatusControllerOwnsTopic(sink.ControllerTopicOwner),
1337+
InitSinkConditions,
1338+
StatusDataPlaneAvailable,
1339+
StatusConfigParsed,
1340+
BootstrapServers(bootstrapServersArr),
1341+
StatusConfigMapUpdatedReady(&env),
1342+
StatusTopicReadyWithOwner(SinkTopic(), sink.ControllerTopicOwner),
1343+
SinkAddressable(&env),
1344+
StatusProbeSucceeded,
1345+
WithSinkAddress(duckv1.Addressable{
1346+
Name: pointer.String("http"),
1347+
URL: sinkAddress,
1348+
Audience: &sinkAudience,
1349+
}),
1350+
WithSinkAddresses([]duckv1.Addressable{
1351+
{
1352+
Name: pointer.String("http"),
1353+
URL: sinkAddress,
1354+
Audience: &sinkAudience,
1355+
},
1356+
}),
1357+
WithSinkAddessable(),
1358+
),
1359+
},
1360+
},
12861361
},
12871362
}
12881363

test/e2e_new/sink_auth_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
//go:build e2e
2+
// +build e2e
3+
4+
/*
5+
* Copyright 2023 The Knative Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package e2e_new
21+
22+
import (
23+
"testing"
24+
"time"
25+
26+
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
27+
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
28+
29+
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
30+
"knative.dev/eventing/test/rekt/features/oidc"
31+
"knative.dev/pkg/system"
32+
"knative.dev/reconciler-test/pkg/environment"
33+
"knative.dev/reconciler-test/pkg/eventshub"
34+
"knative.dev/reconciler-test/pkg/feature"
35+
"knative.dev/reconciler-test/pkg/k8s"
36+
"knative.dev/reconciler-test/pkg/knative"
37+
)
38+
39+
func TestKafkaSinkSupportsOIDC(t *testing.T) {
40+
t.Parallel()
41+
42+
ctx, env := global.Environment(
43+
knative.WithKnativeNamespace(system.Namespace()),
44+
knative.WithLoggingConfig,
45+
knative.WithTracingConfig,
46+
k8s.WithEventListener,
47+
environment.WithPollTimings(4*time.Second, 12*time.Minute),
48+
environment.Managed(t),
49+
eventshub.WithTLS(t),
50+
)
51+
52+
topic := feature.MakeRandomK8sName("topic")
53+
sink := feature.MakeRandomK8sName("kafkasink")
54+
env.Prerequisite(ctx, t, kafkatopic.GoesReady(topic))
55+
env.Prerequisite(ctx, t, kafkasink.GoesReady(sink, topic, testpkg.BootstrapServersPlaintextArr))
56+
57+
env.TestSet(ctx, t, oidc.AddressableOIDCConformance(kafkasink.GVR(), "KafkaSink", sink, env.Namespace()))
58+
}

test/rekt/resources/kafkasink/kafkasink.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,14 @@ func ValidateAddress(name string, validate addressable.ValidateAddressFn, timing
116116
}
117117
}
118118
}
119+
120+
// GoesReady returns a feature that will create a KafkaSink of the given
121+
// name and topic, and confirm it becomes ready.
122+
func GoesReady(name, topic string, bootstrapServers []string, cfg ...manifest.CfgFn) *feature.Feature {
123+
f := new(feature.Feature)
124+
125+
f.Setup(fmt.Sprintf("install KafkaSink %q", name), Install(name, topic, bootstrapServers, cfg...))
126+
f.Setup("KafkaSink is ready", IsReady(name))
127+
128+
return f
129+
}

test/rekt/resources/kafkatopic/topic.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,14 @@ func WithClusterNamespace(namespace string) manifest.CfgFn {
172172
cfg["clusterNamespace"] = namespace
173173
}
174174
}
175+
176+
// GoesReady returns a feature that will create a topic of the given
177+
// name and confirm it becomes ready.
178+
func GoesReady(name string, cfg ...manifest.CfgFn) *feature.Feature {
179+
f := new(feature.Feature)
180+
181+
f.Setup(fmt.Sprintf("install Topic %q", name), Install(name, cfg...))
182+
f.Setup("Topic is ready", IsReady(name))
183+
184+
return f
185+
}

0 commit comments

Comments
 (0)