Skip to content

Commit e88c7de

Browse files
[release-1.16] Thread safe improvements on OIDCDiscoveryConfigListener (#4160)
* usage of CopyOnWriteArrayList/AtomicRef Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> * using concurrent hash map and atomic integer for ids Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> --------- Signed-off-by: Matthias Wessendorf <mwessend@redhat.com> Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
1 parent 4dace1e commit e88c7de

File tree

1 file changed

+20
-19
lines changed

1 file changed

+20
-19
lines changed

data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import io.vertx.core.Vertx;
2121
import java.io.File;
2222
import java.io.IOException;
23-
import java.util.ArrayList;
24-
import java.util.List;
25-
import java.util.Objects;
23+
import java.util.concurrent.ConcurrentHashMap;
2624
import java.util.concurrent.ExecutionException;
2725
import java.util.concurrent.TimeUnit;
2826
import java.util.concurrent.TimeoutException;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
import java.util.concurrent.atomic.AtomicReference;
2929
import java.util.function.Consumer;
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
@@ -36,24 +36,27 @@ public class OIDCDiscoveryConfigListener implements AutoCloseable {
3636
private final Vertx vertx;
3737
private final FileWatcher configFeaturesWatcher;
3838
private final int timeoutSeconds;
39-
private List<Consumer<OIDCDiscoveryConfig>> callbacks;
40-
private OIDCDiscoveryConfig oidcDiscoveryConfig;
39+
private final ConcurrentHashMap<Integer, Consumer<OIDCDiscoveryConfig>> callbacks;
40+
private final AtomicReference<OIDCDiscoveryConfig> oidcDiscoveryConfig;
41+
private final AtomicInteger callbackIdGenerator;
4142

4243
public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException {
4344
this.featuresConfigPath = featuresConfigPath;
4445
this.vertx = vertx;
4546
this.timeoutSeconds = timeoutSeconds;
47+
this.oidcDiscoveryConfig = new AtomicReference<>();
48+
this.callbacks = new ConcurrentHashMap<>();
49+
this.callbackIdGenerator = new AtomicInteger(0);
4650

4751
this.buildFeaturesAndOIDCDiscoveryConfig();
4852

4953
this.configFeaturesWatcher =
5054
new FileWatcher(new File(featuresConfigPath + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC), () -> {
51-
if (this.oidcDiscoveryConfig == null) {
55+
if (this.oidcDiscoveryConfig.get() == null) {
5256
this.buildFeaturesAndOIDCDiscoveryConfig();
53-
if (this.oidcDiscoveryConfig != null && this.callbacks != null) {
54-
this.callbacks.stream()
55-
.filter(Objects::nonNull)
56-
.forEach(c -> c.accept(this.oidcDiscoveryConfig));
57+
OIDCDiscoveryConfig config = this.oidcDiscoveryConfig.get();
58+
if (config != null) {
59+
this.callbacks.values().forEach(callback -> callback.accept(config));
5760
}
5861
}
5962
});
@@ -62,27 +65,25 @@ public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int t
6265
}
6366

6467
public OIDCDiscoveryConfig getOidcDiscoveryConfig() {
65-
return oidcDiscoveryConfig;
68+
return oidcDiscoveryConfig.get();
6669
}
6770

6871
public int registerCallback(Consumer<OIDCDiscoveryConfig> callback) {
69-
if (this.callbacks == null) {
70-
this.callbacks = new ArrayList<>();
71-
}
72-
73-
this.callbacks.add(callback);
74-
return this.callbacks.size() - 1;
72+
int id = callbackIdGenerator.incrementAndGet();
73+
this.callbacks.put(id, callback);
74+
return id;
7575
}
7676

7777
public void deregisterCallback(int callbackId) {
78-
this.callbacks.set(callbackId, null);
78+
this.callbacks.remove(callbackId);
7979
}
8080

8181
private void buildOIDCDiscoveryConfig() throws ExecutionException, InterruptedException, TimeoutException {
82-
this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx)
82+
OIDCDiscoveryConfig config = OIDCDiscoveryConfig.build(this.vertx)
8383
.toCompletionStage()
8484
.toCompletableFuture()
8585
.get(this.timeoutSeconds, TimeUnit.SECONDS);
86+
this.oidcDiscoveryConfig.set(config);
8687
}
8788

8889
private void buildFeaturesAndOIDCDiscoveryConfig() {

0 commit comments

Comments
 (0)