Skip to content

Commit e714fc3

Browse files
committed
[temporalio#1962] Add newCallbackExecutor to interceptors and override for OpenTracing
This change adds newCallbackExecutor to the OutboundInterceptor interface, overriding the OpenTracing behavior. This allows the OpenTracing interceptor to transfer any active trace/span from the point of creation to the point of deferred execution, thus preserving the span within the callback. Without this, spans become disjoint across Promise.thenCompose, due to the nature of the deferring the execution of the .thenCompose to a callback thread. Fixes temporalio#1962 Signed-off-by: Greg Haskins <greg@manetu.com>
1 parent cf06131 commit e714fc3

File tree

8 files changed

+225
-4
lines changed

8 files changed

+225
-4
lines changed

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,38 @@
3030
import io.temporal.workflow.Workflow;
3131
import io.temporal.workflow.WorkflowInfo;
3232
import io.temporal.workflow.unsafe.WorkflowUnsafe;
33+
import java.util.concurrent.Executor;
3334

3435
public class OpenTracingWorkflowOutboundCallsInterceptor
3536
extends WorkflowOutboundCallsInterceptorBase {
3637
private final SpanFactory spanFactory;
3738
private final Tracer tracer;
3839
private final ContextAccessor contextAccessor;
3940

41+
private class SpanTransferringExecutor implements Executor {
42+
private final Executor passthrough;
43+
private final Span capturedSpan;
44+
45+
public SpanTransferringExecutor(Executor passthrough) {
46+
this.passthrough = passthrough;
47+
capturedSpan = tracer.scopeManager().activeSpan();
48+
}
49+
50+
public void execute(Runnable r) {
51+
Span activeSpan = tracer.scopeManager().activeSpan();
52+
53+
if (activeSpan == null && capturedSpan != null) {
54+
// if there is no activeSpan AND we captured a span during construction,
55+
// we should transfer it to the calling context as the new activespan
56+
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
57+
passthrough.execute(r);
58+
}
59+
} else {
60+
passthrough.execute(r);
61+
}
62+
}
63+
}
64+
4065
public OpenTracingWorkflowOutboundCallsInterceptor(
4166
WorkflowOutboundCallsInterceptor next,
4267
OpenTracingOptions options,
@@ -178,6 +203,12 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
178203
return super.newChildThread(wrappedRunnable, detached, name);
179204
}
180205

206+
@Override
207+
public Executor newCallbackExecutor() {
208+
Executor passthrough = super.newCallbackExecutor();
209+
return new SpanTransferringExecutor(passthrough);
210+
}
211+
181212
private Tracer.SpanBuilder createActivityStartSpanBuilder(String activityName) {
182213
WorkflowInfo workflowInfo = Workflow.getInfo();
183214
return spanFactory.createActivityStartSpan(
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.opentracing;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.opentracing.Scope;
26+
import io.opentracing.Span;
27+
import io.opentracing.mock.MockSpan;
28+
import io.opentracing.mock.MockTracer;
29+
import io.opentracing.util.ThreadLocalScopeManager;
30+
import io.temporal.activity.ActivityInterface;
31+
import io.temporal.activity.ActivityMethod;
32+
import io.temporal.activity.ActivityOptions;
33+
import io.temporal.client.WorkflowClient;
34+
import io.temporal.client.WorkflowClientOptions;
35+
import io.temporal.client.WorkflowOptions;
36+
import io.temporal.testing.internal.SDKTestWorkflowRule;
37+
import io.temporal.worker.WorkerFactoryOptions;
38+
import io.temporal.workflow.*;
39+
import java.time.Duration;
40+
import org.junit.After;
41+
import org.junit.Rule;
42+
import org.junit.Test;
43+
44+
public class CallbackContextTest {
45+
46+
private static final MockTracer mockTracer =
47+
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
48+
49+
private final OpenTracingOptions OT_OPTIONS =
50+
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();
51+
52+
@Rule
53+
public SDKTestWorkflowRule testWorkflowRule =
54+
SDKTestWorkflowRule.newBuilder()
55+
.setWorkflowClientOptions(
56+
WorkflowClientOptions.newBuilder()
57+
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
58+
.validateAndBuildWithDefaults())
59+
.setWorkerFactoryOptions(
60+
WorkerFactoryOptions.newBuilder()
61+
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
62+
.validateAndBuildWithDefaults())
63+
.setWorkflowTypes(WorkflowImpl.class)
64+
.setActivityImplementations(new ActivityImpl())
65+
.build();
66+
67+
@After
68+
public void tearDown() {
69+
mockTracer.reset();
70+
}
71+
72+
@ActivityInterface
73+
public interface TestActivity {
74+
@ActivityMethod
75+
boolean activity();
76+
}
77+
78+
@WorkflowInterface
79+
public interface TestWorkflow {
80+
@WorkflowMethod
81+
String workflow();
82+
}
83+
84+
public static class ActivityImpl implements TestActivity {
85+
@Override
86+
public boolean activity() {
87+
try {
88+
Thread.sleep(1000);
89+
} catch (InterruptedException e) {
90+
throw new RuntimeException(e);
91+
}
92+
return true;
93+
}
94+
}
95+
96+
public static class WorkflowImpl implements TestWorkflow {
97+
private final TestActivity activity =
98+
Workflow.newActivityStub(
99+
TestActivity.class,
100+
ActivityOptions.newBuilder()
101+
.setStartToCloseTimeout(Duration.ofMinutes(1))
102+
.validateAndBuildWithDefaults());
103+
104+
@Override
105+
public String workflow() {
106+
return Async.function(activity::activity)
107+
.thenCompose(
108+
(r) -> {
109+
Span activeSpan = mockTracer.activeSpan();
110+
return Workflow.newPromise(
111+
activeSpan != null ? activeSpan.context().toTraceId() : "not-found");
112+
})
113+
.get();
114+
}
115+
}
116+
117+
@Test
118+
public void testCallbackContext() {
119+
MockSpan span = mockTracer.buildSpan("ClientFunction").start();
120+
121+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
122+
try (Scope scope = mockTracer.scopeManager().activate(span)) {
123+
TestWorkflow workflow =
124+
client.newWorkflowStub(
125+
TestWorkflow.class,
126+
WorkflowOptions.newBuilder()
127+
.setTaskQueue(testWorkflowRule.getTaskQueue())
128+
.validateBuildWithDefaults());
129+
assertEquals(span.context().toTraceId(), workflow.workflow());
130+
} finally {
131+
span.finish();
132+
}
133+
}
134+
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.lang.reflect.Type;
3232
import java.time.Duration;
3333
import java.util.*;
34+
import java.util.concurrent.Executor;
3435
import java.util.function.BiPredicate;
3536
import java.util.function.Supplier;
3637
import java.util.stream.Collectors;
@@ -798,5 +799,17 @@ <R> R mutableSideEffect(
798799
*/
799800
Object newChildThread(Runnable runnable, boolean detached, String name);
800801

802+
/**
803+
* Intercepts the point where a new callback is being prepared for deferment and allows
804+
* interceptors to provide a wrapped execution environment for running the callback at a later
805+
* time.
806+
*
807+
* <p>The executor's execute() function _must_ fully execute the provided Runnable within the
808+
* caller's thread or determinism guarantees could be violated.
809+
*
810+
* @return created Executor
811+
*/
812+
Executor newCallbackExecutor();
813+
801814
long currentTimeMillis();
802815
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map;
3131
import java.util.Random;
3232
import java.util.UUID;
33+
import java.util.concurrent.Executor;
3334
import java.util.function.BiPredicate;
3435
import java.util.function.Supplier;
3536

@@ -184,6 +185,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
184185
return next.newChildThread(runnable, detached, name);
185186
}
186187

188+
@Override
189+
public Executor newCallbackExecutor() {
190+
return next.newCallbackExecutor();
191+
}
192+
187193
@Override
188194
public long currentTimeMillis() {
189195
return next.currentTimeMillis();

temporal-sdk/src/main/java/io/temporal/internal/sync/CompletablePromiseImpl.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.time.Duration;
3030
import java.util.ArrayList;
3131
import java.util.List;
32+
import java.util.concurrent.Executor;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.TimeoutException;
3435

@@ -40,6 +41,7 @@ class CompletablePromiseImpl<V> implements CompletablePromise<V> {
4041
private final List<Functions.Proc> handlers = new ArrayList<>();
4142
private final DeterministicRunnerImpl runner;
4243
private boolean registeredWithRunner;
44+
private Executor callbackExecutor;
4345

4446
@SuppressWarnings("unchecked")
4547
static Promise<Object> promiseAnyOf(Promise<?>[] promises) {
@@ -62,7 +64,10 @@ static <V> Promise<V> promiseAnyOf(Iterable<Promise<V>> promises) {
6264
}
6365

6466
CompletablePromiseImpl() {
65-
runner = DeterministicRunnerImpl.currentThreadInternal().getRunner();
67+
WorkflowThread workflowThread = DeterministicRunnerImpl.currentThreadInternal();
68+
runner = workflowThread.getRunner();
69+
callbackExecutor =
70+
workflowThread.getWorkflowContext().getWorkflowOutboundInterceptor().newCallbackExecutor();
6671
}
6772

6873
@Override
@@ -275,9 +280,14 @@ private <U> Promise<U> then(Functions.Proc1<CompletablePromise<U>> proc) {
275280
* @return true if there were any handlers invoked
276281
*/
277282
private boolean invokeHandlers() {
278-
for (Functions.Proc handler : handlers) {
279-
handler.apply();
280-
}
283+
// execute synchronously to this thread, but under the context established in the constructor
284+
callbackExecutor.execute(
285+
() -> {
286+
for (Functions.Proc handler : handlers) {
287+
handler.apply();
288+
}
289+
});
290+
281291
return !handlers.isEmpty();
282292
}
283293
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.time.Duration;
7979
import java.time.Instant;
8080
import java.util.*;
81+
import java.util.concurrent.Executor;
8182
import java.util.concurrent.atomic.AtomicBoolean;
8283
import java.util.concurrent.atomic.AtomicReference;
8384
import java.util.function.BiPredicate;
@@ -412,6 +413,12 @@ public WorkflowMetadata getWorkflowMetadata() {
412413
return workflowMetadata.build();
413414
}
414415

416+
private class DirectExecutor implements Executor {
417+
public void execute(Runnable r) {
418+
r.run();
419+
}
420+
}
421+
415422
private class ActivityCallback {
416423
private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
417424

@@ -1419,6 +1426,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
14191426
return runner.newWorkflowThread(runnable, detached, name);
14201427
}
14211428

1429+
@Override
1430+
public Executor newCallbackExecutor() {
1431+
return new DirectExecutor();
1432+
}
1433+
14221434
@Override
14231435
public long currentTimeMillis() {
14241436
return replayContext.currentTimeMillis();

temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,11 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
497497
throw new UnsupportedOperationException("not implemented");
498498
}
499499

500+
@Override
501+
public Executor newCallbackExecutor() {
502+
throw new UnsupportedOperationException("not implemented");
503+
}
504+
500505
@Override
501506
public long currentTimeMillis() {
502507
throw new UnsupportedOperationException("not implemented");

temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.lang.reflect.Type;
3737
import java.time.Duration;
3838
import java.util.*;
39+
import java.util.concurrent.Executor;
3940
import java.util.function.BiPredicate;
4041
import java.util.function.Supplier;
4142
import javax.annotation.Nonnull;
@@ -433,6 +434,15 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
433434
return next.newChildThread(runnable, detached, name);
434435
}
435436

437+
@Override
438+
public Executor newCallbackExecutor() {
439+
if (!WorkflowUnsafe.isReplaying()) {
440+
trace.add("newCallbackExecutor ");
441+
}
442+
443+
return next.newCallbackExecutor();
444+
}
445+
436446
@Override
437447
public long currentTimeMillis() {
438448
if (!WorkflowUnsafe.isReplaying()) {

0 commit comments

Comments
 (0)