Skip to content

Commit 2377114

Browse files
authored
Fix workflow ID reuse policy and conflict policy handling (#2446)
* Fix workflow ID reuse policy and conflict policy handling * add tests
1 parent f7b8ded commit 2377114

File tree

2 files changed

+131
-44
lines changed

2 files changed

+131
-44
lines changed

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 77 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -253,64 +253,83 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
253253
WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
254254
WorkflowIdReusePolicy reusePolicy = startRequest.getWorkflowIdReusePolicy();
255255
WorkflowIdConflictPolicy conflictPolicy = startRequest.getWorkflowIdConflictPolicy();
256-
if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
257-
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
258-
throw createInvalidArgument(
259-
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
260-
}
261256

257+
validateWorkflowIdReusePolicy(reusePolicy, conflictPolicy);
262258
validateOnConflictOptions(startRequest);
263259

260+
// Backwards compatibility: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING is deprecated
261+
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
262+
conflictPolicy = WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING;
263+
reusePolicy = WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE;
264+
}
265+
if (conflictPolicy == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED) {
266+
conflictPolicy = WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL;
267+
}
268+
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_UNSPECIFIED) {
269+
reusePolicy = WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE;
270+
}
271+
264272
TestWorkflowMutableState existing;
265273
lock.lock();
266274
try {
267275
String newRunId = UUID.randomUUID().toString();
268276
existing = executionsByWorkflowId.get(workflowId);
269277
if (existing != null) {
270-
WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
278+
StartWorkflowExecutionResponse dedupedResponse = dedupeRequest(startRequest, existing);
279+
if (dedupedResponse != null) {
280+
return dedupedResponse;
281+
}
271282

283+
WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
272284
if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) {
273-
StartWorkflowExecutionResponse dedupedResponse = dedupeRequest(startRequest, existing);
274-
if (dedupedResponse != null) {
275-
return dedupedResponse;
285+
switch (conflictPolicy) {
286+
case WORKFLOW_ID_CONFLICT_POLICY_FAIL:
287+
return throwDuplicatedWorkflow(startRequest, existing);
288+
case WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING:
289+
if (startRequest.hasOnConflictOptions()) {
290+
existing.applyOnConflictOptions(startRequest);
291+
}
292+
return StartWorkflowExecutionResponse.newBuilder()
293+
.setStarted(false)
294+
.setRunId(existing.getExecutionId().getExecution().getRunId())
295+
.build();
296+
case WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING:
297+
existing.terminateWorkflowExecution(
298+
TerminateWorkflowExecutionRequest.newBuilder()
299+
.setNamespace(startRequest.getNamespace())
300+
.setWorkflowExecution(existing.getExecutionId().getExecution())
301+
.setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
302+
.setIdentity("history-service")
303+
.setDetails(
304+
Payloads.newBuilder()
305+
.addPayloads(
306+
Payload.newBuilder()
307+
.setData(
308+
ByteString.copyFromUtf8(
309+
String.format(
310+
"terminated by new runID: %s", newRunId)))
311+
.build())
312+
.build())
313+
.build());
314+
break;
276315
}
316+
}
277317

278-
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
279-
|| conflictPolicy
280-
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING) {
281-
existing.terminateWorkflowExecution(
282-
TerminateWorkflowExecutionRequest.newBuilder()
283-
.setNamespace(startRequest.getNamespace())
284-
.setWorkflowExecution(existing.getExecutionId().getExecution())
285-
.setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
286-
.setIdentity("history-service")
287-
.setDetails(
288-
Payloads.newBuilder()
289-
.addPayloads(
290-
Payload.newBuilder()
291-
.setData(
292-
ByteString.copyFromUtf8(
293-
String.format("terminated by new runID: %s", newRunId)))
294-
.build())
295-
.build())
296-
.build());
297-
} else if (conflictPolicy
298-
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
299-
if (startRequest.hasOnConflictOptions()) {
300-
existing.applyOnConflictOptions(startRequest);
318+
// Status of existing workflow could have changed to TERMINATED.
319+
status = existing.getWorkflowExecutionStatus();
320+
321+
// At this point, the existing workflow already completed or was terminated.
322+
switch (reusePolicy) {
323+
case WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE:
324+
break;
325+
case WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY:
326+
if (status == WORKFLOW_EXECUTION_STATUS_COMPLETED
327+
|| status == WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW) {
328+
return throwDuplicatedWorkflow(startRequest, existing);
301329
}
302-
return StartWorkflowExecutionResponse.newBuilder()
303-
.setStarted(false)
304-
.setRunId(existing.getExecutionId().getExecution().getRunId())
305-
.build();
306-
} else {
330+
break;
331+
case WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE:
307332
return throwDuplicatedWorkflow(startRequest, existing);
308-
}
309-
} else if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE
310-
|| (reusePolicy == WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
311-
&& (status == WORKFLOW_EXECUTION_STATUS_COMPLETED
312-
|| status == WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW))) {
313-
return throwDuplicatedWorkflow(startRequest, existing);
314333
}
315334
}
316335

@@ -373,14 +392,28 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
373392
WorkflowExecutionAlreadyStartedFailure.getDescriptor());
374393
}
375394

395+
private void validateWorkflowIdReusePolicy(
396+
WorkflowIdReusePolicy reusePolicy, WorkflowIdConflictPolicy conflictPolicy) {
397+
if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
398+
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
399+
throw createInvalidArgument(
400+
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
401+
}
402+
if (conflictPolicy == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
403+
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
404+
throw createInvalidArgument(
405+
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING");
406+
}
407+
}
408+
376409
private void validateOnConflictOptions(StartWorkflowExecutionRequest startRequest) {
377410
if (!startRequest.hasOnConflictOptions()) {
378411
return;
379412
}
380413
OnConflictOptions options = startRequest.getOnConflictOptions();
381414
if (options.getAttachCompletionCallbacks() && !options.getAttachRequestId()) {
382415
throw createInvalidArgument(
383-
"Invalid OnConflictOptions: AttachCompletionCallbacks cannot be 'true' if AttachRequestId is 'false'.");
416+
"Invalid OnConflictOptions: AttachCompletionCallbacks cannot be 'true' if AttachRequestId is 'false'.");
384417
}
385418
}
386419

temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdReusePolicyTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import io.temporal.api.common.v1.WorkflowExecution;
2424
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
25+
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
2526
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
2627
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
2728
import io.temporal.client.*;
@@ -118,6 +119,59 @@ public void secondWorkflowTerminatesFirst() {
118119
describe(execution2).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
119120
}
120121

122+
@Test
123+
public void deduplicateRequestWorkflowStillRunning() {
124+
String workflowId = "deduplicate-request-1";
125+
WorkflowOptions options =
126+
WorkflowOptions.newBuilder()
127+
.setWorkflowId(workflowId)
128+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
129+
.setTaskQueue(testWorkflowRule.getTaskQueue())
130+
.setRequestId("request-id-1")
131+
.build();
132+
133+
WorkflowExecution execution1 = startForeverWorkflow(options);
134+
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
135+
136+
WorkflowExecution execution2 = startForeverWorkflow(options);
137+
describe(execution2).assertExecutionId(execution1);
138+
}
139+
140+
@Test
141+
public void deduplicateRequestWorkflowAlreadyCompleted() {
142+
String workflowId = "deduplicate-request-2";
143+
WorkflowOptions options =
144+
WorkflowOptions.newBuilder()
145+
.setWorkflowId(workflowId)
146+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
147+
.setTaskQueue(testWorkflowRule.getTaskQueue())
148+
.setRequestId("request-id-2")
149+
.build();
150+
151+
WorkflowExecution execution1 = runFailingWorkflow(options);
152+
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
153+
154+
WorkflowExecution execution2 = startForeverWorkflow(options);
155+
describe(execution2).assertExecutionId(execution1);
156+
}
157+
158+
@Test
159+
public void invalidWorkflowIdReusePolicy() {
160+
String workflowId = "invalid-workflow-id-reuse-policy";
161+
WorkflowOptions options =
162+
WorkflowOptions.newBuilder()
163+
.setWorkflowId(workflowId)
164+
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
165+
.setTaskQueue(testWorkflowRule.getTaskQueue())
166+
.setWorkflowIdReusePolicy(
167+
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
168+
.setWorkflowIdConflictPolicy(
169+
WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)
170+
.build();
171+
172+
Assert.assertThrows(WorkflowServiceException.class, () -> startForeverWorkflow(options));
173+
}
174+
121175
private WorkflowExecution startForeverWorkflow(WorkflowOptions options) {
122176
TestWorkflows.PrimitiveWorkflow workflowStub =
123177
testWorkflowRule

0 commit comments

Comments
 (0)