Skip to content

Commit c96f8d6

Browse files
Add workflow metadata query (#2301)
Add workflow metadata query
1 parent 0b192d3 commit c96f8d6

21 files changed

+443
-7
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ public interface WorkflowClient {
115115
/** Use this constant as a query type to get a workflow stack trace. */
116116
String QUERY_TYPE_STACK_TRACE = "__stack_trace";
117117

118+
/** Use this constant as a query type to get the workflow metadata. */
119+
String QUERY_TYPE_WORKFLOW_METADATA = "__temporal_workflow_metadata";
120+
118121
/** Replays workflow to the current state and returns empty result or error if replay failed. */
119122
String QUERY_TYPE_REPLAY_ONLY = "__replay_only";
120123

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ public Header getHeader() {
452452

453453
final class SignalRegistrationRequest {
454454
private final String signalType;
455+
private final String description;
455456
private final HandlerUnfinishedPolicy unfinishedPolicy;
456457
private final Class<?>[] argTypes;
457458
private final Type[] genericArgTypes;
@@ -464,19 +465,37 @@ public SignalRegistrationRequest(
464465
Type[] genericArgTypes,
465466
Functions.Proc1<Object[]> callback) {
466467
this.signalType = signalType;
468+
this.description = "";
467469
this.unfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON;
468470
this.argTypes = argTypes;
469471
this.genericArgTypes = genericArgTypes;
470472
this.callback = callback;
471473
}
472474

475+
// Kept for backward compatibility
476+
public SignalRegistrationRequest(
477+
String signalType,
478+
HandlerUnfinishedPolicy unfinishedPolicy,
479+
Class<?>[] argTypes,
480+
Type[] genericArgTypes,
481+
Functions.Proc1<Object[]> callback) {
482+
this.signalType = signalType;
483+
this.description = "";
484+
this.unfinishedPolicy = unfinishedPolicy;
485+
this.argTypes = argTypes;
486+
this.genericArgTypes = genericArgTypes;
487+
this.callback = callback;
488+
}
489+
473490
public SignalRegistrationRequest(
474491
String signalType,
492+
String description,
475493
HandlerUnfinishedPolicy unfinishedPolicy,
476494
Class<?>[] argTypes,
477495
Type[] genericArgTypes,
478496
Functions.Proc1<Object[]> callback) {
479497
this.signalType = signalType;
498+
this.description = description;
480499
this.unfinishedPolicy = unfinishedPolicy;
481500
this.argTypes = argTypes;
482501
this.genericArgTypes = genericArgTypes;
@@ -487,6 +506,11 @@ public String getSignalType() {
487506
return signalType;
488507
}
489508

509+
@Experimental
510+
public String getDescription() {
511+
return description;
512+
}
513+
490514
public HandlerUnfinishedPolicy getUnfinishedPolicy() {
491515
return unfinishedPolicy;
492516
}
@@ -519,20 +543,40 @@ public List<SignalRegistrationRequest> getRequests() {
519543
@Experimental
520544
final class UpdateRegistrationRequest {
521545
private final String updateName;
546+
private final String description;
522547
private final HandlerUnfinishedPolicy unfinishedPolicy;
523548
private final Class<?>[] argTypes;
524549
private final Type[] genericArgTypes;
525550
private final Functions.Func1<Object[], Object> executeCallback;
526551
private final Functions.Proc1<Object[]> validateCallback;
527552

553+
// Kept for backward compatibility
554+
public UpdateRegistrationRequest(
555+
String updateName,
556+
HandlerUnfinishedPolicy unfinishedPolicy,
557+
Class<?>[] argTypes,
558+
Type[] genericArgTypes,
559+
Functions.Proc1<Object[]> validateCallback,
560+
Functions.Func1<Object[], Object> executeCallback) {
561+
this.updateName = updateName;
562+
this.description = "";
563+
this.unfinishedPolicy = unfinishedPolicy;
564+
this.argTypes = argTypes;
565+
this.genericArgTypes = genericArgTypes;
566+
this.validateCallback = validateCallback;
567+
this.executeCallback = executeCallback;
568+
}
569+
528570
public UpdateRegistrationRequest(
529571
String updateName,
572+
String description,
530573
HandlerUnfinishedPolicy unfinishedPolicy,
531574
Class<?>[] argTypes,
532575
Type[] genericArgTypes,
533576
Functions.Proc1<Object[]> validateCallback,
534577
Functions.Func1<Object[], Object> executeCallback) {
535578
this.updateName = updateName;
579+
this.description = description;
536580
this.unfinishedPolicy = unfinishedPolicy;
537581
this.argTypes = argTypes;
538582
this.genericArgTypes = genericArgTypes;
@@ -544,6 +588,11 @@ public String getUpdateName() {
544588
return updateName;
545589
}
546590

591+
@Experimental
592+
public String getDescription() {
593+
return description;
594+
}
595+
547596
public HandlerUnfinishedPolicy getUnfinishedPolicy() {
548597
return unfinishedPolicy;
549598
}
@@ -580,16 +629,32 @@ public List<UpdateRegistrationRequest> getRequests() {
580629

581630
final class RegisterQueryInput {
582631
private final String queryType;
632+
private final String description;
583633
private final Class<?>[] argTypes;
584634
private final Type[] genericArgTypes;
585635
private final Functions.Func1<Object[], Object> callback;
586636

637+
// Kept for backward compatibility
638+
public RegisterQueryInput(
639+
String queryType,
640+
Class<?>[] argTypes,
641+
Type[] genericArgTypes,
642+
Functions.Func1<Object[], Object> callback) {
643+
this.queryType = queryType;
644+
this.description = "";
645+
this.argTypes = argTypes;
646+
this.genericArgTypes = genericArgTypes;
647+
this.callback = callback;
648+
}
649+
587650
public RegisterQueryInput(
588651
String queryType,
652+
String description,
589653
Class<?>[] argTypes,
590654
Type[] genericArgTypes,
591655
Functions.Func1<Object[], Object> callback) {
592656
this.queryType = queryType;
657+
this.description = description;
593658
this.argTypes = argTypes;
594659
this.genericArgTypes = genericArgTypes;
595660
this.callback = callback;
@@ -599,6 +664,11 @@ public String getQueryType() {
599664
return queryType;
600665
}
601666

667+
@Experimental
668+
public String getDescription() {
669+
return description;
670+
}
671+
602672
public Class<?>[] getArgTypes() {
603673
return argTypes;
604674
}

temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ final class POJOWorkflowMethod {
3131
private final WorkflowMethodType type;
3232
private final Method method;
3333
private final Optional<String> nameFromAnnotation;
34+
private final Optional<String> descriptionFromAnnotation;
3435

3536
POJOWorkflowMethod(Method method) {
3637
this.method = Objects.requireNonNull(method);
@@ -43,6 +44,7 @@ final class POJOWorkflowMethod {
4344
int count = 0;
4445
WorkflowMethodType type = null;
4546
String name = null;
47+
String description = null;
4648
if (workflowMethod != null) {
4749
type = WorkflowMethodType.WORKFLOW;
4850
count++;
@@ -56,6 +58,7 @@ final class POJOWorkflowMethod {
5658
}
5759
count++;
5860
name = signalMethod.name();
61+
description = signalMethod.description();
5962
}
6063
if (queryMethod != null) {
6164
type = WorkflowMethodType.QUERY;
@@ -65,11 +68,13 @@ final class POJOWorkflowMethod {
6568
}
6669
count++;
6770
name = queryMethod.name();
71+
description = queryMethod.description();
6872
}
6973
if (updateMethod != null) {
7074
type = WorkflowMethodType.UPDATE;
7175
count++;
7276
name = updateMethod.name();
77+
description = updateMethod.description();
7378
}
7479
if (updateValidatorMethod != null) {
7580
type = WorkflowMethodType.UPDATE_VALIDATOR;
@@ -86,13 +91,19 @@ final class POJOWorkflowMethod {
8691
throw new IllegalArgumentException(
8792
method
8893
+ " must contain exactly one annotation "
89-
+ "of @WorkflowMethod, @QueryMethod or @SignalMethod");
94+
+ "of @WorkflowMethod, @QueryMethod @UpdateMethod or @SignalMethod");
9095
}
9196
if (Strings.isNullOrEmpty(name)) {
9297
this.nameFromAnnotation = Optional.empty();
9398
} else {
9499
this.nameFromAnnotation = Optional.of(name);
95100
}
101+
102+
if (Strings.isNullOrEmpty(description)) {
103+
this.descriptionFromAnnotation = Optional.empty();
104+
} else {
105+
this.descriptionFromAnnotation = Optional.of(description);
106+
}
96107
this.type = Objects.requireNonNull(type);
97108
}
98109

@@ -108,6 +119,10 @@ public Optional<String> getNameFromAnnotation() {
108119
return nameFromAnnotation;
109120
}
110121

122+
public Optional<String> getDescriptionFromAnnotation() {
123+
return descriptionFromAnnotation;
124+
}
125+
111126
/** Compare and hash on method only. */
112127
@Override
113128
public boolean equals(Object o) {

temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public final class POJOWorkflowMethodMetadata {
2929

3030
private final POJOWorkflowMethod workflowMethod;
3131
private final String name;
32+
private final String description;
3233
private final Class<?> workflowInterface;
3334

3435
POJOWorkflowMethodMetadata(POJOWorkflowMethod methodMetadata, Class<?> workflowInterface) {
@@ -47,6 +48,7 @@ public final class POJOWorkflowMethodMetadata {
4748
} else {
4849
this.name = nameFromAnnotation.orElse(methodMetadata.getMethod().getName());
4950
}
51+
this.description = workflowMethod.getDescriptionFromAnnotation().orElse("");
5052
}
5153

5254
public WorkflowMethodType getType() {
@@ -62,6 +64,10 @@ public String getName() {
6264
return name;
6365
}
6466

67+
public String getDescription() {
68+
return description;
69+
}
70+
6571
public Method getWorkflowMethod() {
6672
return workflowMethod.getMethod();
6773
}

temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
package io.temporal.internal.sync;
2222

2323
import io.temporal.api.common.v1.Payloads;
24+
import io.temporal.api.sdk.v1.WorkflowInteractionDefinition;
2425
import io.temporal.common.converter.DataConverter;
2526
import io.temporal.common.converter.EncodedValues;
2627
import io.temporal.common.interceptors.Header;
2728
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
2829
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
2930
import io.temporal.workflow.DynamicQueryHandler;
30-
import java.util.HashMap;
31-
import java.util.Map;
32-
import java.util.Optional;
31+
import java.util.*;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
3534

@@ -103,4 +102,25 @@ public void registerDynamicQueryHandler(
103102
WorkflowOutboundCallsInterceptor.RegisterDynamicQueryHandlerInput input) {
104103
dynamicQueryHandler = input.getHandler();
105104
}
105+
106+
public List<WorkflowInteractionDefinition> getQueryHandlers() {
107+
List<WorkflowInteractionDefinition> handlers = new ArrayList<>(queryCallbacks.size() + 1);
108+
for (Map.Entry<String, WorkflowOutboundCallsInterceptor.RegisterQueryInput> entry :
109+
queryCallbacks.entrySet()) {
110+
WorkflowOutboundCallsInterceptor.RegisterQueryInput handler = entry.getValue();
111+
handlers.add(
112+
WorkflowInteractionDefinition.newBuilder()
113+
.setName(handler.getQueryType())
114+
.setDescription(handler.getDescription())
115+
.build());
116+
}
117+
if (dynamicQueryHandler != null) {
118+
handlers.add(
119+
WorkflowInteractionDefinition.newBuilder()
120+
.setDescription(dynamicQueryHandler.getDescription())
121+
.build());
122+
}
123+
handlers.sort(Comparator.comparing(WorkflowInteractionDefinition::getName));
124+
return handlers;
125+
}
106126
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.internal.sync;
2222

2323
import io.temporal.api.common.v1.Payloads;
24+
import io.temporal.api.sdk.v1.WorkflowInteractionDefinition;
2425
import io.temporal.common.converter.DataConverter;
2526
import io.temporal.common.converter.DataConverterException;
2627
import io.temporal.common.converter.EncodedValues;
@@ -160,6 +161,27 @@ private void logSerializationException(
160161
Workflow.getMetricsScope().counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
161162
}
162163

164+
public List<WorkflowInteractionDefinition> getSignalHandlers() {
165+
List<WorkflowInteractionDefinition> handlers = new ArrayList<>(signalCallbacks.size() + 1);
166+
for (Map.Entry<String, WorkflowOutboundCallsInterceptor.SignalRegistrationRequest> entry :
167+
signalCallbacks.entrySet()) {
168+
WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler = entry.getValue();
169+
handlers.add(
170+
WorkflowInteractionDefinition.newBuilder()
171+
.setName(handler.getSignalType())
172+
.setDescription(handler.getDescription())
173+
.build());
174+
}
175+
if (dynamicSignalHandler != null) {
176+
handlers.add(
177+
WorkflowInteractionDefinition.newBuilder()
178+
.setDescription(dynamicSignalHandler.getDescription())
179+
.build());
180+
}
181+
handlers.sort(Comparator.comparing(WorkflowInteractionDefinition::getName));
182+
return handlers;
183+
}
184+
163185
private static class SignalData {
164186
private final String signalName;
165187
private final Optional<Payloads> payload;

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.temporal.internal.statemachines.UpdateProtocolCallback;
4040
import io.temporal.internal.worker.WorkflowExecutionException;
4141
import io.temporal.internal.worker.WorkflowExecutorCache;
42+
import io.temporal.payload.context.WorkflowSerializationContext;
4243
import io.temporal.worker.WorkflowImplementationOptions;
4344
import io.temporal.workflow.UpdateInfo;
4445
import java.util.List;
@@ -69,6 +70,7 @@ class SyncWorkflow implements ReplayWorkflow {
6970
private WorkflowExecutionHandler workflowProc;
7071
private DeterministicRunner runner;
7172
private DataConverter dataConverter;
73+
private DataConverter dataConverterWithWorkflowContext;
7274

7375
public SyncWorkflow(
7476
String namespace,
@@ -92,6 +94,9 @@ public SyncWorkflow(
9294
this.cache = cache;
9395
this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
9496
this.dataConverter = dataConverter;
97+
this.dataConverterWithWorkflowContext =
98+
dataConverter.withContext(
99+
new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
95100
this.workflowContext =
96101
new SyncWorkflowContext(
97102
namespace,
@@ -238,6 +243,9 @@ public Optional<Payloads> query(WorkflowQuery query) {
238243
// converter
239244
return DefaultDataConverter.STANDARD_INSTANCE.toPayloads(runner.stackTrace());
240245
}
246+
if (WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA.equals(query.getQueryType())) {
247+
return dataConverterWithWorkflowContext.toPayloads(workflowContext.getWorkflowMetadata());
248+
}
241249
Optional<Payloads> args =
242250
query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty();
243251
return workflowProc.handleQuery(query.getQueryType(), query.getHeader(), args);

0 commit comments

Comments
 (0)