Skip to content

Commit 58a4200

Browse files
authored
RequestIdInfo and links changes in test server (#2515)
1 parent b9458fc commit 58a4200

File tree

11 files changed

+285
-75
lines changed

11 files changed

+285
-75
lines changed

.github/workflows/ci.yml

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -69,42 +69,45 @@ jobs:
6969
uses: gradle/actions/setup-gradle@v4
7070

7171
- name: Start containerized server and dependencies
72+
env:
73+
TEMPORAL_CLI_VERSION: 1.3.1-nexus-links.0
7274
run: |
73-
wget https://github.com/temporalio/cli/releases/download/v1.3.1-priority.0/temporal_cli_1.3.1-priority.0_linux_amd64.tar.gz
74-
tar -xzf temporal_cli_1.3.1-priority.0_linux_amd64.tar.gz
75+
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
76+
tar -xzf temporal_cli.tar.gz
7577
chmod +x temporal
7678
./temporal server start-dev \
77-
--headless \
78-
--port 7233 \
79-
--http-port 7243 \
80-
--namespace UnitTest \
81-
--db-filename temporal.sqlite \
82-
--sqlite-pragma journal_mode=WAL \
83-
--sqlite-pragma synchronous=OFF \
84-
--search-attribute CustomKeywordField=Keyword \
85-
--search-attribute CustomStringField=Text \
86-
--search-attribute CustomTextField=Text \
87-
--search-attribute CustomIntField=Int \
88-
--search-attribute CustomDatetimeField=Datetime \
89-
--search-attribute CustomDoubleField=Double \
90-
--search-attribute CustomBoolField=Bool \
91-
--dynamic-config-value system.forceSearchAttributesCacheRefreshOnRead=true \
92-
--dynamic-config-value system.enableActivityEagerExecution=true \
93-
--dynamic-config-value system.enableEagerWorkflowStart=true \
94-
--dynamic-config-value system.enableExecuteMultiOperation=true \
95-
--dynamic-config-value frontend.enableUpdateWorkflowExecutionAsyncAccepted=true \
96-
--dynamic-config-value history.MaxBufferedQueryCount=100000 \
97-
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
98-
--dynamic-config-value worker.buildIdScavengerEnabled=true \
99-
--dynamic-config-value frontend.workerVersioningRuleAPIs=true \
100-
--dynamic-config-value worker.removableBuildIdDurationSinceDefault=true \
101-
--dynamic-config-value matching.useNewMatcher=true \
102-
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
103-
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
104-
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
105-
--dynamic-config-value frontend.activityAPIsEnabled=true \
106-
--dynamic-config-value system.enableDeploymentVersions=true &
107-
sleep 10s
79+
--headless \
80+
--port 7233 \
81+
--http-port 7243 \
82+
--namespace UnitTest \
83+
--db-filename temporal.sqlite \
84+
--sqlite-pragma journal_mode=WAL \
85+
--sqlite-pragma synchronous=OFF \
86+
--search-attribute CustomKeywordField=Keyword \
87+
--search-attribute CustomStringField=Text \
88+
--search-attribute CustomTextField=Text \
89+
--search-attribute CustomIntField=Int \
90+
--search-attribute CustomDatetimeField=Datetime \
91+
--search-attribute CustomDoubleField=Double \
92+
--search-attribute CustomBoolField=Bool \
93+
--dynamic-config-value system.forceSearchAttributesCacheRefreshOnRead=true \
94+
--dynamic-config-value system.enableActivityEagerExecution=true \
95+
--dynamic-config-value system.enableEagerWorkflowStart=true \
96+
--dynamic-config-value system.enableExecuteMultiOperation=true \
97+
--dynamic-config-value frontend.enableUpdateWorkflowExecutionAsyncAccepted=true \
98+
--dynamic-config-value history.MaxBufferedQueryCount=100000 \
99+
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
100+
--dynamic-config-value worker.buildIdScavengerEnabled=true \
101+
--dynamic-config-value frontend.workerVersioningRuleAPIs=true \
102+
--dynamic-config-value worker.removableBuildIdDurationSinceDefault=true \
103+
--dynamic-config-value matching.useNewMatcher=true \
104+
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
105+
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
106+
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
107+
--dynamic-config-value frontend.activityAPIsEnabled=true \
108+
--dynamic-config-value system.enableDeploymentVersions=true \
109+
--dynamic-config-value history.enableRequestIdRefLinks=true &
110+
sleep 10s
108111
109112
- name: Run unit tests
110113
env:

temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.temporal.api.enums.v1.EventType;
99
import io.temporal.api.history.v1.History;
1010
import io.temporal.api.history.v1.HistoryEvent;
11+
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
1112
import io.temporal.client.WorkflowOptions;
1213
import io.temporal.client.WorkflowStub;
1314
import io.temporal.internal.nexus.OperationTokenUtil;
@@ -51,16 +52,21 @@ public void testWorkflowOperationLinks() {
5152
Assert.assertEquals(
5253
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
5354
nexusStartedEvent.getLinks(0).getWorkflowEvent().getEventRef().getEventType());
54-
// Assert that the started workflow has a link to the original workflow
55+
// Assert that the started workflow has a link to the caller workflow
5556
History linkedHistory =
5657
testWorkflowRule
5758
.getWorkflowClient()
5859
.fetchHistory(nexusStartedEvent.getLinks(0).getWorkflowEvent().getWorkflowId())
5960
.getHistory();
6061
HistoryEvent linkedStartedEvent = linkedHistory.getEventsList().get(0);
61-
Assert.assertEquals(1, linkedStartedEvent.getLinksCount());
62+
WorkflowExecutionStartedEventAttributes attrs =
63+
linkedStartedEvent.getWorkflowExecutionStartedEventAttributes();
64+
// Nexus links in callbacks are deduped
65+
Assert.assertEquals(0, linkedStartedEvent.getLinksCount());
66+
Assert.assertEquals(1, attrs.getCompletionCallbacksCount());
6267
Assert.assertEquals(
63-
originalWorkflowId, linkedStartedEvent.getLinks(0).getWorkflowEvent().getWorkflowId());
68+
originalWorkflowId,
69+
attrs.getCompletionCallbacks(0).getLinks(0).getWorkflowEvent().getWorkflowId());
6470
}
6571

6672
public static class TestNexus implements TestWorkflows.TestWorkflow1 {

temporal-test-server/src/main/java/io/temporal/internal/testservice/RequestContext.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
1212
import io.temporal.workflow.Functions;
1313
import java.time.Duration;
14-
import java.util.ArrayList;
15-
import java.util.List;
16-
import java.util.Objects;
14+
import java.util.*;
1715
import java.util.function.LongSupplier;
1816
import javax.annotation.Nonnull;
1917
import javax.annotation.Nullable;

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1346,7 +1346,9 @@ private static void startWorkflow(
13461346
if (request.hasUserMetadata()) {
13471347
event.setUserMetadata(request.getUserMetadata());
13481348
}
1349-
ctx.addEvent(event.build());
1349+
long eventId = ctx.addEvent(event.build());
1350+
ctx.getWorkflowMutableState()
1351+
.attachRequestId(request.getRequestId(), event.getEventType(), eventId);
13501352
}
13511353

13521354
private static void completeWorkflow(

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,18 @@
55
import io.temporal.api.common.v1.Callback;
66
import io.temporal.api.common.v1.Payload;
77
import io.temporal.api.common.v1.Payloads;
8+
import io.temporal.api.enums.v1.EventType;
89
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
910
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
1011
import io.temporal.api.failure.v1.Failure;
1112
import io.temporal.api.history.v1.*;
1213
import io.temporal.api.nexus.v1.Link;
1314
import io.temporal.api.nexus.v1.StartOperationResponse;
1415
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
16+
import io.temporal.api.workflow.v1.RequestIdInfo;
1517
import io.temporal.api.workflowservice.v1.*;
1618
import java.util.List;
19+
import java.util.Map;
1720
import java.util.Optional;
1821
import java.util.function.Consumer;
1922
import javax.annotation.Nonnull;
@@ -132,7 +135,11 @@ PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution(
132135

133136
boolean isTerminalState();
134137

135-
boolean isRequestIdAttached(String requestId);
138+
RequestIdInfo getRequestIdInfo(String requestId);
139+
140+
void attachRequestId(@Nonnull String requestId, EventType eventType, long eventId);
136141

137142
List<Callback> getCompletionCallbacks();
143+
144+
void updateRequestIdToEventId(Map<String, RequestIdInfo> requestIdToEventId);
138145
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static io.temporal.internal.testservice.StateMachines.*;
88
import static io.temporal.internal.testservice.StateUtils.mergeMemo;
99
import static io.temporal.internal.testservice.TestServiceRetryState.validateAndOverrideRetryPolicy;
10+
import static io.temporal.internal.testservice.TestWorkflowStore.BUFFERED_EVENT_ID;
1011

1112
import com.google.common.base.Preconditions;
1213
import com.google.common.base.Strings;
@@ -118,7 +119,7 @@ private interface UpdateProcedure {
118119
new ConcurrentHashMap<>();
119120
public StickyExecutionAttributes stickyExecutionAttributes;
120121
private Map<String, Payload> currentMemo;
121-
private final Set<String> attachedRequestIds = new HashSet<>();
122+
private final Map<String, RequestIdInfo> requestIdInfos = new HashMap<>();
122123
private final List<Callback> completionCallbacks = new ArrayList<>();
123124

124125
/**
@@ -2044,8 +2045,19 @@ public boolean isTerminalState() {
20442045
}
20452046

20462047
@Override
2047-
public boolean isRequestIdAttached(@Nonnull String requestId) {
2048-
return attachedRequestIds.contains(requestId);
2048+
public RequestIdInfo getRequestIdInfo(@Nonnull String requestId) {
2049+
return this.requestIdInfos.get(requestId);
2050+
}
2051+
2052+
@Override
2053+
public void attachRequestId(@Nonnull String requestId, EventType eventType, long eventId) {
2054+
this.requestIdInfos.put(
2055+
requestId,
2056+
RequestIdInfo.newBuilder()
2057+
.setEventType(eventType)
2058+
.setEventId(eventId)
2059+
.setBuffered(eventId == BUFFERED_EVENT_ID)
2060+
.build());
20492061
}
20502062

20512063
private void updateHeartbeatTimer(
@@ -3208,13 +3220,25 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock()
32083220
.map(TestWorkflowMutableStateImpl::constructPendingChildExecutionInfo)
32093221
.collect(Collectors.toList());
32103222

3223+
WorkflowExecutionExtendedInfo.Builder extendedInfo = WorkflowExecutionExtendedInfo.newBuilder();
3224+
extendedInfo.putAllRequestIdInfos(
3225+
this.requestIdInfos.entrySet().stream()
3226+
.collect(
3227+
Collectors.toMap(
3228+
Map.Entry::getKey,
3229+
e ->
3230+
e.getValue().toBuilder()
3231+
.setEventId(e.getValue().getBuffered() ? 0 : e.getValue().getEventId())
3232+
.build())));
3233+
32113234
return DescribeWorkflowExecutionResponse.newBuilder()
32123235
.setExecutionConfig(executionConfig)
32133236
.setWorkflowExecutionInfo(executionInfo)
32143237
.addAllPendingActivities(pendingActivities)
32153238
.addAllPendingNexusOperations(pendingNexusOperations)
32163239
.addAllPendingChildren(pendingChildren)
32173240
.addAllCallbacks(callbacks)
3241+
.setWorkflowExtendedInfo(extendedInfo)
32183242
.build();
32193243
}
32203244

@@ -3502,26 +3526,26 @@ private void addExecutionSignaledByExternalEvent(
35023526

35033527
private void addWorkflowExecutionOptionsUpdatedEvent(
35043528
RequestContext ctx, String requestId, List<Callback> completionCallbacks, List<Link> links) {
3529+
HistoryEvent.Builder event =
3530+
HistoryEvent.newBuilder()
3531+
.setWorkerMayIgnore(true)
3532+
.setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED);
3533+
35053534
WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs =
35063535
WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder();
35073536
if (requestId != null) {
35083537
attrs.setAttachedRequestId(requestId);
3509-
this.attachedRequestIds.add(requestId);
3538+
this.attachRequestId(requestId, event.getEventType(), BUFFERED_EVENT_ID);
35103539
}
35113540
if (completionCallbacks != null) {
35123541
attrs.addAllAttachedCompletionCallbacks(completionCallbacks);
35133542
this.completionCallbacks.addAll(completionCallbacks);
35143543
}
35153544

3516-
HistoryEvent.Builder event =
3517-
HistoryEvent.newBuilder()
3518-
.setWorkerMayIgnore(true)
3519-
.setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
3520-
.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs);
3545+
event.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs);
35213546
if (links != null) {
35223547
event.addAllLinks(links);
35233548
}
3524-
35253549
ctx.addEvent(event.build());
35263550
}
35273551

@@ -3656,4 +3680,9 @@ private boolean isTerminalState(State workflowState) {
36563680
public List<Callback> getCompletionCallbacks() {
36573681
return completionCallbacks;
36583682
}
3683+
3684+
@Override
3685+
public void updateRequestIdToEventId(Map<String, RequestIdInfo> requestIdToEventId) {
3686+
requestIdInfos.putAll(requestIdToEventId);
3687+
}
36593688
}

0 commit comments

Comments
 (0)