From bd5f8fada78d621aeb7c9f17bc78a9447d42a411 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 May 2025 07:09:59 -0700 Subject: [PATCH 1/3] Add an interceptor for listExecutions --- .../client/ListWorkflowExecutionIterator.java | 4 +- .../client/WorkflowClientInternalImpl.java | 21 ++----- .../client/WorkflowExecutionMetadata.java | 3 +- .../WorkflowClientCallsInterceptor.java | 35 +++++++++++ .../WorkflowClientCallsInterceptorBase.java | 5 ++ .../client/RootWorkflowClientInvoker.java | 26 ++++++++ ...ListWorkflowExecutionsInterceptorTest.java | 63 +++++++++++++++++++ 7 files changed, 137 insertions(+), 20 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java b/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java index 1e282c6ef..c1ef04ab9 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java @@ -11,14 +11,14 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -class ListWorkflowExecutionIterator +public class ListWorkflowExecutionIterator extends EagerPaginator { private final @Nullable String query; private final @Nonnull String namespace; private final @Nullable Integer pageSize; private final @Nonnull GenericWorkflowClient genericClient; - ListWorkflowExecutionIterator( + public ListWorkflowExecutionIterator( @Nullable String query, @Nonnull String namespace, @Nullable Integer pageSize, diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index e13e88b02..eae0ef11c 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -4,7 +4,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.Iterators; import com.google.common.reflect.TypeToken; import com.uber.m3.tally.Scope; import io.temporal.api.common.v1.WorkflowExecution; @@ -250,22 +249,10 @@ public WorkflowExecutionCount countWorkflows(@Nullable String query) { Stream listExecutions( @Nullable String query, @Nullable Integer pageSize) { - ListWorkflowExecutionIterator iterator = - new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient); - iterator.init(); - Iterator wrappedIterator = - Iterators.transform( - iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter())); - - // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is - // impossible - // TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list - // API - // guarantees absence of duplicates - final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE; - - return StreamSupport.stream( - Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false); + return workflowClientCallsInvoker + .listWorkflowExecutions( + new WorkflowClientCallsInterceptor.ListWorkflowExecutionsInput(query, pageSize)) + .getStream(); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java index bdebb267c..b35cdaed1 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java @@ -20,11 +20,12 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +/** WorkflowExecutionMetadata contains information about a workflow execution. */ public class WorkflowExecutionMetadata { private final @Nonnull WorkflowExecutionInfo info; private final @Nonnull DataConverter dataConverter; - WorkflowExecutionMetadata( + public WorkflowExecutionMetadata( @Nonnull WorkflowExecutionInfo info, @Nonnull DataConverter dataConverter) { this.info = Preconditions.checkNotNull(info, "info"); this.dataConverter = Preconditions.checkNotNull(dataConverter, "dataConverter"); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 024b78520..8391412a4 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -10,6 +10,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -75,6 +76,40 @@ public interface WorkflowClientCallsInterceptor { DescribeWorkflowOutput describe(DescribeWorkflowInput input); + ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input); + + final class ListWorkflowExecutionsInput { + private final String query; + private final Integer pageSize; + + public ListWorkflowExecutionsInput(@Nullable String query, @Nullable Integer pageSize) { + this.query = query; + this.pageSize = pageSize; + } + + @Nullable + public String getQuery() { + return query; + } + + @Nullable + public Integer getPageSize() { + return pageSize; + } + } + + final class ListWorkflowExecutionsOutput { + private final Stream stream; + + public ListWorkflowExecutionsOutput(Stream stream) { + this.stream = stream; + } + + public Stream getStream() { + return stream; + } + } + CountWorkflowOutput countWorkflows(CountWorkflowsInput input); final class WorkflowStartInput { diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java index 09c101406..6d50650e5 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java @@ -73,6 +73,11 @@ public DescribeWorkflowOutput describe(DescribeWorkflowInput input) { return next.describe(input); } + @Override + public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) { + return next.listWorkflowExecutions(input); + } + @Override public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) { return next.countWorkflows(input); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index a0fc2e424..a3006518f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -30,10 +30,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Iterators; + public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor { private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class); private static final long POLL_UPDATE_TIMEOUT_S = 60L; @@ -688,6 +691,29 @@ public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) { } CountWorkflowExecutionsResponse resp = genericClient.countWorkflowExecutions(req.build()); return new CountWorkflowOutput(new WorkflowExecutionCount(resp)); +} + +@Override + public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) { + ListWorkflowExecutionIterator iterator = + new ListWorkflowExecutionIterator( + input.getQuery(), clientOptions.getNamespace(), input.getPageSize(), genericClient); + iterator.init(); + Iterator wrappedIterator = + Iterators.transform( + iterator, + info -> new WorkflowExecutionMetadata(info, clientOptions.getDataConverter())); + + // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is + // impossible + // TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list + // API + // guarantees absence of duplicates + final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE; + + return new ListWorkflowExecutionsOutput( + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false)); } private static R convertResultPayloads( diff --git a/temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java b/temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java new file mode 100644 index 000000000..344ce1251 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java @@ -0,0 +1,63 @@ +package io.temporal.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assume.assumeTrue; + +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase; +import io.temporal.common.interceptors.WorkflowClientInterceptorBase; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Rule; +import org.junit.Test; + +public class ListWorkflowExecutionsInterceptorTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflows.DoNothingNoArgsWorkflow.class) + .build(); + + @Test + public void listExecutions_isIntercepted() throws InterruptedException { + assumeTrue( + "Test Server doesn't support listWorkflowExecutions endpoint yet", + SDKTestWorkflowRule.useExternalService); + + AtomicInteger intercepted = new AtomicInteger(); + WorkflowClient workflowClient = + WorkflowClient.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions()) + .setInterceptors( + new WorkflowClientInterceptorBase() { + @Override + public WorkflowClientCallsInterceptor workflowClientCallsInterceptor( + WorkflowClientCallsInterceptor next) { + return new WorkflowClientCallsInterceptorBase(next) { + @Override + public ListWorkflowExecutionsOutput listWorkflowExecutions( + ListWorkflowExecutionsInput input) { + intercepted.incrementAndGet(); + return super.listWorkflowExecutions(input); + } + }; + } + }) + .validateAndBuildWithDefaults()); + + WorkflowStub.fromTyped(testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class)) + .start(); + + // Visibility API is eventually consistent + Thread.sleep(2_000); + java.util.List result = + workflowClient + .listExecutions("TaskQueue='" + testWorkflowRule.getTaskQueue() + "'") + .collect(java.util.stream.Collectors.toList()); + assertFalse(result.isEmpty()); + assertEquals(1, intercepted.get()); + } +} From 04d92a146cf6e47e3095263c28d5122e5e98c705 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 May 2025 07:22:19 -0700 Subject: [PATCH 2/3] Run spotless --- .../io/temporal/internal/client/RootWorkflowClientInvoker.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index a3006518f..65d3d33e8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -5,6 +5,7 @@ import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData; +import com.google.common.collect.Iterators; import io.grpc.Deadline; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -35,8 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Iterators; - public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor { private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class); private static final long POLL_UPDATE_TIMEOUT_S = 60L; From ae6931702fa6c6857c57702034645a806321ae7c Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 15 May 2025 12:16:59 -0700 Subject: [PATCH 3/3] run format --- .../temporal/internal/client/RootWorkflowClientInvoker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 65d3d33e8..4d45b1c70 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -690,9 +690,9 @@ public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) { } CountWorkflowExecutionsResponse resp = genericClient.countWorkflowExecutions(req.build()); return new CountWorkflowOutput(new WorkflowExecutionCount(resp)); -} + } -@Override + @Override public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) { ListWorkflowExecutionIterator iterator = new ListWorkflowExecutionIterator(