Skip to content

Commit ddda99b

Browse files
Fix ConcurrentModificationException in LocalActivityMeteringHelper (#2108)
1 parent 6da11b9 commit ddda99b

File tree

2 files changed

+47
-13
lines changed

2 files changed

+47
-13
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -422,30 +422,32 @@ public void cancel(HistoryEvent cancelEvent) {
422422
}
423423
}
424424

425-
private static class LocalActivityMeteringHelper {
425+
@VisibleForTesting
426+
static class LocalActivityMeteringHelper {
426427
private final Map<String, AtomicInteger> firstWftActivities = new HashMap<>();
427428
private final Map<String, AtomicInteger> nonFirstWftActivities = new HashMap<>();
428429
private final Set<String> completed = new HashSet<>();
429430

430-
private void newWFTStarting() {
431+
void newWFTStarting() {
431432
for (String activityId : firstWftActivities.keySet()) {
432-
AtomicInteger removed = firstWftActivities.remove(activityId);
433-
removed.set(0);
434-
nonFirstWftActivities.put(activityId, removed);
433+
AtomicInteger attemptCount = firstWftActivities.get(activityId);
434+
attemptCount.set(0);
435+
nonFirstWftActivities.put(activityId, attemptCount);
435436
}
437+
firstWftActivities.clear();
436438
}
437439

438-
private void addNewLocalActivity(ExecuteLocalActivityParameters params) {
440+
void addNewLocalActivity(ExecuteLocalActivityParameters params) {
439441
AtomicInteger attemptsDuringWFTCounter = new AtomicInteger(0);
440442
params.setOnNewAttemptCallback(attemptsDuringWFTCounter::incrementAndGet);
441443
firstWftActivities.put(params.getActivityId(), attemptsDuringWFTCounter);
442444
}
443445

444-
private void markLocalActivityComplete(String activityId) {
446+
void markLocalActivityComplete(String activityId) {
445447
completed.add(activityId);
446448
}
447449

448-
private int getNonfirstAttempts() {
450+
int getNonfirstAttempts() {
449451
int result =
450452
nonFirstWftActivities.values().stream()
451453
.map(ai -> ai.getAndSet(0))

temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,9 @@
3636
import io.temporal.api.history.v1.History;
3737
import io.temporal.api.history.v1.HistoryEvent;
3838
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
39-
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
40-
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
41-
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
42-
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
39+
import io.temporal.api.workflowservice.v1.*;
4340
import io.temporal.internal.common.InternalUtils;
41+
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
4442
import io.temporal.internal.worker.SingleWorkerOptions;
4543
import io.temporal.internal.worker.WorkflowExecutorCache;
4644
import io.temporal.internal.worker.WorkflowRunLockManager;
@@ -79,7 +77,6 @@ public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() thro
7977
// Act
8078
WorkflowTaskHandler.Result result =
8179
taskHandler.handleWorkflowTask(HistoryUtils.generateWorkflowTaskWithInitialHistory());
82-
8380
// Assert
8481
assertEquals(0, cache.size());
8582
assertNotNull(result.getTaskCompleted());
@@ -142,6 +139,41 @@ public void workflowTaskFailOnIncompleteHistory() throws Throwable {
142139
result.getTaskFailed().getFailure().getMessage());
143140
}
144141

142+
@Test
143+
public void localActivityMeteringHelper() {
144+
ReplayWorkflowRunTaskHandler.LocalActivityMeteringHelper laMeteringHelper =
145+
new ReplayWorkflowRunTaskHandler.LocalActivityMeteringHelper();
146+
ExecuteLocalActivityParameters executeLA =
147+
new ExecuteLocalActivityParameters(
148+
PollActivityTaskQueueResponse.newBuilder().setActivityId("1"),
149+
null,
150+
0,
151+
null,
152+
false,
153+
null);
154+
laMeteringHelper.addNewLocalActivity(executeLA);
155+
laMeteringHelper.addNewLocalActivity(
156+
new ExecuteLocalActivityParameters(
157+
PollActivityTaskQueueResponse.newBuilder().setActivityId("2"),
158+
null,
159+
0,
160+
null,
161+
false,
162+
null));
163+
for (int i = 0; i < 5; i++) {
164+
executeLA.getOnNewAttemptCallback().apply();
165+
}
166+
// Verify retries are not counted for the first task
167+
assertEquals(0, laMeteringHelper.getNonfirstAttempts());
168+
laMeteringHelper.newWFTStarting();
169+
assertEquals(0, laMeteringHelper.getNonfirstAttempts());
170+
// Verify retries are counted for the non first task
171+
for (int i = 0; i < 5; i++) {
172+
executeLA.getOnNewAttemptCallback().apply();
173+
}
174+
assertEquals(5, laMeteringHelper.getNonfirstAttempts());
175+
}
176+
145177
@Test
146178
public void ifStickyExecutionAttributesAreSetThenWorkflowsAreCached() throws Throwable {
147179
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);

0 commit comments

Comments
 (0)