Skip to content

Cancel pending heartbeat when activity completes #2526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public Object getLastHeartbeatValue() {
return heartbeatContext.getLastHeartbeatDetails();
}

@Override
public void cancelOutstandingHeartbeat() {
heartbeatContext.cancelOutstandingHeartbeat();
}

@Override
public WorkflowClient getWorkflowClient() {
return client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri
metricsScope,
local,
dataConverterWithActivityContext);
} finally {
if (!context.isDoNotCompleteOnReturn()) {
// if the activity is not completed, we need to cancel the heartbeat
// to avoid sending it after the activity is completed
context.cancelOutstandingHeartbeat();
Comment on lines +133 to +135
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify this a bit? So except for async-activity-completion, this is going to prevent a pending heartbeat from being recorded? Is there somewhere else in Java where the last heartbeat of a failing activity is guaranteed to be sent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there somewhere else in Java where the last heartbeat of a failing activity is guaranteed to be sent?

Yes, we send the last heartbeat value as part of the activity failure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So to double-extra confirm, this is guaranteed only for non-async-activity-complete situations where the task is already complete, and therefore this logic will never cause a lost heartbeat at the end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we never loose a heartbeat since we always send the last hearbeat as part of activity failure

}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ interface HeartbeatContext {
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);

Object getLastHeartbeatDetails();

/** Cancel any pending heartbeat and discard cached heartbeat details. */
void cancelOutstandingHeartbeat();
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ public Object getLastHeartbeatDetails() {
}
}

@Override
public void cancelOutstandingHeartbeat() {
lock.lock();
try {
if (scheduledHeartbeat != null) {
scheduledHeartbeat.cancel(false);
scheduledHeartbeat = null;
}
hasOutstandingHeartbeat = false;
} finally {
lock.unlock();
}
}

private void doHeartBeatLocked(Object details) {
long nextHeartbeatDelay;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@
public interface InternalActivityExecutionContext extends ActivityExecutionContext {
/** Get the latest value of {@link ActivityExecutionContext#heartbeat(Object)}. */
Object getLastHeartbeatValue();

/** Cancel any pending heartbeat and discard cached heartbeat details. */
void cancelOutstandingHeartbeat();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public Object getLastHeartbeatValue() {
return null;
}

@Override
public void cancelOutstandingHeartbeat() {
// Ignored
}

@Override
public WorkflowClient getWorkflowClient() {
return client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@
*/
@RunWith(JUnitParamsRunner.class)
public class ActivityTimeoutTest {
// TODO This test takes longer than it should to complete because
// of the cached heartbeat that prevents a quick shutdown
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setTestTimeoutSeconds(15).setDoNotStart(true).build();
SDKTestWorkflowRule.newBuilder().setDoNotStart(true).build();

/**
* An activity reaches startToClose timeout once, max retries are set to 1. o
Expand Down
Loading