Skip to content

Commit 8808c40

Browse files
Add support for activity pause (#2476)
Add support for activity pause
1 parent 78a766f commit 8808c40

File tree

5 files changed

+160
-1
lines changed

5 files changed

+160
-1
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ jobs:
102102
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
103103
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
104104
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
105+
--dynamic-config-value frontend.activityAPIsEnabled=true \
105106
--dynamic-config-value system.enableDeploymentVersions=true &
106107
sleep 10s
107108
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client;
22+
23+
import io.temporal.activity.ActivityInfo;
24+
25+
/***
26+
* Indicates that the activity was paused by the user.
27+
*
28+
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
29+
*/
30+
public final class ActivityPausedException extends ActivityCompletionException {
31+
public ActivityPausedException(ActivityInfo info) {
32+
super(info);
33+
}
34+
35+
public ActivityPausedException() {
36+
super();
37+
}
38+
}

temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
/**
2828
* Indicates that {@link WorkerFactory#shutdown()} or {@link WorkerFactory#shutdownNow()} was
29-
* called. It is OK to ignore the exception to let activity to complete. It assumes that {@link
29+
* called. It is OK to ignore the exception to let the activity complete. It assumes that {@link
3030
* WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity
3131
* execution time.
3232
*/

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ private void sendHeartbeatRequest(Object details) {
216216
metricsScope);
217217
if (status.getCancelRequested()) {
218218
lastException = new ActivityCanceledException(info);
219+
} else if (status.getActivityPaused()) {
220+
lastException = new ActivityPausedException(info);
219221
} else {
220222
lastException = null;
221223
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.activity;
22+
23+
import static org.junit.Assume.assumeTrue;
24+
25+
import io.temporal.api.common.v1.WorkflowExecution;
26+
import io.temporal.api.workflow.v1.PendingActivityInfo;
27+
import io.temporal.api.workflowservice.v1.PauseActivityRequest;
28+
import io.temporal.client.ActivityPausedException;
29+
import io.temporal.client.WorkflowStub;
30+
import io.temporal.testing.internal.SDKTestOptions;
31+
import io.temporal.testing.internal.SDKTestWorkflowRule;
32+
import io.temporal.workflow.Async;
33+
import io.temporal.workflow.Workflow;
34+
import io.temporal.workflow.shared.TestActivities;
35+
import io.temporal.workflow.shared.TestWorkflows;
36+
import java.time.Duration;
37+
import org.junit.Assert;
38+
import org.junit.Rule;
39+
import org.junit.Test;
40+
41+
public class ActivityPauseTest {
42+
43+
@Rule
44+
public SDKTestWorkflowRule testWorkflowRule =
45+
SDKTestWorkflowRule.newBuilder()
46+
.setWorkflowTypes(TestWorkflowImpl.class)
47+
.setActivityImplementations(new HeartBeatingActivityImpl())
48+
.build();
49+
50+
@Test
51+
public void activityPause() {
52+
assumeTrue(
53+
"Test Server doesn't support activity pause", SDKTestWorkflowRule.useExternalService);
54+
55+
TestWorkflows.TestWorkflowReturnString workflow =
56+
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
57+
Assert.assertEquals("I am stopped by Pause", workflow.execute());
58+
Assert.assertEquals(
59+
1,
60+
WorkflowStub.fromTyped(workflow)
61+
.describe()
62+
.getRawDescription()
63+
.getPendingActivitiesCount());
64+
PendingActivityInfo activityInfo =
65+
WorkflowStub.fromTyped(workflow).describe().getRawDescription().getPendingActivities(0);
66+
Assert.assertTrue(activityInfo.getPaused());
67+
}
68+
69+
public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
70+
71+
private final TestActivities.TestActivity1 activities =
72+
Workflow.newActivityStub(
73+
TestActivities.TestActivity1.class,
74+
SDKTestOptions.newActivityOptions20sScheduleToClose());
75+
76+
@Override
77+
public String execute() {
78+
Async.function(activities::execute, "");
79+
Workflow.sleep(Duration.ofSeconds(1));
80+
return activities.execute("CompleteOnPause");
81+
}
82+
}
83+
84+
public static class HeartBeatingActivityImpl implements TestActivities.TestActivity1 {
85+
@Override
86+
public String execute(String arg) {
87+
ActivityInfo info = Activity.getExecutionContext().getInfo();
88+
// Have the activity pause itself
89+
Activity.getExecutionContext()
90+
.getWorkflowClient()
91+
.getWorkflowServiceStubs()
92+
.blockingStub()
93+
.pauseActivity(
94+
PauseActivityRequest.newBuilder()
95+
.setNamespace(info.getNamespace())
96+
.setExecution(
97+
WorkflowExecution.newBuilder().setWorkflowId(info.getWorkflowId()).build())
98+
.setId(info.getActivityId())
99+
.build());
100+
while (true) {
101+
try {
102+
Thread.sleep(1000);
103+
// Heartbeat and verify that the correct exception is thrown
104+
Activity.getExecutionContext().heartbeat("1");
105+
} catch (ActivityPausedException pe) {
106+
if (arg.equals("CompleteOnPause")) {
107+
// An activity should be able to succeed if paused
108+
return "I am stopped by Pause";
109+
}
110+
// This will fail the attempt, and the activity will not be retried if not unpaused
111+
throw pe;
112+
} catch (InterruptedException e) {
113+
throw new RuntimeException(e);
114+
}
115+
}
116+
}
117+
}
118+
}

0 commit comments

Comments
 (0)