Skip to content

Commit e8d9fda

Browse files
Add API to count workflows (#2518)
1 parent 076f981 commit e8d9fda

File tree

10 files changed

+181
-0
lines changed

10 files changed

+181
-0
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,15 @@ WorkflowStub newUntypedWorkflowStub(
240240
*/
241241
Stream<WorkflowExecutionMetadata> listExecutions(@Nullable String query);
242242

243+
/**
244+
* Count workflow executions using the Visibility API.
245+
*
246+
* @param query Temporal Visibility query, for syntax see <a
247+
* href="https://docs.temporal.io/visibility#list-filter">Visibility docs</a>
248+
* @return count result object
249+
*/
250+
WorkflowExecutionCount countWorkflows(@Nullable String query);
251+
243252
/**
244253
* Streams history events for a workflow execution for the provided {@code workflowId}.
245254
*

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,13 @@ public Stream<WorkflowExecutionMetadata> listExecutions(@Nullable String query)
241241
return listExecutions(query, null);
242242
}
243243

244+
@Override
245+
public WorkflowExecutionCount countWorkflows(@Nullable String query) {
246+
WorkflowClientCallsInterceptor.CountWorkflowsInput input =
247+
new WorkflowClientCallsInterceptor.CountWorkflowsInput(query);
248+
return workflowClientCallsInvoker.countWorkflows(input).getCount();
249+
}
250+
244251
Stream<WorkflowExecutionMetadata> listExecutions(
245252
@Nullable String query, @Nullable Integer pageSize) {
246253
ListWorkflowExecutionIterator iterator =
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.temporal.client;
2+
3+
import io.temporal.api.workflowservice.v1.CountWorkflowExecutionsResponse;
4+
import io.temporal.internal.common.SearchAttributesUtil;
5+
import java.util.List;
6+
import java.util.stream.Collectors;
7+
import javax.annotation.Nonnull;
8+
9+
/** Result of counting workflow executions. */
10+
public class WorkflowExecutionCount {
11+
/** Individual aggregation group record. */
12+
public static class AggregationGroup {
13+
private final List<List<?>> groupValues;
14+
private final long count;
15+
16+
AggregationGroup(List<io.temporal.api.common.v1.Payload> groupValues, long count) {
17+
this.groupValues =
18+
groupValues.stream().map(SearchAttributesUtil::decode).collect(Collectors.toList());
19+
this.count = count;
20+
}
21+
22+
/** Values of the group. */
23+
public List<List<?>> getGroupValues() {
24+
return groupValues;
25+
}
26+
27+
/** Count of workflows in the group. */
28+
public long getCount() {
29+
return count;
30+
}
31+
}
32+
33+
private final long count;
34+
private final List<AggregationGroup> groups;
35+
36+
public WorkflowExecutionCount(@Nonnull CountWorkflowExecutionsResponse response) {
37+
this.count = response.getCount();
38+
this.groups =
39+
response.getGroupsList().stream()
40+
.map(g -> new AggregationGroup(g.getGroupValuesList(), g.getCount()))
41+
.collect(Collectors.toList());
42+
}
43+
44+
/** Total number of workflows matching the request. */
45+
public long getCount() {
46+
return count;
47+
}
48+
49+
/** Aggregation groups returned by the service. */
50+
public List<AggregationGroup> getGroups() {
51+
return groups;
52+
}
53+
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public interface WorkflowClientCallsInterceptor {
7575

7676
DescribeWorkflowOutput describe(DescribeWorkflowInput input);
7777

78+
CountWorkflowOutput countWorkflows(CountWorkflowsInput input);
79+
7880
final class WorkflowStartInput {
7981
private final String workflowId;
8082
private final String workflowType;
@@ -602,4 +604,29 @@ public WorkflowExecutionDescription getDescription() {
602604
return description;
603605
}
604606
}
607+
608+
final class CountWorkflowsInput {
609+
private final String query;
610+
611+
public CountWorkflowsInput(@Nullable String query) {
612+
this.query = query;
613+
}
614+
615+
@Nullable
616+
public String getQuery() {
617+
return query;
618+
}
619+
}
620+
621+
final class CountWorkflowOutput {
622+
private final WorkflowExecutionCount count;
623+
624+
public CountWorkflowOutput(WorkflowExecutionCount count) {
625+
this.count = count;
626+
}
627+
628+
public WorkflowExecutionCount getCount() {
629+
return count;
630+
}
631+
}
605632
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,9 @@ public TerminateOutput terminate(TerminateInput input) {
7272
public DescribeWorkflowOutput describe(DescribeWorkflowInput input) {
7373
return next.describe(input);
7474
}
75+
76+
@Override
77+
public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
78+
return next.countWorkflows(input);
79+
}
7580
}

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,17 @@ public DescribeWorkflowOutput describe(DescribeWorkflowInput input) {
676676
new WorkflowExecutionDescription(response, dataConverterWithWorkflowContext));
677677
}
678678

679+
@Override
680+
public CountWorkflowOutput countWorkflows(CountWorkflowsInput input) {
681+
CountWorkflowExecutionsRequest.Builder req =
682+
CountWorkflowExecutionsRequest.newBuilder().setNamespace(clientOptions.getNamespace());
683+
if (input.getQuery() != null) {
684+
req.setQuery(input.getQuery());
685+
}
686+
CountWorkflowExecutionsResponse resp = genericClient.countWorkflowExecutions(req.build());
687+
return new CountWorkflowOutput(new WorkflowExecutionCount(resp));
688+
}
689+
679690
private static <R> R convertResultPayloads(
680691
Optional<Payloads> resultValue,
681692
Class<R> resultClass,

temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ CompletableFuture<GetWorkflowExecutionHistoryResponse> getWorkflowExecutionHisto
4444
CompletableFuture<ListWorkflowExecutionsResponse> listWorkflowExecutionsAsync(
4545
ListWorkflowExecutionsRequest listRequest);
4646

47+
CountWorkflowExecutionsResponse countWorkflowExecutions(CountWorkflowExecutionsRequest request);
48+
4749
CreateScheduleResponse createSchedule(CreateScheduleRequest request);
4850

4951
CompletableFuture<ListSchedulesResponse> listSchedulesAsync(ListSchedulesRequest request);

temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,18 @@ public CompletableFuture<ListWorkflowExecutionsResponse> listWorkflowExecutionsA
215215
grpcRetryerOptions);
216216
}
217217

218+
@Override
219+
public CountWorkflowExecutionsResponse countWorkflowExecutions(
220+
CountWorkflowExecutionsRequest request) {
221+
return grpcRetryer.retryWithResult(
222+
() ->
223+
service
224+
.blockingStub()
225+
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
226+
.countWorkflowExecutions(request),
227+
grpcRetryerOptions);
228+
}
229+
218230
@Override
219231
public CreateScheduleResponse createSchedule(CreateScheduleRequest request) {
220232
return grpcRetryer.retryWithResult(

temporal-sdk/src/main/java/io/temporal/internal/common/SearchAttributesUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,19 @@ public static <T> List<T> decode(
120120
return (List<T>) data;
121121
}
122122

123+
@Nullable
124+
public static List<?> decode(Payload payload) {
125+
List<?> data = converter.decode(payload);
126+
if (data.size() == 0) {
127+
// User code should observe the empty collection as non-existent search attribute, because
128+
// it's effectively the same.
129+
// We use an empty collection for "unset". See:
130+
// https://github.com/temporalio/temporal/issues/561
131+
return null;
132+
}
133+
return data;
134+
}
135+
123136
@SuppressWarnings("unchecked")
124137
@Nullable
125138
public static <T> List<T> decodeAsType(
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.temporal.client;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assume.assumeTrue;
5+
6+
import io.temporal.testing.internal.SDKTestWorkflowRule;
7+
import io.temporal.workflow.shared.TestWorkflows;
8+
import org.junit.Rule;
9+
import org.junit.Test;
10+
11+
public class CountWorkflowsTest {
12+
@Rule
13+
public SDKTestWorkflowRule testWorkflowRule =
14+
SDKTestWorkflowRule.newBuilder()
15+
.setWorkflowTypes(TestWorkflows.DoNothingNoArgsWorkflow.class)
16+
.build();
17+
18+
@Test
19+
public void countWorkflowExecutions_returnsAllExecutions() throws InterruptedException {
20+
assumeTrue(
21+
"Test Server doesn't support countWorkflowExecutions endpoint yet",
22+
SDKTestWorkflowRule.useExternalService);
23+
24+
final int EXECUTIONS_COUNT = 5;
25+
26+
for (int i = 0; i < EXECUTIONS_COUNT; i++) {
27+
WorkflowStub.fromTyped(testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class))
28+
.start();
29+
}
30+
31+
// Visibility API may be eventual consistent
32+
Thread.sleep(4_000);
33+
34+
String queryString =
35+
"TaskQueue='" + testWorkflowRule.getTaskQueue() + "' GROUP BY ExecutionStatus";
36+
WorkflowExecutionCount count = testWorkflowRule.getWorkflowClient().countWorkflows(queryString);
37+
assertEquals(EXECUTIONS_COUNT, count.getCount());
38+
assertEquals(1, count.getGroups().size());
39+
assertEquals(5, count.getGroups().get(0).getCount());
40+
assertEquals("Completed", count.getGroups().get(0).getGroupValues().get(0).get(0));
41+
}
42+
}

0 commit comments

Comments
 (0)