Skip to content

Commit 3362703

Browse files
committed
[temporalio#1962] Add newCallbackExecutor to interceptors and override for OpenTracing
This change adds newCallbackExecutor to the OutboundInterceptor interface, overriding the OpenTracing behavior. This allows the OpenTracing interceptor to transfer any active trace/span from the point of creation to the point of deferred execution, thus preserving the span within the callback. Without this, spans become disjoint across Promise.thenCompose, due to the nature of the deferring the execution of the .thenCompose to a callback thread. Fixes temporalio#1962 Signed-off-by: Greg Haskins <greg@manetu.com>
1 parent cf06131 commit 3362703

File tree

8 files changed

+205
-4
lines changed

8 files changed

+205
-4
lines changed

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,34 @@
3030
import io.temporal.workflow.Workflow;
3131
import io.temporal.workflow.WorkflowInfo;
3232
import io.temporal.workflow.unsafe.WorkflowUnsafe;
33+
import java.util.concurrent.Executor;
3334

3435
public class OpenTracingWorkflowOutboundCallsInterceptor
3536
extends WorkflowOutboundCallsInterceptorBase {
3637
private final SpanFactory spanFactory;
3738
private final Tracer tracer;
3839
private final ContextAccessor contextAccessor;
3940

41+
private class SpanTransferringExecutor implements Executor {
42+
private final Executor passthrough;
43+
private final Span activeSpan;
44+
45+
public SpanTransferringExecutor(Executor passthrough) {
46+
this.passthrough = passthrough;
47+
activeSpan = tracer.scopeManager().activeSpan();
48+
}
49+
50+
public void execute(Runnable r) {
51+
if (activeSpan != null) {
52+
try (Scope ignored = tracer.scopeManager().activate(activeSpan)) {
53+
passthrough.execute(r);
54+
}
55+
} else {
56+
passthrough.execute(r);
57+
}
58+
}
59+
}
60+
4061
public OpenTracingWorkflowOutboundCallsInterceptor(
4162
WorkflowOutboundCallsInterceptor next,
4263
OpenTracingOptions options,
@@ -178,6 +199,12 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
178199
return super.newChildThread(wrappedRunnable, detached, name);
179200
}
180201

202+
@Override
203+
public Executor newCallbackExecutor() {
204+
Executor passthrough = super.newCallbackExecutor();
205+
return new SpanTransferringExecutor(passthrough);
206+
}
207+
181208
private Tracer.SpanBuilder createActivityStartSpanBuilder(String activityName) {
182209
WorkflowInfo workflowInfo = Workflow.getInfo();
183210
return spanFactory.createActivityStartSpan(
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.opentracing;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.opentracing.Scope;
26+
import io.opentracing.Span;
27+
import io.opentracing.mock.MockSpan;
28+
import io.opentracing.mock.MockTracer;
29+
import io.opentracing.util.ThreadLocalScopeManager;
30+
import io.temporal.activity.ActivityInterface;
31+
import io.temporal.activity.ActivityMethod;
32+
import io.temporal.activity.ActivityOptions;
33+
import io.temporal.client.WorkflowClient;
34+
import io.temporal.client.WorkflowClientOptions;
35+
import io.temporal.client.WorkflowOptions;
36+
import io.temporal.testing.internal.SDKTestWorkflowRule;
37+
import io.temporal.worker.WorkerFactoryOptions;
38+
import io.temporal.workflow.*;
39+
import java.time.Duration;
40+
import org.junit.After;
41+
import org.junit.Rule;
42+
import org.junit.Test;
43+
44+
public class CallbackContextTest {
45+
46+
private static final MockTracer mockTracer =
47+
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
48+
49+
private final OpenTracingOptions OT_OPTIONS =
50+
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();
51+
52+
@Rule
53+
public SDKTestWorkflowRule testWorkflowRule =
54+
SDKTestWorkflowRule.newBuilder()
55+
.setWorkflowClientOptions(
56+
WorkflowClientOptions.newBuilder()
57+
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
58+
.validateAndBuildWithDefaults())
59+
.setWorkerFactoryOptions(
60+
WorkerFactoryOptions.newBuilder()
61+
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
62+
.validateAndBuildWithDefaults())
63+
.setWorkflowTypes(WorkflowImpl.class)
64+
.setActivityImplementations(new ActivityImpl())
65+
.build();
66+
67+
@After
68+
public void tearDown() {
69+
mockTracer.reset();
70+
}
71+
72+
@ActivityInterface
73+
public interface TestActivity {
74+
@ActivityMethod
75+
boolean activity();
76+
}
77+
78+
@WorkflowInterface
79+
public interface TestWorkflow {
80+
@WorkflowMethod
81+
String workflow(String input);
82+
}
83+
84+
public static class ActivityImpl implements TestActivity {
85+
@Override
86+
public boolean activity() {
87+
try {
88+
Thread.sleep(1000);
89+
} catch (InterruptedException e) {
90+
throw new RuntimeException(e);
91+
}
92+
return true;
93+
}
94+
}
95+
96+
public static class WorkflowImpl implements TestWorkflow {
97+
private final TestActivity activity =
98+
Workflow.newActivityStub(
99+
TestActivity.class,
100+
ActivityOptions.newBuilder()
101+
.setStartToCloseTimeout(Duration.ofMinutes(1))
102+
.validateAndBuildWithDefaults());
103+
104+
@Override
105+
public String workflow(String input) {
106+
return Async.function(activity::activity)
107+
.thenCompose(
108+
(r) -> {
109+
Span activeSpan = mockTracer.activeSpan();
110+
return Workflow.newPromise(
111+
activeSpan != null ? activeSpan.context().toTraceId() : "not-found");
112+
})
113+
.get();
114+
}
115+
}
116+
117+
@Test
118+
public void testCallbackContext() {
119+
MockSpan span = mockTracer.buildSpan("ClientFunction").start();
120+
121+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
122+
try (Scope scope = mockTracer.scopeManager().activate(span)) {
123+
TestWorkflow workflow =
124+
client.newWorkflowStub(
125+
TestWorkflow.class,
126+
WorkflowOptions.newBuilder()
127+
.setTaskQueue(testWorkflowRule.getTaskQueue())
128+
.validateBuildWithDefaults());
129+
assertEquals(span.context().toTraceId(), workflow.workflow("input"));
130+
} finally {
131+
span.finish();
132+
}
133+
}
134+
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.lang.reflect.Type;
3232
import java.time.Duration;
3333
import java.util.*;
34+
import java.util.concurrent.Executor;
3435
import java.util.function.BiPredicate;
3536
import java.util.function.Supplier;
3637
import java.util.stream.Collectors;
@@ -798,5 +799,7 @@ <R> R mutableSideEffect(
798799
*/
799800
Object newChildThread(Runnable runnable, boolean detached, String name);
800801

802+
Executor newCallbackExecutor();
803+
801804
long currentTimeMillis();
802805
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map;
3131
import java.util.Random;
3232
import java.util.UUID;
33+
import java.util.concurrent.Executor;
3334
import java.util.function.BiPredicate;
3435
import java.util.function.Supplier;
3536

@@ -184,6 +185,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
184185
return next.newChildThread(runnable, detached, name);
185186
}
186187

188+
@Override
189+
public Executor newCallbackExecutor() {
190+
return next.newCallbackExecutor();
191+
}
192+
187193
@Override
188194
public long currentTimeMillis() {
189195
return next.currentTimeMillis();

temporal-sdk/src/main/java/io/temporal/internal/sync/CompletablePromiseImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.time.Duration;
3030
import java.util.ArrayList;
3131
import java.util.List;
32+
import java.util.concurrent.Executor;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.TimeoutException;
3435

@@ -40,6 +41,7 @@ class CompletablePromiseImpl<V> implements CompletablePromise<V> {
4041
private final List<Functions.Proc> handlers = new ArrayList<>();
4142
private final DeterministicRunnerImpl runner;
4243
private boolean registeredWithRunner;
44+
private Executor executor;
4345

4446
@SuppressWarnings("unchecked")
4547
static Promise<Object> promiseAnyOf(Promise<?>[] promises) {
@@ -62,7 +64,10 @@ static <V> Promise<V> promiseAnyOf(Iterable<Promise<V>> promises) {
6264
}
6365

6466
CompletablePromiseImpl() {
65-
runner = DeterministicRunnerImpl.currentThreadInternal().getRunner();
67+
WorkflowThread workflowThread = DeterministicRunnerImpl.currentThreadInternal();
68+
runner = workflowThread.getRunner();
69+
executor =
70+
workflowThread.getWorkflowContext().getWorkflowOutboundInterceptor().newCallbackExecutor();
6671
}
6772

6873
@Override
@@ -275,9 +280,8 @@ private <U> Promise<U> then(Functions.Proc1<CompletablePromise<U>> proc) {
275280
* @return true if there were any handlers invoked
276281
*/
277282
private boolean invokeHandlers() {
278-
for (Functions.Proc handler : handlers) {
279-
handler.apply();
280-
}
283+
executor.execute(() -> handlers.forEach(Functions.Proc::apply));
284+
281285
return !handlers.isEmpty();
282286
}
283287
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.time.Duration;
7979
import java.time.Instant;
8080
import java.util.*;
81+
import java.util.concurrent.Executor;
8182
import java.util.concurrent.atomic.AtomicBoolean;
8283
import java.util.concurrent.atomic.AtomicReference;
8384
import java.util.function.BiPredicate;
@@ -412,6 +413,12 @@ public WorkflowMetadata getWorkflowMetadata() {
412413
return workflowMetadata.build();
413414
}
414415

416+
private class DirectExecutor implements Executor {
417+
public void execute(Runnable r) {
418+
r.run();
419+
}
420+
}
421+
415422
private class ActivityCallback {
416423
private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
417424

@@ -1419,6 +1426,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
14191426
return runner.newWorkflowThread(runnable, detached, name);
14201427
}
14211428

1429+
@Override
1430+
public Executor newCallbackExecutor() {
1431+
return new DirectExecutor();
1432+
}
1433+
14221434
@Override
14231435
public long currentTimeMillis() {
14241436
return replayContext.currentTimeMillis();

temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
497497
throw new UnsupportedOperationException("not implemented");
498498
}
499499

500+
@Override
501+
public Executor newCallbackExecutor() {
502+
throw new UnsupportedOperationException("not implemented");
503+
}
504+
500505
@Override
501506
public long currentTimeMillis() {
502507
throw new UnsupportedOperationException("not implemented");

temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.lang.reflect.Type;
3737
import java.time.Duration;
3838
import java.util.*;
39+
import java.util.concurrent.Executor;
3940
import java.util.function.BiPredicate;
4041
import java.util.function.Supplier;
4142
import javax.annotation.Nonnull;
@@ -433,6 +434,15 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
433434
return next.newChildThread(runnable, detached, name);
434435
}
435436

437+
@Override
438+
public Executor newCallbackExecutor() {
439+
if (!WorkflowUnsafe.isReplaying()) {
440+
trace.add("newCallbackExecutor ");
441+
}
442+
443+
return next.newCallbackExecutor();
444+
}
445+
436446
@Override
437447
public long currentTimeMillis() {
438448
if (!WorkflowUnsafe.isReplaying()) {

0 commit comments

Comments
 (0)