Skip to content

Commit 59bbabb

Browse files
Make sure the Schedule Client has the namespace header injected (#2452)
1 parent 48b7223 commit 59bbabb

File tree

4 files changed

+31
-10
lines changed

4 files changed

+31
-10
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,7 @@
3737
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
3838
import io.temporal.common.interceptors.WorkflowClientInterceptor;
3939
import io.temporal.internal.WorkflowThreadMarker;
40-
import io.temporal.internal.client.NexusStartWorkflowRequest;
41-
import io.temporal.internal.client.RootWorkflowClientInvoker;
42-
import io.temporal.internal.client.WorkerFactoryRegistry;
43-
import io.temporal.internal.client.WorkflowClientInternal;
40+
import io.temporal.internal.client.*;
4441
import io.temporal.internal.client.external.GenericWorkflowClient;
4542
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
4643
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;

temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.temporal.common.interceptors.ScheduleClientCallsInterceptor;
2727
import io.temporal.common.interceptors.ScheduleClientInterceptor;
2828
import io.temporal.internal.WorkflowThreadMarker;
29+
import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs;
2930
import io.temporal.internal.client.RootScheduleClientInvoker;
3031
import io.temporal.internal.client.external.GenericWorkflowClient;
3132
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
@@ -60,6 +61,8 @@ public static ScheduleClient newInstance(
6061
}
6162

6263
ScheduleClientImpl(WorkflowServiceStubs workflowServiceStubs, ScheduleClientOptions options) {
64+
workflowServiceStubs =
65+
new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace());
6366
this.workflowServiceStubs = workflowServiceStubs;
6467
this.options = options;
6568
this.metricsScope =

temporal-sdk/src/main/java/io/temporal/client/NamespaceInjectWorkflowServiceStubs.java renamed to temporal-sdk/src/main/java/io/temporal/internal/client/NamespaceInjectWorkflowServiceStubs.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* limitations under the License.
1919
*/
2020

21-
package io.temporal.client;
21+
package io.temporal.internal.client;
2222

2323
import io.grpc.ManagedChannel;
2424
import io.grpc.Metadata;
@@ -34,8 +34,8 @@
3434
import java.util.function.Supplier;
3535
import javax.annotation.Nullable;
3636

37-
/** Inject the namespace into the gRPC header */
38-
class NamespaceInjectWorkflowServiceStubs implements WorkflowServiceStubs {
37+
/** Inject the namespace into the gRPC header, overriding the current namespace if already set. */
38+
public class NamespaceInjectWorkflowServiceStubs implements WorkflowServiceStubs {
3939
private static Metadata.Key<String> TEMPORAL_NAMESPACE_HEADER_KEY =
4040
Metadata.Key.of("temporal-namespace", Metadata.ASCII_STRING_MARSHALLER);
4141
private final Metadata metadata;
@@ -56,14 +56,14 @@ public WorkflowServiceStubsOptions getOptions() {
5656
public WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub() {
5757
return next.blockingStub()
5858
.withInterceptors(
59-
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata)));
59+
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata), true));
6060
}
6161

6262
@Override
6363
public WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub() {
6464
return next.futureStub()
6565
.withInterceptors(
66-
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata)));
66+
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata), true));
6767
}
6868

6969
@Override

temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcMetadataProviderInterceptor.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,17 @@
2727

2828
public class GrpcMetadataProviderInterceptor implements ClientInterceptor {
2929
private final Collection<GrpcMetadataProvider> grpcMetadataProviders;
30+
private final boolean override;
3031

3132
public GrpcMetadataProviderInterceptor(Collection<GrpcMetadataProvider> grpcMetadataProviders) {
3233
this.grpcMetadataProviders = checkNotNull(grpcMetadataProviders, "grpcMetadataProviders");
34+
this.override = false;
35+
}
36+
37+
public GrpcMetadataProviderInterceptor(
38+
Collection<GrpcMetadataProvider> grpcMetadataProviders, boolean override) {
39+
this.grpcMetadataProviders = checkNotNull(grpcMetadataProviders, "grpcMetadataProviders");
40+
this.override = override;
3341
}
3442

3543
@Override
@@ -47,7 +55,20 @@ private final class HeaderAttachingClientCall<ReqT, RespT>
4755

4856
@Override
4957
public void start(Listener<RespT> responseListener, Metadata headers) {
50-
grpcMetadataProviders.stream().map(GrpcMetadataProvider::getMetadata).forEach(headers::merge);
58+
grpcMetadataProviders.stream()
59+
.map(GrpcMetadataProvider::getMetadata)
60+
.forEach(
61+
m -> {
62+
// If override is true, discard all existing headers with the same key
63+
// before adding the new ones. Otherwise, merge will add the new value to the
64+
// existing key.
65+
if (override) {
66+
for (String key : m.keys()) {
67+
headers.discardAll(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
68+
}
69+
}
70+
headers.merge(m);
71+
});
5172
super.start(responseListener, headers);
5273
}
5374
}

0 commit comments

Comments
 (0)