From 15d81c53d1dcefb7be76756e13a550bf9ea4ca37 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 May 2025 07:09:59 -0700 Subject: [PATCH 1/4] Add an interceptor for listExecutions --- .../client/ListWorkflowExecutionIterator.java | 4 +- .../client/WorkflowClientInternalImpl.java | 21 ++----- .../client/WorkflowExecutionMetadata.java | 3 +- .../WorkflowClientCallsInterceptor.java | 35 +++++++++++ .../WorkflowClientCallsInterceptorBase.java | 5 ++ .../client/RootWorkflowClientInvoker.java | 26 ++++++++ ...ListWorkflowExecutionsInterceptorTest.java | 63 +++++++++++++++++++ 7 files changed, 137 insertions(+), 20 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java b/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java index 1e282c6efb..c1ef04ab9a 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java @@ -11,14 +11,14 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -class ListWorkflowExecutionIterator +public class ListWorkflowExecutionIterator extends EagerPaginator { private final @Nullable String query; private final @Nonnull String namespace; private final @Nullable Integer pageSize; private final @Nonnull GenericWorkflowClient genericClient; - ListWorkflowExecutionIterator( + public ListWorkflowExecutionIterator( @Nullable String query, @Nonnull String namespace, @Nullable Integer pageSize, diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index e13e88b02d..eae0ef11cf 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -4,7 +4,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.Iterators; import com.google.common.reflect.TypeToken; import com.uber.m3.tally.Scope; import io.temporal.api.common.v1.WorkflowExecution; @@ -250,22 +249,10 @@ public WorkflowExecutionCount countWorkflows(@Nullable String query) { Stream listExecutions( @Nullable String query, @Nullable Integer pageSize) { - ListWorkflowExecutionIterator iterator = - new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient); - iterator.init(); - Iterator wrappedIterator = - Iterators.transform( - iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter())); - - // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is - // impossible - // TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list - // API - // guarantees absence of duplicates - final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE; - - return StreamSupport.stream( - Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false); + return workflowClientCallsInvoker + .listWorkflowExecutions( + new WorkflowClientCallsInterceptor.ListWorkflowExecutionsInput(query, pageSize)) + .getStream(); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java index bdebb267c9..b35cdaed12 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java @@ -20,11 +20,12 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +/** WorkflowExecutionMetadata contains information about a workflow execution. */ public class WorkflowExecutionMetadata { private final @Nonnull WorkflowExecutionInfo info; private final @Nonnull DataConverter dataConverter; - WorkflowExecutionMetadata( + public WorkflowExecutionMetadata( @Nonnull WorkflowExecutionInfo info, @Nonnull DataConverter dataConverter) { this.info = Preconditions.checkNotNull(info, "info"); this.dataConverter = Preconditions.checkNotNull(dataConverter, "dataConverter"); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 024b78520f..8391412a42 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -10,6 +10,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -75,6 +76,40 @@ public interface WorkflowClientCallsInterceptor { DescribeWorkflowOutput describe(DescribeWorkflowInput input); + ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input); + + final class ListWorkflowExecutionsInput { + private final String query; + private final Integer pageSize; + + public ListWorkflowExecutionsInput(@Nullable String query, @Nullable Integer pageSize) { + this.query = query; + this.pageSize = pageSize; + } + + @Nullable + public String getQuery() { + return query; + } + + @Nullable + public Integer getPageSize() { + return pageSize; + } + } + + final class ListWorkflowExecutionsOutput { + private final Stream stream; + + public ListWorkflowExecutionsOutput(Stream stream) { + this.stream = stream; + } + + public Stream getStream() { + return stream; + } + } + CountWorkflowOutput countWorkflows(CountWorkflowsInput input); final class WorkflowStartInput { diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java index 09c1014061..6d50650e58 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java @@ -73,6 +73,11 @@ public DescribeWorkflowOutput describe(DescribeWorkflowInput input) { return next.describe(input); } + @Override + public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) { + return next.listWorkflowExecutions(input); + } + @Override public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) { return next.countWorkflows(input); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index a0fc2e4245..a3006518f9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -30,10 +30,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Iterators; + public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor { private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class); private static final long POLL_UPDATE_TIMEOUT_S = 60L; @@ -688,6 +691,29 @@ public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) { } CountWorkflowExecutionsResponse resp = genericClient.countWorkflowExecutions(req.build()); return new CountWorkflowOutput(new WorkflowExecutionCount(resp)); +} + +@Override + public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) { + ListWorkflowExecutionIterator iterator = + new ListWorkflowExecutionIterator( + input.getQuery(), clientOptions.getNamespace(), input.getPageSize(), genericClient); + iterator.init(); + Iterator wrappedIterator = + Iterators.transform( + iterator, + info -> new WorkflowExecutionMetadata(info, clientOptions.getDataConverter())); + + // IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is + // impossible + // TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list + // API + // guarantees absence of duplicates + final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE; + + return new ListWorkflowExecutionsOutput( + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false)); } private static R convertResultPayloads( diff --git a/temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java b/temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java new file mode 100644 index 0000000000..344ce1251e --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/ListWorkflowExecutionsInterceptorTest.java @@ -0,0 +1,63 @@ +package io.temporal.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assume.assumeTrue; + +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase; +import io.temporal.common.interceptors.WorkflowClientInterceptorBase; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Rule; +import org.junit.Test; + +public class ListWorkflowExecutionsInterceptorTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflows.DoNothingNoArgsWorkflow.class) + .build(); + + @Test + public void listExecutions_isIntercepted() throws InterruptedException { + assumeTrue( + "Test Server doesn't support listWorkflowExecutions endpoint yet", + SDKTestWorkflowRule.useExternalService); + + AtomicInteger intercepted = new AtomicInteger(); + WorkflowClient workflowClient = + WorkflowClient.newInstance( + testWorkflowRule.getWorkflowServiceStubs(), + WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions()) + .setInterceptors( + new WorkflowClientInterceptorBase() { + @Override + public WorkflowClientCallsInterceptor workflowClientCallsInterceptor( + WorkflowClientCallsInterceptor next) { + return new WorkflowClientCallsInterceptorBase(next) { + @Override + public ListWorkflowExecutionsOutput listWorkflowExecutions( + ListWorkflowExecutionsInput input) { + intercepted.incrementAndGet(); + return super.listWorkflowExecutions(input); + } + }; + } + }) + .validateAndBuildWithDefaults()); + + WorkflowStub.fromTyped(testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class)) + .start(); + + // Visibility API is eventually consistent + Thread.sleep(2_000); + java.util.List result = + workflowClient + .listExecutions("TaskQueue='" + testWorkflowRule.getTaskQueue() + "'") + .collect(java.util.stream.Collectors.toList()); + assertFalse(result.isEmpty()); + assertEquals(1, intercepted.get()); + } +} From b56b8e9a0f7d78bc84de9346aacd2a481edc03db Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 May 2025 07:22:19 -0700 Subject: [PATCH 2/4] Run spotless --- .../io/temporal/internal/client/RootWorkflowClientInvoker.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index a3006518f9..65d3d33e88 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -5,6 +5,7 @@ import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData; +import com.google.common.collect.Iterators; import io.grpc.Deadline; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -35,8 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Iterators; - public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor { private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class); private static final long POLL_UPDATE_TIMEOUT_S = 60L; From ecdd65ec6aac5082e85195b35f3324d8bf7a818c Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 15 May 2025 12:16:59 -0700 Subject: [PATCH 3/4] run format --- .../temporal/internal/client/RootWorkflowClientInvoker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 65d3d33e88..4d45b1c701 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -690,9 +690,9 @@ public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) { } CountWorkflowExecutionsResponse resp = genericClient.countWorkflowExecutions(req.build()); return new CountWorkflowOutput(new WorkflowExecutionCount(resp)); -} + } -@Override + @Override public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) { ListWorkflowExecutionIterator iterator = new ListWorkflowExecutionIterator( From ac93fbf3e114f64c08770ca96f0b2502f5dc88c9 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 2 Jun 2025 08:43:03 -0700 Subject: [PATCH 4/4] Fix ListWorkflowExecutionIterator --- .../client/GetWorkflowExecutionHistoryIterator.java | 7 ++++--- .../client/ListScheduleListDescriptionIterator.java | 8 +++++--- .../temporal/{ => internal}/client/EagerPaginator.java | 10 +++++----- .../client/ListWorkflowExecutionIterator.java | 10 +++++----- 4 files changed, 19 insertions(+), 16 deletions(-) rename temporal-sdk/src/main/java/io/temporal/{ => internal}/client/EagerPaginator.java (89%) rename temporal-sdk/src/main/java/io/temporal/{ => internal}/client/ListWorkflowExecutionIterator.java (83%) diff --git a/temporal-sdk/src/main/java/io/temporal/client/GetWorkflowExecutionHistoryIterator.java b/temporal-sdk/src/main/java/io/temporal/client/GetWorkflowExecutionHistoryIterator.java index 1ffdded84f..aa6200f181 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/GetWorkflowExecutionHistoryIterator.java +++ b/temporal-sdk/src/main/java/io/temporal/client/GetWorkflowExecutionHistoryIterator.java @@ -5,6 +5,7 @@ import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse; +import io.temporal.internal.client.EagerPaginator; import io.temporal.internal.client.external.GenericWorkflowClient; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -30,7 +31,7 @@ public GetWorkflowExecutionHistoryIterator( } @Override - CompletableFuture performRequest( + protected CompletableFuture performRequest( @Nonnull ByteString nextPageToken) { GetWorkflowExecutionHistoryRequest.Builder requestBuilder = GetWorkflowExecutionHistoryRequest.newBuilder() @@ -46,12 +47,12 @@ CompletableFuture performRequest( } @Override - ByteString getNextPageToken(GetWorkflowExecutionHistoryResponse response) { + protected ByteString getNextPageToken(GetWorkflowExecutionHistoryResponse response) { return response.getNextPageToken(); } @Override - List toElements(GetWorkflowExecutionHistoryResponse response) { + protected List toElements(GetWorkflowExecutionHistoryResponse response) { return response.getHistory().getEventsList(); } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/ListScheduleListDescriptionIterator.java b/temporal-sdk/src/main/java/io/temporal/client/ListScheduleListDescriptionIterator.java index 718ad9bc88..efda9220da 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ListScheduleListDescriptionIterator.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ListScheduleListDescriptionIterator.java @@ -4,6 +4,7 @@ import io.temporal.api.schedule.v1.ScheduleListEntry; import io.temporal.api.workflowservice.v1.ListSchedulesRequest; import io.temporal.api.workflowservice.v1.ListSchedulesResponse; +import io.temporal.internal.client.EagerPaginator; import io.temporal.internal.client.external.GenericWorkflowClient; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -40,7 +41,8 @@ public ListScheduleListDescriptionIterator( } @Override - CompletableFuture performRequest(@Nonnull ByteString nextPageToken) { + protected CompletableFuture performRequest( + @Nonnull ByteString nextPageToken) { ListSchedulesRequest.Builder request = ListSchedulesRequest.newBuilder().setNamespace(namespace).setNextPageToken(nextPageToken); @@ -54,12 +56,12 @@ CompletableFuture performRequest(@Nonnull ByteString next } @Override - ByteString getNextPageToken(ListSchedulesResponse response) { + protected ByteString getNextPageToken(ListSchedulesResponse response) { return response.getNextPageToken(); } @Override - List toElements(ListSchedulesResponse response) { + protected List toElements(ListSchedulesResponse response) { return response.getSchedulesList(); } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/EagerPaginator.java b/temporal-sdk/src/main/java/io/temporal/internal/client/EagerPaginator.java similarity index 89% rename from temporal-sdk/src/main/java/io/temporal/client/EagerPaginator.java rename to temporal-sdk/src/main/java/io/temporal/internal/client/EagerPaginator.java index 4cb0c8a561..5a9ddfe8b0 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/EagerPaginator.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/EagerPaginator.java @@ -1,4 +1,4 @@ -package io.temporal.client; +package io.temporal.internal.client; import com.google.protobuf.ByteString; import java.util.Iterator; @@ -15,7 +15,7 @@ * previous page. The main goal of this approach is to reduce a synchronous wait that would * otherwise happen when a first element of the next page is requested. */ -abstract class EagerPaginator implements Iterator { +public abstract class EagerPaginator implements Iterator { private List activeResponse; private int nextActiveResponseIndex; private CompletableFuture nextResponse; @@ -92,9 +92,9 @@ private Resp waitAndGetNextResponse() { return response; } - abstract CompletableFuture performRequest(@Nonnull ByteString nextPageToken); + protected abstract CompletableFuture performRequest(@Nonnull ByteString nextPageToken); - abstract ByteString getNextPageToken(Resp response); + protected abstract ByteString getNextPageToken(Resp response); - abstract List toElements(Resp response); + protected abstract List toElements(Resp response); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java b/temporal-sdk/src/main/java/io/temporal/internal/client/ListWorkflowExecutionIterator.java similarity index 83% rename from temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java rename to temporal-sdk/src/main/java/io/temporal/internal/client/ListWorkflowExecutionIterator.java index c1ef04ab9a..3b67145685 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/ListWorkflowExecutionIterator.java @@ -1,4 +1,4 @@ -package io.temporal.client; +package io.temporal.internal.client; import com.google.protobuf.ByteString; import io.temporal.api.workflow.v1.WorkflowExecutionInfo; @@ -11,7 +11,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -public class ListWorkflowExecutionIterator +class ListWorkflowExecutionIterator extends EagerPaginator { private final @Nullable String query; private final @Nonnull String namespace; @@ -30,7 +30,7 @@ public ListWorkflowExecutionIterator( } @Override - CompletableFuture performRequest( + protected CompletableFuture performRequest( @Nonnull ByteString nextPageToken) { ListWorkflowExecutionsRequest.Builder request = ListWorkflowExecutionsRequest.newBuilder() @@ -49,12 +49,12 @@ CompletableFuture performRequest( } @Override - ByteString getNextPageToken(ListWorkflowExecutionsResponse response) { + protected ByteString getNextPageToken(ListWorkflowExecutionsResponse response) { return response.getNextPageToken(); } @Override - List toElements(ListWorkflowExecutionsResponse response) { + protected List toElements(ListWorkflowExecutionsResponse response) { return response.getExecutionsList(); } }