diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java index 60fc46880..c223596da 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java @@ -156,6 +156,11 @@ public Object getLastHeartbeatValue() { return heartbeatContext.getLastHeartbeatDetails(); } + @Override + public void cancelOutstandingHeartbeat() { + heartbeatContext.cancelOutstandingHeartbeat(); + } + @Override public WorkflowClient getWorkflowClient() { return client; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java index 672778126..a77838193 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java @@ -128,6 +128,12 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri metricsScope, local, dataConverterWithActivityContext); + } finally { + if (!context.isDoNotCompleteOnReturn()) { + // if the activity is not completed, we need to cancel the heartbeat + // to avoid sending it after the activity is completed + context.cancelOutstandingHeartbeat(); + } } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java index aadb76fef..8a8fa3338 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java @@ -17,4 +17,7 @@ interface HeartbeatContext { Optional getHeartbeatDetails(Class detailsClass, Type detailsGenericType); Object getLastHeartbeatDetails(); + + /** Cancel any pending heartbeat and discard cached heartbeat details. */ + void cancelOutstandingHeartbeat(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index 1482fb0ea..b53e943fc 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -143,6 +143,20 @@ public Object getLastHeartbeatDetails() { } } + @Override + public void cancelOutstandingHeartbeat() { + lock.lock(); + try { + if (scheduledHeartbeat != null) { + scheduledHeartbeat.cancel(false); + scheduledHeartbeat = null; + } + hasOutstandingHeartbeat = false; + } finally { + lock.unlock(); + } + } + private void doHeartBeatLocked(Object details) { long nextHeartbeatDelay; try { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java index 2468a0629..1b65e32cd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/InternalActivityExecutionContext.java @@ -9,4 +9,7 @@ public interface InternalActivityExecutionContext extends ActivityExecutionContext { /** Get the latest value of {@link ActivityExecutionContext#heartbeat(Object)}. */ Object getLastHeartbeatValue(); + + /** Cancel any pending heartbeat and discard cached heartbeat details. */ + void cancelOutstandingHeartbeat(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java index 244f9ce92..225bf4e20 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java @@ -79,6 +79,11 @@ public Object getLastHeartbeatValue() { return null; } + @Override + public void cancelOutstandingHeartbeat() { + // Ignored + } + @Override public WorkflowClient getWorkflowClient() { return client; diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java index 5b73f30d5..1079da21a 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java @@ -49,11 +49,9 @@ */ @RunWith(JUnitParamsRunner.class) public class ActivityTimeoutTest { - // TODO This test takes longer than it should to complete because - // of the cached heartbeat that prevents a quick shutdown @Rule public SDKTestWorkflowRule testWorkflowRule = - SDKTestWorkflowRule.newBuilder().setTestTimeoutSeconds(15).setDoNotStart(true).build(); + SDKTestWorkflowRule.newBuilder().setDoNotStart(true).build(); /** * An activity reaches startToClose timeout once, max retries are set to 1. o