Skip to content

Commit 0013675

Browse files
authored
Add ScheduleClientInterceptor APIs (fixes #2048) (#2050)
Signed-off-by: Dan O'Reilly <oreilldf@gmail.com>
1 parent ed211fa commit 0013675

File tree

6 files changed

+344
-4
lines changed

6 files changed

+344
-4
lines changed

temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424

2525
import com.uber.m3.tally.Scope;
2626
import io.temporal.common.interceptors.ScheduleClientCallsInterceptor;
27+
import io.temporal.common.interceptors.ScheduleClientInterceptor;
2728
import io.temporal.internal.WorkflowThreadMarker;
2829
import io.temporal.internal.client.RootScheduleClientInvoker;
2930
import io.temporal.internal.client.external.GenericWorkflowClient;
3031
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
3132
import io.temporal.serviceclient.MetricsTag;
3233
import io.temporal.serviceclient.WorkflowServiceStubs;
34+
import java.util.List;
3335
import java.util.stream.Stream;
3436
import javax.annotation.Nullable;
3537

@@ -39,6 +41,7 @@ final class ScheduleClientImpl implements ScheduleClient {
3941
private final GenericWorkflowClient genericClient;
4042
private final Scope metricsScope;
4143
private final ScheduleClientCallsInterceptor scheduleClientCallsInvoker;
44+
private final List<ScheduleClientInterceptor> interceptors;
4245

4346
/**
4447
* Creates client that connects to an instance of the Temporal Service. Cannot be used from within
@@ -65,7 +68,18 @@ public static ScheduleClient newInstance(
6568
.getMetricsScope()
6669
.tagged(MetricsTag.defaultTags(options.getNamespace()));
6770
this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope);
68-
this.scheduleClientCallsInvoker = new RootScheduleClientInvoker(genericClient, options);
71+
this.interceptors = options.getInterceptors();
72+
this.scheduleClientCallsInvoker = initializeClientInvoker();
73+
}
74+
75+
private ScheduleClientCallsInterceptor initializeClientInvoker() {
76+
ScheduleClientCallsInterceptor scheduleClientInvoker =
77+
new RootScheduleClientInvoker(genericClient, options);
78+
for (ScheduleClientInterceptor clientInterceptor : interceptors) {
79+
scheduleClientInvoker =
80+
clientInterceptor.scheduleClientCallsInterceptor(scheduleClientInvoker);
81+
}
82+
return scheduleClientInvoker;
6983
}
7084

7185
@Override

temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientOptions.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.common.context.ContextPropagator;
2424
import io.temporal.common.converter.DataConverter;
2525
import io.temporal.common.converter.GlobalDataConverter;
26+
import io.temporal.common.interceptors.ScheduleClientInterceptor;
2627
import java.lang.management.ManagementFactory;
2728
import java.util.Collections;
2829
import java.util.List;
@@ -56,11 +57,14 @@ public static final class Builder {
5657
private static final String DEFAULT_NAMESPACE = "default";
5758
private static final List<ContextPropagator> EMPTY_CONTEXT_PROPAGATORS =
5859
Collections.emptyList();
60+
private static final List<ScheduleClientInterceptor> EMPTY_INTERCEPTORS =
61+
Collections.emptyList();
5962

6063
private String namespace;
6164
private DataConverter dataConverter;
6265
private String identity;
6366
private List<ContextPropagator> contextPropagators;
67+
private List<ScheduleClientInterceptor> interceptors;
6468

6569
private Builder() {}
6670

@@ -72,6 +76,7 @@ private Builder(ScheduleClientOptions options) {
7276
dataConverter = options.dataConverter;
7377
identity = options.identity;
7478
contextPropagators = options.contextPropagators;
79+
interceptors = options.interceptors;
7580
}
7681

7782
/** Set the namespace this client will operate on. */
@@ -106,30 +111,44 @@ public Builder setContextPropagators(List<ContextPropagator> contextPropagators)
106111
return this;
107112
}
108113

114+
/**
115+
* Set the interceptors for this client.
116+
*
117+
* @param interceptors specifies the list of interceptors to use with the client.
118+
*/
119+
public Builder setInterceptors(List<ScheduleClientInterceptor> interceptors) {
120+
this.interceptors = interceptors;
121+
return this;
122+
}
123+
109124
public ScheduleClientOptions build() {
110125
String name = identity == null ? ManagementFactory.getRuntimeMXBean().getName() : identity;
111126
return new ScheduleClientOptions(
112127
namespace == null ? DEFAULT_NAMESPACE : namespace,
113128
dataConverter == null ? GlobalDataConverter.get() : dataConverter,
114129
name,
115-
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators);
130+
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators,
131+
interceptors == null ? EMPTY_INTERCEPTORS : interceptors);
116132
}
117133
}
118134

119135
private final String namespace;
120136
private final DataConverter dataConverter;
121137
private final String identity;
122138
private final List<ContextPropagator> contextPropagators;
139+
private final List<ScheduleClientInterceptor> interceptors;
123140

124141
private ScheduleClientOptions(
125142
String namespace,
126143
DataConverter dataConverter,
127144
String identity,
128-
List<ContextPropagator> contextPropagators) {
145+
List<ContextPropagator> contextPropagators,
146+
List<ScheduleClientInterceptor> interceptors) {
129147
this.namespace = namespace;
130148
this.dataConverter = dataConverter;
131149
this.identity = identity;
132150
this.contextPropagators = contextPropagators;
151+
this.interceptors = interceptors;
133152
}
134153

135154
/**
@@ -167,4 +186,13 @@ public String getIdentity() {
167186
public List<ContextPropagator> getContextPropagators() {
168187
return contextPropagators;
169188
}
189+
190+
/**
191+
* Get the interceptors of this client
192+
*
193+
* @return The list of interceptors to use with the client.
194+
*/
195+
public List<ScheduleClientInterceptor> getInterceptors() {
196+
return interceptors;
197+
}
170198
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.common.interceptors;
22+
23+
import io.temporal.client.schedules.ScheduleClient;
24+
import io.temporal.client.schedules.ScheduleHandle;
25+
import io.temporal.common.Experimental;
26+
27+
/**
28+
* Intercepts calls to the {@link ScheduleClient} and {@link ScheduleHandle} related to the
29+
* lifecycle of a Schedule.
30+
*/
31+
@Experimental
32+
public interface ScheduleClientInterceptor {
33+
34+
/**
35+
* Called once during creation of ScheduleClient to create a chain of ScheduleClient Interceptors
36+
*
37+
* @param next next schedule client interceptor in the chain of interceptors
38+
* @return new interceptor that should decorate calls to {@code next}
39+
*/
40+
ScheduleClientCallsInterceptor scheduleClientCallsInterceptor(
41+
ScheduleClientCallsInterceptor next);
42+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.common.interceptors;
22+
23+
import io.temporal.common.Experimental;
24+
25+
/** Convenience base class for ScheduleClientInterceptor implementations. */
26+
@Experimental
27+
public class ScheduleClientInterceptorBase implements ScheduleClientInterceptor {
28+
29+
@Override
30+
public ScheduleClientCallsInterceptor scheduleClientCallsInterceptor(
31+
ScheduleClientCallsInterceptor next) {
32+
return next;
33+
}
34+
}

temporal-sdk/src/test/java/io/temporal/client/schedules/ScheduleTest.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.temporal.api.enums.v1.ScheduleOverlapPolicy;
2626
import io.temporal.client.WorkflowOptions;
2727
import io.temporal.common.converter.EncodedValues;
28+
import io.temporal.common.interceptors.ScheduleClientInterceptor;
2829
import io.temporal.testing.internal.SDKTestWorkflowRule;
2930
import io.temporal.workflow.shared.TestWorkflows;
3031
import java.time.Duration;
@@ -47,13 +48,14 @@ public class ScheduleTest {
4748
.setWorkflowTypes(ScheduleTest.QuickWorkflowImpl.class)
4849
.build();
4950

50-
private ScheduleClient createScheduleClient() {
51+
private ScheduleClient createScheduleClient(ScheduleClientInterceptor... interceptors) {
5152
return new ScheduleClientImpl(
5253
testWorkflowRule.getWorkflowServiceStubs(),
5354
ScheduleClientOptions.newBuilder()
5455
.setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace())
5556
.setIdentity(testWorkflowRule.getWorkflowClient().getOptions().getIdentity())
5657
.setDataConverter(testWorkflowRule.getWorkflowClient().getOptions().getDataConverter())
58+
.setInterceptors(Arrays.asList(interceptors))
5759
.build());
5860
}
5961

@@ -544,6 +546,49 @@ public void listSchedules() {
544546
});
545547
}
546548

549+
@Test
550+
public void testInterceptors() {
551+
TracingScheduleInterceptor.FilteredTrace ft = new TracingScheduleInterceptor.FilteredTrace();
552+
String scheduleId = UUID.randomUUID().toString();
553+
TracingScheduleInterceptor interceptor = new TracingScheduleInterceptor(ft);
554+
interceptor.setExpected(
555+
"createSchedule: " + scheduleId,
556+
"listSchedules",
557+
"describeSchedule: " + scheduleId,
558+
"pauseSchedule: " + scheduleId,
559+
"unpauseSchedule: " + scheduleId,
560+
"triggerSchedule: " + scheduleId,
561+
"backfillSchedule: " + scheduleId,
562+
"describeSchedule: " + scheduleId, // Updating a schedule implicitly calls describe.
563+
"updateSchedule: " + scheduleId,
564+
"deleteSchedule: " + scheduleId);
565+
ScheduleClient client = createScheduleClient(interceptor);
566+
ScheduleHandle handle =
567+
client.createSchedule(
568+
scheduleId, createTestSchedule().build(), ScheduleOptions.newBuilder().build());
569+
try {
570+
// Add delay for schedule to appear
571+
testWorkflowRule.sleep(Duration.ofSeconds(2));
572+
// List all schedules and filter
573+
Stream<ScheduleListDescription> scheduleStream = client.listSchedules();
574+
List<ScheduleListDescription> listedSchedules =
575+
scheduleStream
576+
.filter(s -> s.getScheduleId().equals(scheduleId))
577+
.collect(Collectors.toList());
578+
Assert.assertEquals(1, listedSchedules.size());
579+
// Verify the schedule description
580+
handle.describe();
581+
handle.pause();
582+
handle.unpause();
583+
handle.trigger();
584+
handle.backfill(Arrays.asList(new ScheduleBackfill(Instant.now(), Instant.now())));
585+
handle.update(input -> new ScheduleUpdate(createTestSchedule().build()));
586+
} finally {
587+
handle.delete();
588+
}
589+
interceptor.assertExpected();
590+
}
591+
547592
public static class QuickWorkflowImpl implements TestWorkflows.TestWorkflow1 {
548593
@Override
549594
public String execute(String arg) {

0 commit comments

Comments
 (0)