Skip to content

Commit 3b26db7

Browse files
Make sure workflow_failed is incremented on NonDeterministicException (#2141)
Make sure workflow_failed is incremented on NonDeterministicException
1 parent 0f90334 commit 3b26db7

File tree

2 files changed

+175
-0
lines changed

2 files changed

+175
-0
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,14 @@ private void applyServerHistory(long lastEventId, WorkflowHistoryIterator histor
255255
implementationOptions.getFailWorkflowExceptionTypes();
256256
for (Class<? extends Throwable> failType : failTypes) {
257257
if (failType.isAssignableFrom(e.getClass())) {
258+
metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
258259
throw new WorkflowExecutionException(
259260
workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e));
260261
}
261262
}
263+
if (e instanceof WorkflowExecutionException) {
264+
metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
265+
}
262266
throw wrap(e);
263267
}
264268
if (!timerStopped && !workflowStateMachines.isReplaying()) {
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.worker;
22+
23+
import static org.junit.Assert.assertThrows;
24+
25+
import com.uber.m3.tally.RootScopeBuilder;
26+
import com.uber.m3.tally.Scope;
27+
import com.uber.m3.util.ImmutableMap;
28+
import io.temporal.api.common.v1.WorkflowExecution;
29+
import io.temporal.client.WorkflowClient;
30+
import io.temporal.client.WorkflowFailedException;
31+
import io.temporal.client.WorkflowOptions;
32+
import io.temporal.common.reporter.TestStatsReporter;
33+
import io.temporal.failure.ApplicationFailure;
34+
import io.temporal.testing.internal.SDKTestWorkflowRule;
35+
import io.temporal.worker.MetricsType;
36+
import io.temporal.worker.NonDeterministicException;
37+
import io.temporal.worker.WorkflowImplementationOptions;
38+
import io.temporal.workflow.SignalMethod;
39+
import io.temporal.workflow.Workflow;
40+
import io.temporal.workflow.WorkflowInterface;
41+
import io.temporal.workflow.WorkflowMethod;
42+
import java.time.Duration;
43+
import java.util.Map;
44+
import org.junit.Before;
45+
import org.junit.Rule;
46+
import org.junit.Test;
47+
48+
public class WorkflowFailedMetricsTests {
49+
private final TestStatsReporter reporter = new TestStatsReporter();
50+
private static boolean triggerNonDeterministicException = false;
51+
52+
Scope metricsScope =
53+
new RootScopeBuilder().reporter(reporter).reportEvery(com.uber.m3.util.Duration.ofMillis(1));
54+
55+
@Rule
56+
public SDKTestWorkflowRule testWorkflowRule =
57+
SDKTestWorkflowRule.newBuilder()
58+
.setWorkflowTypes(
59+
WorkflowImplementationOptions.newBuilder()
60+
.setFailWorkflowExceptionTypes(
61+
NonDeterministicException.class, IllegalArgumentException.class)
62+
.build())
63+
.setMetricsScope(metricsScope)
64+
.setWorkflowTypes(NonDeterministicWorkflowImpl.class, WorkflowExceptionImpl.class)
65+
.build();
66+
67+
@Before
68+
public void setup() {
69+
reporter.flush();
70+
}
71+
72+
@WorkflowInterface
73+
public interface TestWorkflowWithSignal {
74+
@WorkflowMethod
75+
String workflow();
76+
77+
@SignalMethod
78+
void unblock();
79+
}
80+
81+
@WorkflowInterface
82+
public interface TestWorkflow {
83+
@WorkflowMethod
84+
String workflow(boolean runtimeException);
85+
}
86+
87+
public static class NonDeterministicWorkflowImpl implements TestWorkflowWithSignal {
88+
@Override
89+
public String workflow() {
90+
if (triggerNonDeterministicException) {
91+
Workflow.sleep(Duration.ofSeconds(1));
92+
}
93+
Workflow.sideEffect(Integer.class, () -> 0);
94+
Workflow.await(() -> false);
95+
return "ok";
96+
}
97+
98+
@Override
99+
public void unblock() {}
100+
}
101+
102+
public static class WorkflowExceptionImpl implements TestWorkflow {
103+
@Override
104+
public String workflow(boolean runtimeException) {
105+
if (runtimeException) {
106+
throw new IllegalArgumentException("test exception");
107+
} else {
108+
throw ApplicationFailure.newFailure("test failure", "test reason");
109+
}
110+
}
111+
}
112+
113+
private Map<String, String> getWorkflowTags(String workflowType) {
114+
return ImmutableMap.of(
115+
"task_queue",
116+
testWorkflowRule.getTaskQueue(),
117+
"namespace",
118+
"UnitTest",
119+
"workflow_type",
120+
workflowType);
121+
}
122+
123+
@Test
124+
public void nonDeterminismIncrementsWorkflowFailedMetric() {
125+
reporter.assertNoMetric(
126+
MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflowWithSignal"));
127+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
128+
TestWorkflowWithSignal workflow =
129+
client.newWorkflowStub(
130+
TestWorkflowWithSignal.class,
131+
WorkflowOptions.newBuilder()
132+
.setTaskQueue(testWorkflowRule.getTaskQueue())
133+
.validateBuildWithDefaults());
134+
WorkflowExecution exec = WorkflowClient.start(workflow::workflow);
135+
testWorkflowRule.waitForTheEndOfWFT(exec.getWorkflowId());
136+
testWorkflowRule.invalidateWorkflowCache();
137+
triggerNonDeterministicException = true;
138+
workflow.unblock();
139+
assertThrows(WorkflowFailedException.class, () -> workflow.workflow());
140+
reporter.assertCounter(
141+
MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflowWithSignal"), 1);
142+
}
143+
144+
@Test
145+
public void runtimeExceptionWorkflowFailedMetric() {
146+
reporter.assertNoMetric(MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"));
147+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
148+
TestWorkflow workflow =
149+
client.newWorkflowStub(
150+
TestWorkflow.class,
151+
WorkflowOptions.newBuilder()
152+
.setTaskQueue(testWorkflowRule.getTaskQueue())
153+
.validateBuildWithDefaults());
154+
assertThrows(WorkflowFailedException.class, () -> workflow.workflow(true));
155+
reporter.assertCounter(MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"), 1);
156+
}
157+
158+
@Test
159+
public void applicationFailureWorkflowFailedMetric() {
160+
reporter.assertNoMetric(MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"));
161+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
162+
TestWorkflow workflow =
163+
client.newWorkflowStub(
164+
TestWorkflow.class,
165+
WorkflowOptions.newBuilder()
166+
.setTaskQueue(testWorkflowRule.getTaskQueue())
167+
.validateBuildWithDefaults());
168+
assertThrows(WorkflowFailedException.class, () -> workflow.workflow(false));
169+
reporter.assertCounter(MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"), 1);
170+
}
171+
}

0 commit comments

Comments
 (0)