Skip to content

Commit d430114

Browse files
Add Summary to Nexus Operations (#2444)
1 parent 2377114 commit d430114

File tree

12 files changed

+229
-33
lines changed

12 files changed

+229
-33
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ Functions.Proc scheduleLocalActivityTask(
165165
ExecuteLocalActivityParameters parameters, LocalActivityCallback callback);
166166

167167
/**
168-
* Start child workflow.
168+
* Start a child workflow.
169169
*
170170
* @param parameters encapsulates all the information required to schedule a child workflow for
171171
* execution
@@ -179,8 +179,20 @@ Functions.Proc1<Exception> startChildWorkflow(
179179
Functions.Proc2<WorkflowExecution, Exception> startCallback,
180180
Functions.Proc2<Optional<Payloads>, Exception> completionCallback);
181181

182+
/**
183+
* Start a Nexus operation.
184+
*
185+
* @param attributes nexus operation attributes
186+
* @param metadata user metadata to be associated with the operation.
187+
* @param startedCallback callback that is called when the operation is start if async, or
188+
* completes if it is sync.
189+
* @param completionCallback callback that is called upon child workflow completion or failure
190+
* @return cancellation handle. Invoke {@link io.temporal.workflow.Functions.Proc1#apply(Object)}
191+
* to cancel activity task.
192+
*/
182193
Functions.Proc1<Exception> startNexusOperation(
183194
ScheduleNexusOperationCommandAttributes attributes,
195+
@Nullable UserMetadata metadata,
184196
Functions.Proc2<Optional<String>, Failure> startedCallback,
185197
Functions.Proc2<Optional<Payload>, Failure> completionCallback);
186198

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,12 @@ public Functions.Proc1<Exception> startChildWorkflow(
228228
@Override
229229
public Functions.Proc1<Exception> startNexusOperation(
230230
ScheduleNexusOperationCommandAttributes attributes,
231+
@Nullable UserMetadata metadata,
231232
Functions.Proc2<Optional<String>, Failure> startedCallback,
232233
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
233234
Functions.Proc cancellationHandler =
234-
workflowStateMachines.startNexusOperation(attributes, startedCallback, completionCallback);
235+
workflowStateMachines.startNexusOperation(
236+
attributes, metadata, startedCallback, completionCallback);
235237
return (exception) -> cancellationHandler.apply();
236238
}
237239

temporal-sdk/src/main/java/io/temporal/internal/statemachines/NexusOperationStateMachine.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import io.temporal.api.failure.v1.Failure;
3030
import io.temporal.api.failure.v1.NexusOperationFailureInfo;
3131
import io.temporal.api.history.v1.*;
32+
import io.temporal.api.sdk.v1.UserMetadata;
3233
import io.temporal.workflow.Functions;
3334
import java.util.Optional;
35+
import javax.annotation.Nullable;
3436

3537
/**
3638
* NexusOperationStateMachine manages a nexus operation.
@@ -46,6 +48,7 @@ final class NexusOperationStateMachine
4648
private static final String NEXUS_OPERATION_CANCELED_MESSAGE = "Nexus operation canceled";
4749

4850
private ScheduleNexusOperationCommandAttributes scheduleAttributes;
51+
private UserMetadata metadata;
4952
private final Functions.Proc2<Optional<String>, Failure> startedCallback;
5053
private boolean async = false;
5154

@@ -240,22 +243,25 @@ private void notifyTimedOut() {
240243
*/
241244
public static NexusOperationStateMachine newInstance(
242245
ScheduleNexusOperationCommandAttributes attributes,
246+
@Nullable UserMetadata metadata,
243247
Functions.Proc2<Optional<String>, Failure> startedCallback,
244248
Functions.Proc2<Optional<Payload>, Failure> completionCallback,
245249
Functions.Proc1<CancellableCommand> commandSink,
246250
Functions.Proc1<StateMachine> stateMachineSink) {
247251
return new NexusOperationStateMachine(
248-
attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
252+
attributes, metadata, startedCallback, completionCallback, commandSink, stateMachineSink);
249253
}
250254

251255
private NexusOperationStateMachine(
252256
ScheduleNexusOperationCommandAttributes attributes,
257+
@Nullable UserMetadata metadata,
253258
Functions.Proc2<Optional<String>, Failure> startedCallback,
254259
Functions.Proc2<Optional<Payload>, Failure> completionCallback,
255260
Functions.Proc1<CancellableCommand> commandSink,
256261
Functions.Proc1<StateMachine> stateMachineSink) {
257262
super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
258263
this.scheduleAttributes = attributes;
264+
this.metadata = metadata;
259265
this.operation = attributes.getOperation();
260266
this.service = attributes.getService();
261267
this.endpoint = attributes.getEndpoint();
@@ -265,11 +271,16 @@ private NexusOperationStateMachine(
265271
}
266272

267273
public void createScheduleNexusTaskCommand() {
268-
addCommand(
274+
Command.Builder command =
269275
Command.newBuilder()
270276
.setCommandType(CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION)
271-
.setScheduleNexusOperationCommandAttributes(scheduleAttributes)
272-
.build());
273-
scheduleAttributes = null; // avoiding retaining large input for the duration of the operation
277+
.setScheduleNexusOperationCommandAttributes(scheduleAttributes);
278+
if (metadata != null) {
279+
command.setUserMetadata(metadata);
280+
}
281+
addCommand(command.build());
282+
// avoiding retaining large input for the duration of the operation
283+
scheduleAttributes = null;
284+
metadata = null;
274285
}
275286
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,12 +951,18 @@ public Functions.Proc startChildWorkflow(
951951

952952
public Functions.Proc startNexusOperation(
953953
ScheduleNexusOperationCommandAttributes attributes,
954+
@Nullable UserMetadata metadata,
954955
Functions.Proc2<Optional<String>, Failure> startedCallback,
955956
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
956957
checkEventLoopExecuting();
957958
NexusOperationStateMachine operation =
958959
NexusOperationStateMachine.newInstance(
959-
attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
960+
attributes,
961+
metadata,
962+
startedCallback,
963+
completionCallback,
964+
commandSink,
965+
stateMachineSink);
960966
return () -> {
961967
if (operation.isCancellable()) {
962968
operation.cancel();

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,9 +823,15 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
823823
attributes.setScheduleToCloseTimeout(
824824
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));
825825

826+
@Nullable
827+
UserMetadata userMetadata =
828+
makeUserMetaData(
829+
input.getOptions().getSummary(), null, dataConverterWithCurrentWorkflowContext);
830+
826831
Functions.Proc1<Exception> cancellationCallback =
827832
replayContext.startNexusOperation(
828833
attributes.build(),
834+
userMetadata,
829835
(operationExec, failure) -> {
830836
if (failure != null) {
831837
runner.executeInWorkflowThread(

temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public static NexusOperationOptions getDefaultInstance() {
5252

5353
public static final class Builder {
5454
private Duration scheduleToCloseTimeout;
55+
private String summary;
5556

5657
/**
5758
* Sets the schedule to close timeout for the Nexus operation.
@@ -65,17 +66,30 @@ public NexusOperationOptions.Builder setScheduleToCloseTimeout(
6566
return this;
6667
}
6768

69+
/**
70+
* Single-line fixed summary for this Nexus operation that will appear in UI/CLI. This can be in
71+
* single-line Temporal Markdown format.
72+
*
73+
* <p>Default is none/empty.
74+
*/
75+
@Experimental
76+
public NexusOperationOptions.Builder setSummary(String summary) {
77+
this.summary = summary;
78+
return this;
79+
}
80+
6881
private Builder() {}
6982

7083
private Builder(NexusOperationOptions options) {
7184
if (options == null) {
7285
return;
7386
}
7487
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
88+
this.summary = options.getSummary();
7589
}
7690

7791
public NexusOperationOptions build() {
78-
return new NexusOperationOptions(scheduleToCloseTimeout);
92+
return new NexusOperationOptions(scheduleToCloseTimeout, summary);
7993
}
8094

8195
public NexusOperationOptions.Builder mergeNexusOperationOptions(
@@ -87,39 +101,54 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(
87101
(override.scheduleToCloseTimeout == null)
88102
? this.scheduleToCloseTimeout
89103
: override.scheduleToCloseTimeout;
104+
this.summary = (override.summary == null) ? this.summary : override.summary;
90105
return this;
91106
}
92107
}
93108

94-
private NexusOperationOptions(Duration scheduleToCloseTimeout) {
109+
private NexusOperationOptions(Duration scheduleToCloseTimeout, String summary) {
95110
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
111+
this.summary = summary;
96112
}
97113

98114
public NexusOperationOptions.Builder toBuilder() {
99115
return new NexusOperationOptions.Builder(this);
100116
}
101117

102-
private Duration scheduleToCloseTimeout;
118+
private final Duration scheduleToCloseTimeout;
119+
private final String summary;
103120

104121
public Duration getScheduleToCloseTimeout() {
105122
return scheduleToCloseTimeout;
106123
}
107124

125+
@Experimental
126+
public String getSummary() {
127+
return summary;
128+
}
129+
108130
@Override
109131
public boolean equals(Object o) {
110132
if (this == o) return true;
111133
if (o == null || getClass() != o.getClass()) return false;
112134
NexusOperationOptions that = (NexusOperationOptions) o;
113-
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout);
135+
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
136+
&& Objects.equals(summary, that.summary);
114137
}
115138

116139
@Override
117140
public int hashCode() {
118-
return Objects.hash(scheduleToCloseTimeout);
141+
return Objects.hash(scheduleToCloseTimeout, summary);
119142
}
120143

121144
@Override
122145
public String toString() {
123-
return "NexusOperationOptions{" + "scheduleToCloseTimeout=" + scheduleToCloseTimeout + '}';
146+
return "NexusOperationOptions{"
147+
+ "scheduleToCloseTimeout="
148+
+ scheduleToCloseTimeout
149+
+ ", summary='"
150+
+ summary
151+
+ '\''
152+
+ '}';
124153
}
125154
}

temporal-sdk/src/test/java/io/temporal/internal/statemachines/CancelNexusOperationStateMachineTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
8989
builder
9090
.<Optional<String>, Failure>add2(
9191
(v, c) ->
92-
stateMachines.startNexusOperation(scheduleAttributes, c, delayedCallback::run))
92+
stateMachines.startNexusOperation(
93+
scheduleAttributes, null, c, delayedCallback::run))
9394
.add((v) -> stateMachines.requestCancelNexusOperation(cancelAttributes))
9495
.<Optional<Payload>, Failure>add2((pair, c) -> delayedCallback.set(c))
9596
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));

temporal-sdk/src/test/java/io/temporal/internal/statemachines/NexusOperationStateMachineTest.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
9292
newScheduleNexusOperationCommandAttributesBuilder();
9393
builder
9494
.<Optional<Payload>, Failure>add2(
95-
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
95+
(v, c) ->
96+
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
9697
.add(
9798
(pair) ->
9899
stateMachines.completeWorkflow(
@@ -167,7 +168,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
167168
newScheduleNexusOperationCommandAttributesBuilder();
168169
builder
169170
.<Optional<Payload>, Failure>add2(
170-
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
171+
(v, c) ->
172+
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
171173
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
172174
}
173175
}
@@ -238,7 +240,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
238240
newScheduleNexusOperationCommandAttributesBuilder();
239241
builder
240242
.<Optional<Payload>, Failure>add2(
241-
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
243+
(v, c) ->
244+
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
242245
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
243246
}
244247
}
@@ -309,7 +312,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
309312
newScheduleNexusOperationCommandAttributesBuilder();
310313
builder
311314
.<Optional<Payload>, Failure>add2(
312-
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
315+
(v, c) ->
316+
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
313317
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
314318
}
315319
}
@@ -383,7 +387,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
383387
.<Optional<Payload>, Failure>add2(
384388
(v, c) ->
385389
cancellationHandler =
386-
stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
390+
stateMachines.startNexusOperation(
391+
attributes.build(), null, (o, f) -> {}, c))
387392
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
388393
// Immediate cancellation
389394
builder.add((v) -> cancellationHandler.apply());
@@ -420,7 +425,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
420425
builder
421426
.<Optional<String>, Failure>add2(
422427
(v, c) ->
423-
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
428+
stateMachines.startNexusOperation(
429+
attributes.build(), null, c, delayedCallback::run))
424430
.<Optional<Payload>, Failure>add2(
425431
(pair, c) -> {
426432
Assert.assertEquals(OPERATION_ID, pair.getT1().get());
@@ -514,7 +520,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
514520
builder
515521
.<Optional<String>, Failure>add2(
516522
(v, c) ->
517-
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
523+
stateMachines.startNexusOperation(
524+
attributes.build(), null, c, delayedCallback::run))
518525
.<Optional<Payload>, Failure>add2(
519526
(pair, c) -> {
520527
Assert.assertEquals(OPERATION_ID, pair.getT1().get());
@@ -604,7 +611,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
604611
builder
605612
.<Optional<String>, Failure>add2(
606613
(v, c) ->
607-
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
614+
stateMachines.startNexusOperation(
615+
attributes.build(), null, c, delayedCallback::run))
608616
.<Optional<Payload>, Failure>add2(
609617
(pair, c) -> {
610618
Assert.assertEquals(OPERATION_ID, pair.getT1().get());
@@ -694,7 +702,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
694702
builder
695703
.<Optional<String>, Failure>add2(
696704
(v, c) ->
697-
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
705+
stateMachines.startNexusOperation(
706+
attributes.build(), null, c, delayedCallback::run))
698707
.<Optional<Payload>, Failure>add2(
699708
(pair, c) -> {
700709
Assert.assertEquals(OPERATION_ID, pair.getT1().get());

0 commit comments

Comments
 (0)