Skip to content

Add an interceptor for listExecutions #2524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

class ListWorkflowExecutionIterator
public class ListWorkflowExecutionIterator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public to support the interceptor? In .NET for example we felt there was no need to expose the underlying machinery of list workflows because creating an async iterator (or Java stream) should be easy enough if they must completely replace the logic and not just delegate to the next interceptor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally no, the problem is the RootWorkflowClientInvoker.java is in internal so it can't be private, moving this class causes a lot of other issues

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moving this class causes a lot of other issues

Hrmm. I wonder making public and moving from io.temporal.client to io.temporal.client.internal would make sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me try, I think I had some issue doing that since some other classes in io.temporal.client also have Iterators

extends EagerPaginator<ListWorkflowExecutionsResponse, WorkflowExecutionInfo> {
private final @Nullable String query;
private final @Nonnull String namespace;
private final @Nullable Integer pageSize;
private final @Nonnull GenericWorkflowClient genericClient;

ListWorkflowExecutionIterator(
public ListWorkflowExecutionIterator(
@Nullable String query,
@Nonnull String namespace,
@Nullable Integer pageSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.reflect.TypeToken;
import com.uber.m3.tally.Scope;
import io.temporal.api.common.v1.WorkflowExecution;
Expand Down Expand Up @@ -250,22 +249,10 @@ public WorkflowExecutionCount countWorkflows(@Nullable String query) {

Stream<WorkflowExecutionMetadata> listExecutions(
@Nullable String query, @Nullable Integer pageSize) {
ListWorkflowExecutionIterator iterator =
new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient);
iterator.init();
Iterator<WorkflowExecutionMetadata> wrappedIterator =
Iterators.transform(
iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter()));

// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
// impossible
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
// API
// guarantees absence of duplicates
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;

return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false);
return workflowClientCallsInvoker
.listWorkflowExecutions(
new WorkflowClientCallsInterceptor.ListWorkflowExecutionsInput(query, pageSize))
.getStream();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** WorkflowExecutionMetadata contains information about a workflow execution. */
public class WorkflowExecutionMetadata {
private final @Nonnull WorkflowExecutionInfo info;
private final @Nonnull DataConverter dataConverter;

WorkflowExecutionMetadata(
public WorkflowExecutionMetadata(
@Nonnull WorkflowExecutionInfo info, @Nonnull DataConverter dataConverter) {
this.info = Preconditions.checkNotNull(info, "info");
this.dataConverter = Preconditions.checkNotNull(dataConverter, "dataConverter");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -75,6 +76,40 @@ public interface WorkflowClientCallsInterceptor {

DescribeWorkflowOutput describe(DescribeWorkflowInput input);

ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input);

final class ListWorkflowExecutionsInput {
private final String query;
private final Integer pageSize;

public ListWorkflowExecutionsInput(@Nullable String query, @Nullable Integer pageSize) {
this.query = query;
this.pageSize = pageSize;
}

@Nullable
public String getQuery() {
return query;
}

@Nullable
public Integer getPageSize() {
return pageSize;
}
}

final class ListWorkflowExecutionsOutput {
private final Stream<WorkflowExecutionMetadata> stream;

public ListWorkflowExecutionsOutput(Stream<WorkflowExecutionMetadata> stream) {
this.stream = stream;
}

public Stream<WorkflowExecutionMetadata> getStream() {
return stream;
}
}

CountWorkflowOutput countWorkflows(CountWorkflowsInput input);

final class WorkflowStartInput {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public DescribeWorkflowOutput describe(DescribeWorkflowInput input) {
return next.describe(input);
}

@Override
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
return next.listWorkflowExecutions(input);
}

@Override
public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
return next.countWorkflows(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;

import com.google.common.collect.Iterators;
import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -690,6 +692,29 @@ public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
return new CountWorkflowOutput(new WorkflowExecutionCount(resp));
}

@Override
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
ListWorkflowExecutionIterator iterator =
new ListWorkflowExecutionIterator(
input.getQuery(), clientOptions.getNamespace(), input.getPageSize(), genericClient);
iterator.init();
Iterator<WorkflowExecutionMetadata> wrappedIterator =
Iterators.transform(
iterator,
info -> new WorkflowExecutionMetadata(info, clientOptions.getDataConverter()));

// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
// impossible
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
// API
// guarantees absence of duplicates
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;

return new ListWorkflowExecutionsOutput(
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false));
}

private static <R> R convertResultPayloads(
Optional<Payloads> resultValue,
Class<R> resultClass,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.temporal.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assume.assumeTrue;

import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestWorkflows;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.Test;

public class ListWorkflowExecutionsInterceptorTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflows.DoNothingNoArgsWorkflow.class)
.build();

@Test
public void listExecutions_isIntercepted() throws InterruptedException {
assumeTrue(
"Test Server doesn't support listWorkflowExecutions endpoint yet",
SDKTestWorkflowRule.useExternalService);

AtomicInteger intercepted = new AtomicInteger();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
testWorkflowRule.getWorkflowServiceStubs(),
WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions())
.setInterceptors(
new WorkflowClientInterceptorBase() {
@Override
public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
WorkflowClientCallsInterceptor next) {
return new WorkflowClientCallsInterceptorBase(next) {
@Override
public ListWorkflowExecutionsOutput listWorkflowExecutions(
ListWorkflowExecutionsInput input) {
intercepted.incrementAndGet();
return super.listWorkflowExecutions(input);
}
};
}
})
.validateAndBuildWithDefaults());

WorkflowStub.fromTyped(testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class))
.start();

// Visibility API is eventually consistent
Thread.sleep(2_000);
java.util.List<WorkflowExecutionMetadata> result =
workflowClient
.listExecutions("TaskQueue='" + testWorkflowRule.getTaskQueue() + "'")
.collect(java.util.stream.Collectors.toList());
assertFalse(result.isEmpty());
assertEquals(1, intercepted.get());
}
}
Loading