Skip to content

Commit ee7e23f

Browse files
committed
Update Fabric8 Kubernetes Client to 6.7.0 (#8604)
Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent ffa9eb9 commit ee7e23f

File tree

28 files changed

+214
-294
lines changed

28 files changed

+214
-294
lines changed

api/src/main/java/io/strimzi/api/kafka/Crds.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import io.fabric8.kubernetes.client.KubernetesClient;
1515
import io.fabric8.kubernetes.client.dsl.MixedOperation;
1616
import io.fabric8.kubernetes.client.dsl.Resource;
17-
import io.fabric8.kubernetes.internal.KubernetesDeserializer;
1817
import io.strimzi.api.kafka.model.Kafka;
1918
import io.strimzi.api.kafka.model.KafkaBridge;
2019
import io.strimzi.api.kafka.model.KafkaConnect;
@@ -53,17 +52,6 @@ public class Crds {
5352
private Crds() {
5453
}
5554

56-
/**
57-
* Register custom resource kinds with {@link KubernetesDeserializer} so Fabric8 knows how to deserialize them.
58-
*/
59-
public static void registerCustomKinds() {
60-
for (Class<? extends CustomResource> crdClass : CRDS) {
61-
for (String version : apiVersions(crdClass)) {
62-
KubernetesDeserializer.registerCustomKind(version, kind(crdClass), crdClass);
63-
}
64-
}
65-
}
66-
6755
@SuppressWarnings({"checkstyle:JavaNCSS"})
6856
private static CustomResourceDefinition crd(Class<? extends CustomResource> cls) {
6957
String scope, plural, singular, group, kind, listKind;

api/src/test/java/io/strimzi/api/kafka/model/ExamplesTest.java

Lines changed: 3 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,11 @@
55
package io.strimzi.api.kafka.model;
66

77
import com.fasterxml.jackson.annotation.JsonAnyGetter;
8-
import com.fasterxml.jackson.core.JsonProcessingException;
9-
import com.fasterxml.jackson.databind.JsonNode;
108
import com.fasterxml.jackson.databind.ObjectMapper;
11-
import com.fasterxml.jackson.databind.SerializationFeature;
12-
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
139
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
1410
import io.fabric8.kubernetes.api.model.KubernetesResource;
1511
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionSpec;
16-
import io.strimzi.api.kafka.Crds;
12+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
1713
import io.strimzi.test.TestUtils;
1814
import org.junit.jupiter.api.Test;
1915

@@ -25,8 +21,6 @@
2521
import java.util.HashMap;
2622
import java.util.Map;
2723
import java.util.Stack;
28-
import java.util.regex.Matcher;
29-
import java.util.regex.Pattern;
3024
import java.util.stream.Collectors;
3125

3226
import static org.junit.jupiter.api.Assertions.fail;
@@ -37,13 +31,6 @@
3731
* {@code ../packaging/examples} directory are valid.
3832
*/
3933
public class ExamplesTest {
40-
41-
static {
42-
Crds.registerCustomKinds();
43-
}
44-
45-
private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\$\\{\\{(.+?)\\}?\\}");
46-
4734
/**
4835
* Recurse through the examples directory looking for resources of the right type
4936
* and validating them.
@@ -72,13 +59,7 @@ private void validate(File f) {
7259
try {
7360
ObjectMapper mapper = new YAMLMapper();
7461
final String content = TestUtils.readFile(f);
75-
JsonNode rootNode = mapper.readTree(content);
76-
String resourceKind = getKind(rootNode);
77-
if ("Template".equals(resourceKind)) {
78-
validateTemplate(rootNode);
79-
} else {
80-
validate(content);
81-
}
62+
validate(content);
8263
} catch (Exception | AssertionError e) {
8364
throw new AssertionError("Invalid example yaml in " + f.getPath() + ": " + e.getMessage(), e);
8465
}
@@ -89,7 +70,7 @@ private void validate(String content) {
8970
// This uses a custom deserializer which knows about all the built-in
9071
// k8s and os kinds, plus the custom kinds registered via Crds
9172
// But the custom deserializer always allows unknown properties
92-
KubernetesResource resource = TestUtils.fromYamlString(content, KubernetesResource.class, false);
73+
KubernetesResource resource = new KubernetesClientBuilder().build().getKubernetesSerialization().convertValue(content, KubernetesResource.class);
9374
recurseForAdditionalProperties(new Stack(), resource);
9475
}
9576

@@ -144,48 +125,4 @@ private boolean isGetter(Method method) {
144125
&& !Modifier.isStatic(method.getModifiers())
145126
&& !method.getReturnType().equals(void.class);
146127
}
147-
148-
private void validateTemplate(JsonNode rootNode) throws JsonProcessingException {
149-
JsonNode parameters = rootNode.get("parameters");
150-
Map<String, Object> params = new HashMap<>();
151-
for (JsonNode parameter : parameters) {
152-
String name = parameter.get("name").asText();
153-
Object value;
154-
JsonNode valueNode = parameter.get("value");
155-
switch (valueNode.getNodeType()) {
156-
case NULL:
157-
value = null;
158-
break;
159-
case NUMBER:
160-
case BOOLEAN:
161-
value = valueNode.toString();
162-
break;
163-
case STRING:
164-
value = valueNode.asText();
165-
break;
166-
default:
167-
throw new RuntimeException("Unsupported JSON type " + valueNode.getNodeType());
168-
}
169-
params.put(name, value);
170-
}
171-
for (JsonNode object : rootNode.get("objects")) {
172-
String s = new YAMLMapper().enable(YAMLGenerator.Feature.MINIMIZE_QUOTES).enable(SerializationFeature.INDENT_OUTPUT).writeValueAsString(object);
173-
Matcher matcher = PARAMETER_PATTERN.matcher(s);
174-
StringBuilder sb = new StringBuilder();
175-
int last = 0;
176-
while (matcher.find()) {
177-
sb.append(s, last, matcher.start());
178-
String paramName = matcher.group(1);
179-
sb.append(params.get(paramName));
180-
last = matcher.end();
181-
}
182-
sb.append(s.substring(last));
183-
String yamlContent = sb.toString();
184-
validate(yamlContent);
185-
}
186-
}
187-
188-
private String getKind(JsonNode rootNode) {
189-
return rootNode.get("kind").asText();
190-
}
191128
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/Main.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import io.fabric8.kubernetes.api.model.rbac.ClusterRole;
99
import io.fabric8.kubernetes.client.KubernetesClient;
1010
import io.micrometer.prometheus.PrometheusMeterRegistry;
11-
import io.strimzi.api.kafka.Crds;
1211
import io.strimzi.certs.OpenSslCertManager;
1312
import io.strimzi.operator.cluster.leaderelection.LeaderElectionManager;
1413
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderFactory;
@@ -60,14 +59,6 @@ public class Main {
6059

6160
private static final int HEALTH_SERVER_PORT = 8080;
6261

63-
static {
64-
try {
65-
Crds.registerCustomKinds();
66-
} catch (Error | RuntimeException t) {
67-
t.printStackTrace();
68-
}
69-
}
70-
7162
/**
7263
* The main method used to run the Cluster Operator
7364
*

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,15 @@ protected Future<KafkaBridgeStatus> createOrUpdate(Reconciliation reconciliation
9191
LOGGER.debugCr(reconciliation, "Updating Kafka Bridge cluster");
9292
kafkaBridgeServiceAccount(reconciliation, namespace, bridge)
9393
.compose(i -> bridgeInitClusterRoleBinding(reconciliation, initCrbName, initCrb))
94-
.compose(i -> deploymentOperations.scaleDown(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas()))
94+
.compose(i -> deploymentOperations.scaleDown(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas(), operationTimeoutMs))
9595
.compose(scale -> serviceOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.serviceName(bridge.getCluster()), bridge.generateService()))
9696
.compose(i -> MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, bridge.logging(), null))
9797
.compose(metricsAndLogging -> configMapOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.metricsAndLogConfigMapName(reconciliation.name()), bridge.generateMetricsAndLogConfigMap(metricsAndLogging)))
9898
.compose(i -> pfa.hasPodDisruptionBudgetV1() ? podDisruptionBudgetOperator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudget()) : Future.succeededFuture())
9999
.compose(i -> !pfa.hasPodDisruptionBudgetV1() ? podDisruptionBudgetV1Beta1Operator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudgetV1Beta1()) : Future.succeededFuture())
100100
.compose(i -> Util.authTlsHash(secretOperations, namespace, auth, trustedCertificates))
101101
.compose(hash -> deploymentOperations.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generateDeployment(Collections.singletonMap(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash)), pfa.isOpenshift(), imagePullPolicy, imagePullSecrets)))
102-
.compose(i -> deploymentOperations.scaleUp(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas()))
102+
.compose(i -> deploymentOperations.scaleUp(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas(), operationTimeoutMs))
103103
.compose(i -> deploymentOperations.waitForObserved(reconciliation, namespace, bridge.getComponentName(), 1_000, operationTimeoutMs))
104104
.compose(i -> bridgeHasZeroReplicas ? Future.succeededFuture() : deploymentOperations.readiness(reconciliation, namespace, bridge.getComponentName(), 1_000, operationTimeoutMs))
105105
.onComplete(reconciliationResult -> {

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,9 +277,9 @@ private Future<Void> reconcileDeployment(Reconciliation reconciliation,
277277
Map<String, String> deploymentAnnotations,
278278
String customContainerImage,
279279
boolean hasZeroReplicas) {
280-
return deploymentOperations.scaleDown(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.getReplicas())
280+
return deploymentOperations.scaleDown(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.getReplicas(), operationTimeoutMs)
281281
.compose(i -> deploymentOperations.reconcile(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.generateDeployment(connect.getReplicas(), deploymentAnnotations, podAnnotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets, customContainerImage)))
282-
.compose(i -> deploymentOperations.scaleUp(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.getReplicas()))
282+
.compose(i -> deploymentOperations.scaleUp(reconciliation, reconciliation.namespace(), connect.getComponentName(), connect.getReplicas(), operationTimeoutMs))
283283
.compose(i -> deploymentOperations.waitForObserved(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs))
284284
.compose(i -> hasZeroReplicas ? Future.succeededFuture() : deploymentOperations.readiness(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs));
285285
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectMigration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ private Future<Void> moveOnePodFromDeploymentToStrimziPodSetWithRecreateStrategy
185185
*/
186186
private Future<Void> scaleDownDeployment(int replicas) {
187187
LOGGER.infoCr(reconciliation, "Scaling down Deployment {}", connect.getComponentName());
188-
return deploymentOperator.scaleDown(reconciliation, reconciliation.namespace(), connect.getComponentName(), replicas)
188+
return deploymentOperator.scaleDown(reconciliation, reconciliation.namespace(), connect.getComponentName(), replicas, operationTimeoutMs)
189189
.compose(i -> deploymentOperator.waitForObserved(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs))
190190
.compose(i -> deploymentOperator.readiness(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs));
191191
}
@@ -290,7 +290,7 @@ private Future<Void> moveOnePodFromStrimziPodSetToDeploymentWithRecreateStrategy
290290
*/
291291
private Future<Void> scaleUpDeployment(int replicas) {
292292
LOGGER.infoCr(reconciliation, "Scaling up Deployment {}", connect.getComponentName());
293-
return deploymentOperator.scaleUp(reconciliation, reconciliation.namespace(), connect.getComponentName(), replicas)
293+
return deploymentOperator.scaleUp(reconciliation, reconciliation.namespace(), connect.getComponentName(), replicas, operationTimeoutMs)
294294
.compose(i -> deploymentOperator.waitForObserved(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs))
295295
.compose(i -> deploymentOperator.readiness(reconciliation, reconciliation.namespace(), connect.getComponentName(), 1_000, operationTimeoutMs));
296296
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,9 @@ private Future<Void> reconcileDeployment(Reconciliation reconciliation,
267267
KafkaMirrorMaker2Cluster mirrorMaker2Cluster,
268268
Map<String, String> podAnnotations,
269269
boolean hasZeroReplicas) {
270-
return deploymentOperations.scaleDown(reconciliation, reconciliation.namespace(), mirrorMaker2Cluster.getComponentName(), mirrorMaker2Cluster.getReplicas())
270+
return deploymentOperations.scaleDown(reconciliation, reconciliation.namespace(), mirrorMaker2Cluster.getComponentName(), mirrorMaker2Cluster.getReplicas(), operationTimeoutMs)
271271
.compose(i -> deploymentOperations.reconcile(reconciliation, reconciliation.namespace(), mirrorMaker2Cluster.getComponentName(), mirrorMaker2Cluster.generateDeployment(mirrorMaker2Cluster.getReplicas(), null, podAnnotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets, null)))
272-
.compose(i -> deploymentOperations.scaleUp(reconciliation, reconciliation.namespace(), mirrorMaker2Cluster.getComponentName(), mirrorMaker2Cluster.getReplicas()))
272+
.compose(i -> deploymentOperations.scaleUp(reconciliation, reconciliation.namespace(), mirrorMaker2Cluster.getComponentName(), mirrorMaker2Cluster.getReplicas(), operationTimeoutMs))
273273
.compose(i -> deploymentOperations.waitForObserved(reconciliation, reconciliation.namespace(), mirrorMaker2Cluster.getComponentName(), 1_000, operationTimeoutMs))
274274
.compose(i -> hasZeroReplicas ? Future.succeededFuture() : deploymentOperations.readiness(reconciliation, reconciliation.namespace(), mirrorMaker2Cluster.getComponentName(), 1_000, operationTimeoutMs));
275275
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ protected Future<KafkaMirrorMakerStatus> createOrUpdate(Reconciliation reconcili
100100

101101
LOGGER.debugCr(reconciliation, "Updating Kafka Mirror Maker cluster");
102102
mirrorMakerServiceAccount(reconciliation, namespace, mirror)
103-
.compose(i -> deploymentOperations.scaleDown(reconciliation, namespace, mirror.getComponentName(), mirror.getReplicas()))
103+
.compose(i -> deploymentOperations.scaleDown(reconciliation, namespace, mirror.getComponentName(), mirror.getReplicas(), operationTimeoutMs))
104104
.compose(i -> MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, mirror.logging(), mirror.metrics()))
105105
.compose(metricsAndLoggingCm -> {
106106
ConfigMap logAndMetricsConfigMap = mirror.generateMetricsAndLogConfigMap(metricsAndLoggingCm);
@@ -118,7 +118,7 @@ protected Future<KafkaMirrorMakerStatus> createOrUpdate(Reconciliation reconcili
118118
return Future.succeededFuture();
119119
})
120120
.compose(i -> deploymentOperations.reconcile(reconciliation, namespace, mirror.getComponentName(), mirror.generateDeployment(annotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets)))
121-
.compose(i -> deploymentOperations.scaleUp(reconciliation, namespace, mirror.getComponentName(), mirror.getReplicas()))
121+
.compose(i -> deploymentOperations.scaleUp(reconciliation, namespace, mirror.getComponentName(), mirror.getReplicas(), operationTimeoutMs))
122122
.compose(i -> deploymentOperations.waitForObserved(reconciliation, namespace, mirror.getComponentName(), 1_000, operationTimeoutMs))
123123
.compose(i -> mirrorHasZeroReplicas ? Future.succeededFuture() : deploymentOperations.readiness(reconciliation, namespace, mirror.getComponentName(), 1_000, operationTimeoutMs))
124124
.onComplete(reconciliationResult -> {

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetController.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ protected void startController() {
173173
kafkaInformer.start();
174174
kafkaConnectInformer.start();
175175
kafkaMirrorMaker2Informer.start();
176+
177+
strimziPodSetInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("StrimziPodSet", t, stop));
178+
podInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("Pod", t, stop));
179+
kafkaInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("Kafka", t, stop));
180+
kafkaConnectInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("KafkaConnect", t, stop));
181+
kafkaMirrorMaker2Informer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("KafkaMirrorMaker2", t, stop));
176182
}
177183

178184
protected void stopController() {

0 commit comments

Comments
 (0)