Skip to content

Commit 7f50752

Browse files
authored
Merge pull request #669 from cizezsy/fix/capacity-limit
Fix: remove the capacity limit of ProcessorListener
2 parents a11262f + e8d4413 commit 7f50752

File tree

2 files changed

+39
-9
lines changed

2 files changed

+39
-9
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/ProcessorListener.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,20 @@
22

33
import io.kubernetes.client.informer.ResourceEventHandler;
44
import io.kubernetes.client.informer.exception.BadNotificationException;
5-
import java.util.concurrent.ArrayBlockingQueue;
65
import java.util.concurrent.BlockingQueue;
6+
import java.util.concurrent.LinkedBlockingQueue;
77
import org.joda.time.DateTime;
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
1010

1111
/**
1212
* ProcessorListener implements Runnable interface. It's supposed to run in background and actually
13-
* executes its event handler on notification. Note that it allows 1000 pending notifications at
14-
* maximum.
13+
* executes its event handler on notification.
1514
*/
1615
public class ProcessorListener<ApiType> implements Runnable {
1716

1817
private static final Logger log = LoggerFactory.getLogger(ProcessorListener.class);
1918

20-
private static final int DEFAULT_QUEUE_CAPACITY = 1000;
21-
2219
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
2320
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
2421
// informer's overall resync check period.
@@ -33,7 +30,7 @@ public ProcessorListener(ResourceEventHandler<ApiType> handler, long resyncPerio
3330
this.resyncPeriod = resyncPeriod;
3431
this.handler = handler;
3532

36-
this.queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
33+
this.queue = new LinkedBlockingQueue<>();
3734

3835
determineNextResync(DateTime.now());
3936
}
@@ -90,9 +87,7 @@ public void add(Notification<ApiType> obj) {
9087
if (obj == null) {
9188
return;
9289
}
93-
if (!this.queue.offer(obj)) {
94-
log.warn("notification queue full!");
95-
}
90+
this.queue.add(obj);
9691
}
9792

9893
public void determineNextResync(DateTime now) {

util/src/test/java/io/kubernetes/client/informer/cache/ProcessorListenerTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,39 @@ public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {
6060
assertTrue(updateNotificationReceived);
6161
assertTrue(deleteNotificationReceived);
6262
}
63+
64+
@Test
65+
public void testMultipleNotificationsHandling() throws InterruptedException {
66+
V1Pod pod = new V1Pod().metadata(new V1ObjectMeta().name("foo").namespace("default"));
67+
final int[] count = {0};
68+
69+
ProcessorListener<V1Pod> listener =
70+
new ProcessorListener<>(
71+
new ResourceEventHandler<V1Pod>() {
72+
@Override
73+
public void onAdd(V1Pod obj) {
74+
assertEquals(pod, obj);
75+
count[0]++;
76+
}
77+
78+
@Override
79+
public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
80+
81+
@Override
82+
public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
83+
},
84+
0);
85+
86+
for (int i = 0; i < 2000; i++) {
87+
listener.add(new ProcessorListener.AddNotification<>(pod));
88+
}
89+
90+
Thread listenerThread = new Thread(listener);
91+
listenerThread.setDaemon(true);
92+
listenerThread.start();
93+
94+
Thread.sleep(2000);
95+
96+
assertEquals(count[0], 2000);
97+
}
6398
}

0 commit comments

Comments
 (0)