Skip to content

Add an interceptor for listExecutions #2524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.internal.client.EagerPaginator;
import io.temporal.internal.client.external.GenericWorkflowClient;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -30,7 +31,7 @@ public GetWorkflowExecutionHistoryIterator(
}

@Override
CompletableFuture<GetWorkflowExecutionHistoryResponse> performRequest(
protected CompletableFuture<GetWorkflowExecutionHistoryResponse> performRequest(
@Nonnull ByteString nextPageToken) {
GetWorkflowExecutionHistoryRequest.Builder requestBuilder =
GetWorkflowExecutionHistoryRequest.newBuilder()
Expand All @@ -46,12 +47,12 @@ CompletableFuture<GetWorkflowExecutionHistoryResponse> performRequest(
}

@Override
ByteString getNextPageToken(GetWorkflowExecutionHistoryResponse response) {
protected ByteString getNextPageToken(GetWorkflowExecutionHistoryResponse response) {
return response.getNextPageToken();
}

@Override
List<HistoryEvent> toElements(GetWorkflowExecutionHistoryResponse response) {
protected List<HistoryEvent> toElements(GetWorkflowExecutionHistoryResponse response) {
return response.getHistory().getEventsList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.temporal.api.schedule.v1.ScheduleListEntry;
import io.temporal.api.workflowservice.v1.ListSchedulesRequest;
import io.temporal.api.workflowservice.v1.ListSchedulesResponse;
import io.temporal.internal.client.EagerPaginator;
import io.temporal.internal.client.external.GenericWorkflowClient;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -40,7 +41,8 @@ public ListScheduleListDescriptionIterator(
}

@Override
CompletableFuture<ListSchedulesResponse> performRequest(@Nonnull ByteString nextPageToken) {
protected CompletableFuture<ListSchedulesResponse> performRequest(
@Nonnull ByteString nextPageToken) {
ListSchedulesRequest.Builder request =
ListSchedulesRequest.newBuilder().setNamespace(namespace).setNextPageToken(nextPageToken);

Expand All @@ -54,12 +56,12 @@ CompletableFuture<ListSchedulesResponse> performRequest(@Nonnull ByteString next
}

@Override
ByteString getNextPageToken(ListSchedulesResponse response) {
protected ByteString getNextPageToken(ListSchedulesResponse response) {
return response.getNextPageToken();
}

@Override
List<ScheduleListEntry> toElements(ListSchedulesResponse response) {
protected List<ScheduleListEntry> toElements(ListSchedulesResponse response) {
return response.getSchedulesList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.reflect.TypeToken;
import com.uber.m3.tally.Scope;
import io.temporal.api.common.v1.WorkflowExecution;
Expand Down Expand Up @@ -250,22 +249,10 @@ public WorkflowExecutionCount countWorkflows(@Nullable String query) {

Stream<WorkflowExecutionMetadata> listExecutions(
@Nullable String query, @Nullable Integer pageSize) {
ListWorkflowExecutionIterator iterator =
new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient);
iterator.init();
Iterator<WorkflowExecutionMetadata> wrappedIterator =
Iterators.transform(
iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter()));

// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
// impossible
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
// API
// guarantees absence of duplicates
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;

return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false);
return workflowClientCallsInvoker
.listWorkflowExecutions(
new WorkflowClientCallsInterceptor.ListWorkflowExecutionsInput(query, pageSize))
.getStream();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** WorkflowExecutionMetadata contains information about a workflow execution. */
public class WorkflowExecutionMetadata {
private final @Nonnull WorkflowExecutionInfo info;
private final @Nonnull DataConverter dataConverter;

WorkflowExecutionMetadata(
public WorkflowExecutionMetadata(
@Nonnull WorkflowExecutionInfo info, @Nonnull DataConverter dataConverter) {
this.info = Preconditions.checkNotNull(info, "info");
this.dataConverter = Preconditions.checkNotNull(dataConverter, "dataConverter");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -75,6 +76,40 @@ public interface WorkflowClientCallsInterceptor {

DescribeWorkflowOutput describe(DescribeWorkflowInput input);

ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input);

final class ListWorkflowExecutionsInput {
private final String query;
private final Integer pageSize;

public ListWorkflowExecutionsInput(@Nullable String query, @Nullable Integer pageSize) {
this.query = query;
this.pageSize = pageSize;
}

@Nullable
public String getQuery() {
return query;
}

@Nullable
public Integer getPageSize() {
return pageSize;
}
}

final class ListWorkflowExecutionsOutput {
private final Stream<WorkflowExecutionMetadata> stream;

public ListWorkflowExecutionsOutput(Stream<WorkflowExecutionMetadata> stream) {
this.stream = stream;
}

public Stream<WorkflowExecutionMetadata> getStream() {
return stream;
}
}

CountWorkflowOutput countWorkflows(CountWorkflowsInput input);

final class WorkflowStartInput {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public DescribeWorkflowOutput describe(DescribeWorkflowInput input) {
return next.describe(input);
}

@Override
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
return next.listWorkflowExecutions(input);
}

@Override
public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
return next.countWorkflows(input);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.temporal.client;
package io.temporal.internal.client;

import com.google.protobuf.ByteString;
import java.util.Iterator;
Expand All @@ -15,7 +15,7 @@
* previous page. The main goal of this approach is to reduce a synchronous wait that would
* otherwise happen when a first element of the next page is requested.
*/
abstract class EagerPaginator<Resp, T> implements Iterator<T> {
public abstract class EagerPaginator<Resp, T> implements Iterator<T> {
private List<T> activeResponse;
private int nextActiveResponseIndex;
private CompletableFuture<Resp> nextResponse;
Expand Down Expand Up @@ -92,9 +92,9 @@ private Resp waitAndGetNextResponse() {
return response;
}

abstract CompletableFuture<Resp> performRequest(@Nonnull ByteString nextPageToken);
protected abstract CompletableFuture<Resp> performRequest(@Nonnull ByteString nextPageToken);

abstract ByteString getNextPageToken(Resp response);
protected abstract ByteString getNextPageToken(Resp response);

abstract List<T> toElements(Resp response);
protected abstract List<T> toElements(Resp response);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.temporal.client;
package io.temporal.internal.client;

import com.google.protobuf.ByteString;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
Expand All @@ -18,7 +18,7 @@ class ListWorkflowExecutionIterator
private final @Nullable Integer pageSize;
private final @Nonnull GenericWorkflowClient genericClient;

ListWorkflowExecutionIterator(
public ListWorkflowExecutionIterator(
@Nullable String query,
@Nonnull String namespace,
@Nullable Integer pageSize,
Expand All @@ -30,7 +30,7 @@ class ListWorkflowExecutionIterator
}

@Override
CompletableFuture<ListWorkflowExecutionsResponse> performRequest(
protected CompletableFuture<ListWorkflowExecutionsResponse> performRequest(
@Nonnull ByteString nextPageToken) {
ListWorkflowExecutionsRequest.Builder request =
ListWorkflowExecutionsRequest.newBuilder()
Expand All @@ -49,12 +49,12 @@ CompletableFuture<ListWorkflowExecutionsResponse> performRequest(
}

@Override
ByteString getNextPageToken(ListWorkflowExecutionsResponse response) {
protected ByteString getNextPageToken(ListWorkflowExecutionsResponse response) {
return response.getNextPageToken();
}

@Override
List<WorkflowExecutionInfo> toElements(ListWorkflowExecutionsResponse response) {
protected List<WorkflowExecutionInfo> toElements(ListWorkflowExecutionsResponse response) {
return response.getExecutionsList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;

import com.google.common.collect.Iterators;
import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -690,6 +692,29 @@ public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
return new CountWorkflowOutput(new WorkflowExecutionCount(resp));
}

@Override
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
ListWorkflowExecutionIterator iterator =
new ListWorkflowExecutionIterator(
input.getQuery(), clientOptions.getNamespace(), input.getPageSize(), genericClient);
iterator.init();
Iterator<WorkflowExecutionMetadata> wrappedIterator =
Iterators.transform(
iterator,
info -> new WorkflowExecutionMetadata(info, clientOptions.getDataConverter()));

// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
// impossible
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
// API
// guarantees absence of duplicates
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;

return new ListWorkflowExecutionsOutput(
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false));
}

private static <R> R convertResultPayloads(
Optional<Payloads> resultValue,
Class<R> resultClass,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.temporal.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assume.assumeTrue;

import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestWorkflows;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.Test;

public class ListWorkflowExecutionsInterceptorTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflows.DoNothingNoArgsWorkflow.class)
.build();

@Test
public void listExecutions_isIntercepted() throws InterruptedException {
assumeTrue(
"Test Server doesn't support listWorkflowExecutions endpoint yet",
SDKTestWorkflowRule.useExternalService);

AtomicInteger intercepted = new AtomicInteger();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
testWorkflowRule.getWorkflowServiceStubs(),
WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions())
.setInterceptors(
new WorkflowClientInterceptorBase() {
@Override
public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
WorkflowClientCallsInterceptor next) {
return new WorkflowClientCallsInterceptorBase(next) {
@Override
public ListWorkflowExecutionsOutput listWorkflowExecutions(
ListWorkflowExecutionsInput input) {
intercepted.incrementAndGet();
return super.listWorkflowExecutions(input);
}
};
}
})
.validateAndBuildWithDefaults());

WorkflowStub.fromTyped(testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class))
.start();

// Visibility API is eventually consistent
Thread.sleep(2_000);
java.util.List<WorkflowExecutionMetadata> result =
workflowClient
.listExecutions("TaskQueue='" + testWorkflowRule.getTaskQueue() + "'")
.collect(java.util.stream.Collectors.toList());
assertFalse(result.isEmpty());
assertEquals(1, intercepted.get());
}
}
Loading