Skip to content

Commit c1c97e8

Browse files
Cancel pending heartbeat when activity completes
1 parent 58a4200 commit c1c97e8

File tree

7 files changed

+37
-3
lines changed

7 files changed

+37
-3
lines changed

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ public Object getLastHeartbeatValue() {
156156
return heartbeatContext.getLastHeartbeatDetails();
157157
}
158158

159+
@Override
160+
public void cancelOutstandingHeartbeat() {
161+
heartbeatContext.cancelOutstandingHeartbeat();
162+
}
163+
159164
@Override
160165
public WorkflowClient getWorkflowClient() {
161166
return client;

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri
128128
metricsScope,
129129
local,
130130
dataConverterWithActivityContext);
131+
} finally {
132+
if (!context.isDoNotCompleteOnReturn()) {
133+
// if the activity is not completed, we need to cancel the heartbeat
134+
// to avoid sending it after the activity is completed
135+
context.cancelOutstandingHeartbeat();
136+
}
131137
}
132138
}
133139

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,7 @@ interface HeartbeatContext {
1717
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);
1818

1919
Object getLastHeartbeatDetails();
20+
21+
/** Cancel any pending heartbeat and discard cached heartbeat details. */
22+
void cancelOutstandingHeartbeat();
2023
}

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,20 @@ public Object getLastHeartbeatDetails() {
143143
}
144144
}
145145

146+
@Override
147+
public void cancelOutstandingHeartbeat() {
148+
lock.lock();
149+
try {
150+
if (scheduledHeartbeat != null) {
151+
scheduledHeartbeat.cancel(false);
152+
scheduledHeartbeat = null;
153+
}
154+
hasOutstandingHeartbeat = false;
155+
} finally {
156+
lock.unlock();
157+
}
158+
}
159+
146160
private void doHeartBeatLocked(Object details) {
147161
long nextHeartbeatDelay;
148162
try {

temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@
99
public interface InternalActivityExecutionContext extends ActivityExecutionContext {
1010
/** Get the latest value of {@link ActivityExecutionContext#heartbeat(Object)}. */
1111
Object getLastHeartbeatValue();
12+
13+
/** Cancel any pending heartbeat and discard cached heartbeat details. */
14+
void cancelOutstandingHeartbeat();
1215
}

temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ public Object getLastHeartbeatValue() {
7979
return null;
8080
}
8181

82+
@Override
83+
public void cancelOutstandingHeartbeat() {
84+
// Ignored
85+
}
86+
8287
@Override
8388
public WorkflowClient getWorkflowClient() {
8489
return client;

temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,9 @@
4949
*/
5050
@RunWith(JUnitParamsRunner.class)
5151
public class ActivityTimeoutTest {
52-
// TODO This test takes longer than it should to complete because
53-
// of the cached heartbeat that prevents a quick shutdown
5452
@Rule
5553
public SDKTestWorkflowRule testWorkflowRule =
56-
SDKTestWorkflowRule.newBuilder().setTestTimeoutSeconds(15).setDoNotStart(true).build();
54+
SDKTestWorkflowRule.newBuilder().setDoNotStart(true).build();
5755

5856
/**
5957
* An activity reaches startToClose timeout once, max retries are set to 1. o

0 commit comments

Comments
 (0)