Skip to content

Commit e7a7f0c

Browse files
authored
Set links in Nexus callback (#2513)
* Set links in Nexus callback * modify tests
1 parent 4da87cc commit e7a7f0c

File tree

4 files changed

+240
-96
lines changed

4 files changed

+240
-96
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.nexusrpc.Header;
55
import io.nexusrpc.handler.ServiceImplInstance;
66
import io.temporal.api.common.v1.Callback;
7+
import io.temporal.api.common.v1.Link;
78
import io.temporal.api.enums.v1.TaskQueueKind;
89
import io.temporal.api.taskqueue.v1.TaskQueue;
910
import io.temporal.client.OnConflictOptions;
@@ -13,9 +14,7 @@
1314
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
1415
import io.temporal.common.metadata.WorkflowMethodType;
1516
import io.temporal.internal.client.NexusStartWorkflowRequest;
16-
import java.util.Arrays;
17-
import java.util.Map;
18-
import java.util.TreeMap;
17+
import java.util.*;
1918
import java.util.stream.Collectors;
2019
import org.slf4j.Logger;
2120
import org.slf4j.LoggerFactory;
@@ -88,43 +87,46 @@ public static WorkflowStub createNexusBoundStub(
8887
if (!headers.containsKey(Header.OPERATION_TOKEN)) {
8988
headers.put(Header.OPERATION_TOKEN.toLowerCase(), options.getWorkflowId());
9089
}
90+
List<Link> links =
91+
request.getLinks() == null
92+
? null
93+
: request.getLinks().stream()
94+
.map(
95+
(link) -> {
96+
if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor()
97+
.getFullName()
98+
.equals(link.getType())) {
99+
io.temporal.api.nexus.v1.Link nexusLink =
100+
io.temporal.api.nexus.v1.Link.newBuilder()
101+
.setType(link.getType())
102+
.setUrl(link.getUri().toString())
103+
.build();
104+
return LinkConverter.nexusLinkToWorkflowEvent(nexusLink);
105+
} else {
106+
log.warn("ignoring unsupported link data type: {}", link.getType());
107+
return null;
108+
}
109+
})
110+
.filter(Objects::nonNull)
111+
.collect(Collectors.toList());
112+
Callback.Builder cbBuilder =
113+
Callback.newBuilder()
114+
.setNexus(
115+
Callback.Nexus.newBuilder()
116+
.setUrl(request.getCallbackUrl())
117+
.putAllHeader(headers)
118+
.build());
119+
if (links != null) {
120+
cbBuilder.addAllLinks(links);
121+
}
91122
WorkflowOptions.Builder nexusWorkflowOptions =
92123
WorkflowOptions.newBuilder(options)
93124
.setRequestId(request.getRequestId())
94-
.setCompletionCallbacks(
95-
Arrays.asList(
96-
Callback.newBuilder()
97-
.setNexus(
98-
Callback.Nexus.newBuilder()
99-
.setUrl(request.getCallbackUrl())
100-
.putAllHeader(headers)
101-
.build())
102-
.build()));
125+
.setCompletionCallbacks(Collections.singletonList(cbBuilder.build()))
126+
.setLinks(links);
103127
if (options.getTaskQueue() == null) {
104128
nexusWorkflowOptions.setTaskQueue(request.getTaskQueue());
105129
}
106-
if (request.getLinks() != null) {
107-
nexusWorkflowOptions.setLinks(
108-
request.getLinks().stream()
109-
.map(
110-
(link) -> {
111-
if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor()
112-
.getFullName()
113-
.equals(link.getType())) {
114-
io.temporal.api.nexus.v1.Link nexusLink =
115-
io.temporal.api.nexus.v1.Link.newBuilder()
116-
.setType(link.getType())
117-
.setUrl(link.getUri().toString())
118-
.build();
119-
return LinkConverter.nexusLinkToWorkflowEvent(nexusLink);
120-
} else {
121-
log.warn("ignoring unsupported link data type: {}", link.getType());
122-
return null;
123-
}
124-
})
125-
.filter(link -> link != null)
126-
.collect(Collectors.toList()));
127-
}
128130
nexusWorkflowOptions.setOnConflictOptions(
129131
OnConflictOptions.newBuilder()
130132
.setAttachRequestId(true)

temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java

Lines changed: 98 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package io.temporal.internal.common;
22

3-
import static io.temporal.internal.common.ProtoEnumNameUtils.EVENT_TYPE_PREFIX;
4-
import static io.temporal.internal.common.ProtoEnumNameUtils.simplifiedToUniqueName;
3+
import static io.temporal.internal.common.ProtoEnumNameUtils.*;
54

65
import io.temporal.api.common.v1.Link;
76
import io.temporal.api.enums.v1.EventType;
7+
import java.io.UnsupportedEncodingException;
88
import java.net.URI;
99
import java.net.URLDecoder;
1010
import java.net.URLEncoder;
1111
import java.nio.charset.StandardCharsets;
12-
import java.util.StringTokenizer;
12+
import java.util.*;
13+
import java.util.AbstractMap.SimpleImmutableEntry;
14+
import java.util.stream.Collectors;
1315
import org.slf4j.Logger;
1416
import org.slf4j.LoggerFactory;
1517

@@ -18,6 +20,15 @@ public class LinkConverter {
1820
private static final Logger log = LoggerFactory.getLogger(LinkConverter.class);
1921

2022
private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history";
23+
private static final String linkReferenceTypeKey = "referenceType";
24+
private static final String linkEventIDKey = "eventID";
25+
private static final String linkEventTypeKey = "eventType";
26+
private static final String linkRequestIDKey = "requestID";
27+
28+
private static final String eventReferenceType =
29+
Link.WorkflowEvent.EventReference.getDescriptor().getName();
30+
private static final String requestIDReferenceType =
31+
Link.WorkflowEvent.RequestIdReference.getDescriptor().getName();
2132

2233
public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) {
2334
try {
@@ -28,19 +39,36 @@ public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.Workfl
2839
URLEncoder.encode(we.getWorkflowId(), StandardCharsets.UTF_8.toString()),
2940
URLEncoder.encode(we.getRunId(), StandardCharsets.UTF_8.toString()));
3041

42+
List<Map.Entry<String, String>> queryParams = new ArrayList<>();
3143
if (we.hasEventRef()) {
32-
url += "?";
33-
if (we.getEventRef().getEventId() > 0) {
34-
url += "eventID=" + we.getEventRef().getEventId() + "&";
44+
queryParams.add(new SimpleImmutableEntry<>(linkReferenceTypeKey, eventReferenceType));
45+
Link.WorkflowEvent.EventReference eventRef = we.getEventRef();
46+
if (eventRef.getEventId() > 0) {
47+
queryParams.add(
48+
new SimpleImmutableEntry<>(linkEventIDKey, String.valueOf(eventRef.getEventId())));
3549
}
36-
url +=
37-
"eventType="
38-
+ URLEncoder.encode(
39-
we.getEventRef().getEventType().name(), StandardCharsets.UTF_8.toString())
40-
+ "&";
41-
url += "referenceType=EventReference";
50+
final String eventType =
51+
URLEncoder.encode(
52+
encodeEventType(eventRef.getEventType()), StandardCharsets.UTF_8.toString());
53+
queryParams.add(new SimpleImmutableEntry<>(linkEventTypeKey, eventType));
54+
} else if (we.hasRequestIdRef()) {
55+
queryParams.add(new SimpleImmutableEntry<>(linkReferenceTypeKey, requestIDReferenceType));
56+
Link.WorkflowEvent.RequestIdReference requestIDRef = we.getRequestIdRef();
57+
final String requestID =
58+
URLEncoder.encode(requestIDRef.getRequestId(), StandardCharsets.UTF_8.toString());
59+
queryParams.add(new SimpleImmutableEntry<>(linkRequestIDKey, requestID));
60+
final String eventType =
61+
URLEncoder.encode(
62+
encodeEventType(requestIDRef.getEventType()), StandardCharsets.UTF_8.toString());
63+
queryParams.add(new SimpleImmutableEntry<>(linkEventTypeKey, eventType));
4264
}
4365

66+
url +=
67+
"?"
68+
+ queryParams.stream()
69+
.map((item) -> item.getKey() + "=" + item.getValue())
70+
.collect(Collectors.joining("&"));
71+
4472
return io.temporal.api.nexus.v1.Link.newBuilder()
4573
.setUrl(url)
4674
.setType(we.getDescriptorForType().getFullName())
@@ -84,36 +112,74 @@ public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusL
84112
.setWorkflowId(workflowID)
85113
.setRunId(runID);
86114

87-
if (uri.getQuery() != null) {
115+
Map<String, String> queryParams = parseQueryParams(uri);
116+
String referenceType = queryParams.get(linkReferenceTypeKey);
117+
if (referenceType.equals(eventReferenceType)) {
88118
Link.WorkflowEvent.EventReference.Builder eventRef =
89119
Link.WorkflowEvent.EventReference.newBuilder();
90-
String query = URLDecoder.decode(uri.getQuery(), StandardCharsets.UTF_8.toString());
91-
st = new StringTokenizer(query, "&");
92-
while (st.hasMoreTokens()) {
93-
String[] param = st.nextToken().split("=");
94-
switch (param[0]) {
95-
case "eventID":
96-
eventRef.setEventId(Long.parseLong(param[1]));
97-
continue;
98-
case "eventType":
99-
// Have to handle the SCREAMING_CASE enum or the traditional temporal PascalCase enum
100-
// to EventType
101-
if (param[1].startsWith(EVENT_TYPE_PREFIX)) {
102-
eventRef.setEventType(EventType.valueOf(param[1]));
103-
} else {
104-
eventRef.setEventType(
105-
EventType.valueOf(simplifiedToUniqueName(param[1], EVENT_TYPE_PREFIX)));
106-
}
107-
}
120+
String eventID = queryParams.get(linkEventIDKey);
121+
if (eventID != null && !eventID.isEmpty()) {
122+
eventRef.setEventId(Long.parseLong(eventID));
123+
}
124+
String eventType = queryParams.get(linkEventTypeKey);
125+
if (eventType != null && !eventType.isEmpty()) {
126+
eventRef.setEventType(decodeEventType(eventType));
108127
}
109128
we.setEventRef(eventRef);
110-
link.setWorkflowEvent(we);
129+
} else if (referenceType.equals(requestIDReferenceType)) {
130+
Link.WorkflowEvent.RequestIdReference.Builder requestIDRef =
131+
Link.WorkflowEvent.RequestIdReference.newBuilder();
132+
String requestID = queryParams.get(linkRequestIDKey);
133+
if (requestID != null && !requestID.isEmpty()) {
134+
requestIDRef.setRequestId(requestID);
135+
}
136+
String eventType = queryParams.get(linkEventTypeKey);
137+
if (eventType != null && !eventType.isEmpty()) {
138+
requestIDRef.setEventType(decodeEventType(eventType));
139+
}
140+
we.setRequestIdRef(requestIDRef);
141+
} else {
142+
log.error("Failed to parse Nexus link URL: invalid reference type: {}", referenceType);
143+
return null;
111144
}
145+
146+
link.setWorkflowEvent(we);
112147
} catch (Exception e) {
113148
// Swallow un-parsable links since they are not critical to processing
114149
log.error("Failed to parse Nexus link URL", e);
115150
return null;
116151
}
117152
return link.build();
118153
}
154+
155+
private static Map<String, String> parseQueryParams(URI uri) throws UnsupportedEncodingException {
156+
final String query = uri.getQuery();
157+
if (query == null || query.isEmpty()) {
158+
return Collections.emptyMap();
159+
}
160+
Map<String, String> queryParams = new HashMap<>();
161+
for (String pair : query.split("&")) {
162+
final String[] kv = pair.split("=", 2);
163+
final String key = URLDecoder.decode(kv[0], StandardCharsets.UTF_8.toString());
164+
final String value =
165+
kv.length == 2 && !kv[1].isEmpty()
166+
? URLDecoder.decode(kv[1], StandardCharsets.UTF_8.toString())
167+
: null;
168+
queryParams.put(key, value);
169+
}
170+
return queryParams;
171+
}
172+
173+
private static String encodeEventType(EventType eventType) {
174+
return uniqueToSimplifiedName(eventType.name(), EVENT_TYPE_PREFIX);
175+
}
176+
177+
private static EventType decodeEventType(String eventType) {
178+
// Have to handle the SCREAMING_CASE enum or the traditional temporal PascalCase enum to
179+
// EventType
180+
if (eventType.startsWith(EVENT_TYPE_PREFIX)) {
181+
return EventType.valueOf(eventType);
182+
}
183+
return EventType.valueOf(simplifiedToUniqueName(eventType, EVENT_TYPE_PREFIX));
184+
}
119185
}

0 commit comments

Comments
 (0)