Skip to content

Commit 779d90c

Browse files
Make StickyQueueBalancer synchronized (#2025)
Make StickyQueueBalancer synchronized to avoid potential race where all initial calls to `makePoll()` all get assigned the same task queue.
1 parent 26a8595 commit 779d90c

File tree

1 file changed

+16
-23
lines changed

1 file changed

+16
-23
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/StickyQueueBalancer.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,16 @@
2121
package io.temporal.internal.worker;
2222

2323
import io.temporal.api.enums.v1.TaskQueueKind;
24-
import java.util.concurrent.atomic.AtomicBoolean;
25-
import java.util.concurrent.atomic.AtomicInteger;
2624
import javax.annotation.concurrent.ThreadSafe;
2725

2826
@ThreadSafe
2927
public class StickyQueueBalancer {
3028
private final int pollersCount;
3129
private final boolean stickyQueueEnabled;
32-
private final AtomicInteger stickyPollers = new AtomicInteger(0);
33-
private final AtomicInteger normalPollers = new AtomicInteger(0);
34-
private final AtomicBoolean disableNormalPoll = new AtomicBoolean(false);
35-
36-
private volatile long stickyBacklogSize = 0;
30+
private int stickyPollers = 0;
31+
private int normalPollers = 0;
32+
private boolean disableNormalPoll = false;
33+
private long stickyBacklogSize = 0;
3734

3835
public StickyQueueBalancer(int pollersCount, boolean stickyQueueEnabled) {
3936
this.pollersCount = pollersCount;
@@ -43,35 +40,35 @@ public StickyQueueBalancer(int pollersCount, boolean stickyQueueEnabled) {
4340
/**
4441
* @return task queue kind that should be used for the next poll
4542
*/
46-
public TaskQueueKind makePoll() {
43+
public synchronized TaskQueueKind makePoll() {
4744
if (stickyQueueEnabled) {
48-
if (disableNormalPoll.get()) {
49-
stickyPollers.incrementAndGet();
45+
if (disableNormalPoll) {
46+
stickyPollers++;
5047
return TaskQueueKind.TASK_QUEUE_KIND_STICKY;
5148
}
5249
// If pollersCount >= stickyBacklogSize > 0 we want to go back to a normal ratio to avoid a
5350
// situation that too many pollers (all of them in the worst case) will open only sticky queue
5451
// polls observing a stickyBacklogSize == 1 for example (which actually can be 0 already at
5552
// that moment) and get stuck causing dip in worker load.
56-
if (stickyBacklogSize > pollersCount || stickyPollers.get() <= normalPollers.get()) {
57-
stickyPollers.incrementAndGet();
53+
if (stickyBacklogSize > pollersCount || stickyPollers <= normalPollers) {
54+
stickyPollers++;
5855
return TaskQueueKind.TASK_QUEUE_KIND_STICKY;
5956
}
6057
}
61-
normalPollers.incrementAndGet();
58+
normalPollers++;
6259
return TaskQueueKind.TASK_QUEUE_KIND_NORMAL;
6360
}
6461

6562
/**
6663
* @param taskQueueKind what kind of task queue poll was just finished
6764
*/
68-
public void finishPoll(TaskQueueKind taskQueueKind) {
65+
public synchronized void finishPoll(TaskQueueKind taskQueueKind) {
6966
switch (taskQueueKind) {
7067
case TASK_QUEUE_KIND_NORMAL:
71-
normalPollers.decrementAndGet();
68+
normalPollers--;
7269
break;
7370
case TASK_QUEUE_KIND_STICKY:
74-
stickyPollers.decrementAndGet();
71+
stickyPollers--;
7572
break;
7673
default:
7774
throw new IllegalArgumentException("Invalid task queue kind: " + taskQueueKind);
@@ -83,18 +80,14 @@ public void finishPoll(TaskQueueKind taskQueueKind) {
8380
* @param backlogSize backlog size from the poll response, helps to determine if the sticky queue
8481
* is backlogged
8582
*/
86-
public void finishPoll(TaskQueueKind taskQueueKind, long backlogSize) {
83+
public synchronized void finishPoll(TaskQueueKind taskQueueKind, long backlogSize) {
8784
finishPoll(taskQueueKind);
8885
if (TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind)) {
8986
stickyBacklogSize = backlogSize;
9087
}
9188
}
9289

93-
public void disableNormalPoll() {
94-
disableNormalPoll.set(true);
95-
}
96-
97-
public int getNormalPollerCount() {
98-
return normalPollers.get();
90+
public synchronized void disableNormalPoll() {
91+
disableNormalPoll = true;
9992
}
10093
}

0 commit comments

Comments
 (0)