Skip to content

Commit c9a1502

Browse files
Add support for start delay to the time skipping test server (#2462)
Add support start delay
1 parent ad4a426 commit c9a1502

File tree

6 files changed

+138
-6
lines changed

6 files changed

+138
-6
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ jobs:
7878
--port 7233 \
7979
--http-port 7243 \
8080
--namespace UnitTest \
81+
--db-filename temporal.sqlite \
82+
--sqlite-pragma journal_mode=WAL \
83+
--sqlite-pragma synchronous=OFF \
8184
--search-attribute CustomKeywordField=Keyword \
8285
--search-attribute CustomStringField=Text \
8386
--search-attribute CustomTextField=Text \
@@ -96,6 +99,7 @@ jobs:
9699
--dynamic-config-value frontend.workerVersioningRuleAPIs=true \
97100
--dynamic-config-value worker.removableBuildIdDurationSinceDefault=true \
98101
--dynamic-config-value matching.useNewMatcher=true \
102+
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
99103
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
100104
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true &
101105
sleep 10s
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client.functional;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.api.common.v1.WorkflowExecution;
26+
import io.temporal.api.history.v1.HistoryEvent;
27+
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
28+
import io.temporal.client.WorkflowOptions;
29+
import io.temporal.client.WorkflowStub;
30+
import io.temporal.common.WorkflowExecutionHistory;
31+
import io.temporal.internal.common.ProtobufTimeUtils;
32+
import io.temporal.testing.internal.SDKTestOptions;
33+
import io.temporal.testing.internal.SDKTestWorkflowRule;
34+
import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*;
35+
import java.time.Duration;
36+
import java.util.List;
37+
import java.util.stream.Collectors;
38+
import org.junit.Rule;
39+
import org.junit.Test;
40+
41+
public class StartDelayTest {
42+
43+
@Rule
44+
public SDKTestWorkflowRule testWorkflowRule =
45+
SDKTestWorkflowRule.newBuilder()
46+
.setWorkflowTypes(TestNoArgsWorkflowsFuncImpl.class)
47+
.setUseTimeskipping(false)
48+
.build();
49+
50+
@Test
51+
public void startWithDelay() {
52+
WorkflowOptions workflowOptions =
53+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
54+
.setStartDelay(Duration.ofSeconds(1))
55+
.build();
56+
TestNoArgsWorkflowFunc stubF =
57+
testWorkflowRule
58+
.getWorkflowClient()
59+
.newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions);
60+
long start = System.currentTimeMillis();
61+
stubF.func();
62+
long end = System.currentTimeMillis();
63+
// Assert that the workflow took at least 5 seconds to start
64+
assertEquals(1000, end - start, 500);
65+
WorkflowExecution workflowExecution = WorkflowStub.fromTyped(stubF).getExecution();
66+
WorkflowExecutionHistory workflowExecutionHistory =
67+
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
68+
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
69+
workflowExecutionHistory.getEvents().stream()
70+
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
71+
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
72+
.collect(Collectors.toList());
73+
assertEquals(1, workflowExecutionStartedEvents.size());
74+
assertEquals(
75+
Duration.ofSeconds(1),
76+
ProtobufTimeUtils.toJavaDuration(
77+
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
78+
}
79+
80+
public static class TestNoArgsWorkflowsFuncImpl implements TestNoArgsWorkflowFunc {
81+
82+
@Override
83+
public String func() {
84+
85+
return "done";
86+
}
87+
88+
@Override
89+
public String update() {
90+
throw new UnsupportedOperationException();
91+
}
92+
}
93+
}

temporal-sdk/src/test/java/io/temporal/workflow/WorkflowRetryTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,24 @@
2222

2323
import static org.junit.Assert.assertEquals;
2424

25+
import io.temporal.api.common.v1.WorkflowExecution;
26+
import io.temporal.api.history.v1.HistoryEvent;
27+
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
2528
import io.temporal.client.WorkflowException;
29+
import io.temporal.client.WorkflowStub;
2630
import io.temporal.common.RetryOptions;
31+
import io.temporal.common.WorkflowExecutionHistory;
2732
import io.temporal.failure.ApplicationFailure;
33+
import io.temporal.internal.common.ProtobufTimeUtils;
2834
import io.temporal.testing.internal.SDKTestOptions;
2935
import io.temporal.testing.internal.SDKTestWorkflowRule;
3036
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
3137
import java.time.Duration;
38+
import java.util.List;
3239
import java.util.Map;
3340
import java.util.concurrent.ConcurrentHashMap;
3441
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.stream.Collectors;
3543
import org.junit.Assert;
3644
import org.junit.Rule;
3745
import org.junit.Test;
@@ -77,6 +85,20 @@ public void testWorkflowRetry() {
7785
long elapsed = testWorkflowRule.getTestEnvironment().currentTimeMillis() - start;
7886
Assert.assertTrue(
7987
String.valueOf(elapsed), elapsed >= 2000); // Ensure that retry delays the restart
88+
// Verify that the first workflow task backoff is set to 1 second
89+
WorkflowExecution workflowExecution = WorkflowStub.fromTyped(workflowStub).getExecution();
90+
WorkflowExecutionHistory workflowExecutionHistory =
91+
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
92+
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
93+
workflowExecutionHistory.getEvents().stream()
94+
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
95+
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
96+
.collect(Collectors.toList());
97+
assertEquals(1, workflowExecutionStartedEvents.size());
98+
assertEquals(
99+
Duration.ofSeconds(1),
100+
ProtobufTimeUtils.toJavaDuration(
101+
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
80102
}
81103
}
82104

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,14 +1313,16 @@ private static void startWorkflow(
13131313
}
13141314
data.retryState.ifPresent(
13151315
testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1316+
13161317
a.setFirstExecutionRunId(data.firstExecutionRunId);
13171318
a.setOriginalExecutionRunId(data.originalExecutionRunId);
13181319
data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
13191320
if (data.lastCompletionResult != null) {
13201321
a.setLastCompletionResult(data.lastCompletionResult);
13211322
}
1322-
if (request.hasWorkflowStartDelay()) {
1323-
a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
1323+
1324+
if (data.backoffStartInterval != null) {
1325+
a.setFirstWorkflowTaskBackoff(data.backoffStartInterval);
13241326
}
13251327
data.lastFailure.ifPresent(a::setContinuedFailure);
13261328
if (request.hasMemo()) {

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,9 +1525,9 @@ private void processFailWorkflowExecution(
15251525
continueAsNewAttr.setMemo(startRequest.getMemo());
15261526
}
15271527
// TODO
1528-
ContinueAsNewWorkflowExecutionCommandAttributes coninueAsNewCommand =
1528+
ContinueAsNewWorkflowExecutionCommandAttributes continueAsNewCommand =
15291529
continueAsNewAttr.build();
1530-
workflow.action(Action.CONTINUE_AS_NEW, ctx, coninueAsNewCommand, workflowTaskCompletedId);
1530+
workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewCommand, workflowTaskCompletedId);
15311531
workflowTaskStateMachine.getData().workflowCompleted = true;
15321532
HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
15331533
WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
@@ -1537,7 +1537,7 @@ private void processFailWorkflowExecution(
15371537
Optional.of(rs.getNextAttempt(Optional.of(failure)));
15381538
service.continueAsNew(
15391539
startRequest,
1540-
coninueAsNewCommand,
1540+
continueAsNewCommand,
15411541
continuedAsNewEventAttributes,
15421542
continuedRetryState,
15431543
identity,

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,16 @@ public void startWorkflowExecution(
231231
StartWorkflowExecutionRequest request,
232232
StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
233233
try {
234+
if (!request.getCronSchedule().isEmpty() && request.hasWorkflowStartDelay()) {
235+
throw Status.INVALID_ARGUMENT
236+
.withDescription(
237+
"INVALID_ARGUMENT: CronSchedule and WorkflowStartDelay may not be used together.")
238+
.asRuntimeException();
239+
}
234240
Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
241+
if (request.hasWorkflowStartDelay()) {
242+
backoffInterval = ProtobufTimeUtils.toJavaDuration(request.getWorkflowStartDelay());
243+
}
235244
StartWorkflowExecutionResponse response =
236245
startWorkflowExecutionImpl(
237246
request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
@@ -1453,8 +1462,10 @@ public void signalWithStartWorkflowExecution(
14531462
if (r.hasSearchAttributes()) {
14541463
startRequest.setSearchAttributes(r.getSearchAttributes());
14551464
}
1465+
Duration backoffInterval = Duration.ZERO;
14561466
if (r.hasWorkflowStartDelay()) {
14571467
startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
1468+
backoffInterval = ProtobufTimeUtils.toJavaDuration(r.getWorkflowStartDelay());
14581469
}
14591470
if (!r.getLinksList().isEmpty()) {
14601471
startRequest.addAllLinks(r.getLinksList());
@@ -1463,7 +1474,7 @@ public void signalWithStartWorkflowExecution(
14631474
StartWorkflowExecutionResponse startResult =
14641475
startWorkflowExecutionImpl(
14651476
startRequest.build(),
1466-
Duration.ZERO,
1477+
backoffInterval,
14671478
Optional.empty(),
14681479
OptionalLong.empty(),
14691480
ms -> {

0 commit comments

Comments
 (0)