Skip to content

Commit 5bd97da

Browse files
committed
[temporalio#1962] Add newCallbackExecutor to interceptors and override for OpenTracing
This change adds newCallbackExecutor to the OutboundInterceptor interface, overriding the OpenTracing behavior. This allows the OpenTracing interceptor to transfer any active trace/span from the point of creation to the point of deferred execution, thus preserving the span within the callback. Without this, spans become disjoint across Promise.thenCompose, due to the nature of the deferring the execution of the .thenCompose to a callback thread. Fixes temporalio#1962 Signed-off-by: Greg Haskins <greg@manetu.com>
1 parent cf06131 commit 5bd97da

File tree

8 files changed

+353
-9
lines changed

8 files changed

+353
-9
lines changed

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java

Lines changed: 149 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,141 @@
2727
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
2828
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase;
2929
import io.temporal.opentracing.OpenTracingOptions;
30+
import io.temporal.workflow.Functions;
31+
import io.temporal.workflow.Promise;
3032
import io.temporal.workflow.Workflow;
3133
import io.temporal.workflow.WorkflowInfo;
3234
import io.temporal.workflow.unsafe.WorkflowUnsafe;
35+
import java.util.concurrent.Executor;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
3338

3439
public class OpenTracingWorkflowOutboundCallsInterceptor
3540
extends WorkflowOutboundCallsInterceptorBase {
3641
private final SpanFactory spanFactory;
3742
private final Tracer tracer;
3843
private final ContextAccessor contextAccessor;
3944

45+
private class SpanTransferringExecutor implements Executor {
46+
private final Executor passthrough;
47+
private final Span capturedSpan;
48+
49+
public SpanTransferringExecutor(Executor passthrough) {
50+
this.passthrough = passthrough;
51+
capturedSpan = tracer.scopeManager().activeSpan();
52+
}
53+
54+
public void execute(Runnable r) {
55+
Span activeSpan = tracer.scopeManager().activeSpan();
56+
57+
if (activeSpan == null && capturedSpan != null) {
58+
// if there is no activeSpan AND we captured a span during construction,
59+
// we should transfer it to the calling context as the new active span
60+
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
61+
passthrough.execute(r);
62+
}
63+
} else {
64+
passthrough.execute(r);
65+
}
66+
}
67+
}
68+
69+
private class PromiseWrapper<R> implements Promise<R> {
70+
private final Span capturedSpan;
71+
private final Promise<R> delegate;
72+
73+
PromiseWrapper(Span capturedSpan, Promise<R> delegate) {
74+
this.capturedSpan = capturedSpan;
75+
this.delegate = delegate;
76+
}
77+
78+
@Override
79+
public boolean isCompleted() {
80+
return delegate.isCompleted();
81+
}
82+
83+
@Override
84+
public R get() {
85+
return delegate.get();
86+
}
87+
88+
@Override
89+
public R cancellableGet() {
90+
return delegate.cancellableGet();
91+
}
92+
93+
@Override
94+
public R get(long timeout, TimeUnit unit) throws TimeoutException {
95+
return delegate.get(timeout, unit);
96+
}
97+
98+
@Override
99+
public R cancellableGet(long timeout, TimeUnit unit) throws TimeoutException {
100+
return delegate.cancellableGet(timeout, unit);
101+
}
102+
103+
@Override
104+
public RuntimeException getFailure() {
105+
return delegate.getFailure();
106+
}
107+
108+
@Override
109+
public <U> Promise<U> thenApply(Functions.Func1<? super R, ? extends U> fn) {
110+
return delegate.thenApply(
111+
(r) -> {
112+
if (capturedSpan != null) {
113+
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
114+
return fn.apply(r);
115+
}
116+
} else {
117+
return fn.apply(r);
118+
}
119+
});
120+
}
121+
122+
@Override
123+
public <U> Promise<U> handle(Functions.Func2<? super R, RuntimeException, ? extends U> fn) {
124+
return delegate.handle(
125+
(r, e) -> {
126+
if (capturedSpan != null) {
127+
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
128+
return fn.apply(r, e);
129+
}
130+
} else {
131+
return fn.apply(r, e);
132+
}
133+
});
134+
}
135+
136+
@Override
137+
public <U> Promise<U> thenCompose(Functions.Func1<? super R, ? extends Promise<U>> fn) {
138+
return delegate.thenCompose(
139+
(R r) -> {
140+
if (capturedSpan != null) {
141+
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
142+
return fn.apply(r);
143+
}
144+
} else {
145+
return fn.apply(r);
146+
}
147+
});
148+
}
149+
150+
@Override
151+
public Promise<R> exceptionally(Functions.Func1<Throwable, ? extends R> fn) {
152+
return delegate.exceptionally(
153+
(Throwable t) -> {
154+
if (capturedSpan != null) {
155+
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
156+
return fn.apply(t);
157+
}
158+
} else {
159+
return fn.apply(t);
160+
}
161+
});
162+
}
163+
}
164+
40165
public OpenTracingWorkflowOutboundCallsInterceptor(
41166
WorkflowOutboundCallsInterceptor next,
42167
OpenTracingOptions options,
@@ -51,13 +176,16 @@ public OpenTracingWorkflowOutboundCallsInterceptor(
51176
@Override
52177
public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
53178
if (!WorkflowUnsafe.isReplaying()) {
179+
Span capturedSpan = tracer.scopeManager().activeSpan();
54180
Span activityStartSpan =
55181
contextAccessor.writeSpanContextToHeader(
56182
() -> createActivityStartSpanBuilder(input.getActivityName()).start(),
57183
input.getHeader(),
58184
tracer);
59185
try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) {
60-
return super.executeActivity(input);
186+
ActivityOutput<R> output = super.executeActivity(input);
187+
return new ActivityOutput<>(
188+
output.getActivityId(), new PromiseWrapper<>(capturedSpan, output.getResult()));
61189
} finally {
62190
activityStartSpan.finish();
63191
}
@@ -69,13 +197,15 @@ public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
69197
@Override
70198
public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
71199
if (!WorkflowUnsafe.isReplaying()) {
200+
Span capturedSpan = tracer.scopeManager().activeSpan();
72201
Span activityStartSpan =
73202
contextAccessor.writeSpanContextToHeader(
74203
() -> createActivityStartSpanBuilder(input.getActivityName()).start(),
75204
input.getHeader(),
76205
tracer);
77206
try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) {
78-
return super.executeLocalActivity(input);
207+
LocalActivityOutput<R> output = super.executeLocalActivity(input);
208+
return new LocalActivityOutput<>(new PromiseWrapper<>(capturedSpan, output.getResult()));
79209
} finally {
80210
activityStartSpan.finish();
81211
}
@@ -87,11 +217,14 @@ public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> inp
87217
@Override
88218
public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
89219
if (!WorkflowUnsafe.isReplaying()) {
220+
Span capturedSpan = tracer.scopeManager().activeSpan();
90221
Span childWorkflowStartSpan =
91222
contextAccessor.writeSpanContextToHeader(
92223
() -> createChildWorkflowStartSpanBuilder(input).start(), input.getHeader(), tracer);
93224
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
94-
return super.executeChildWorkflow(input);
225+
ChildWorkflowOutput<R> output = super.executeChildWorkflow(input);
226+
return new ChildWorkflowOutput<>(
227+
new PromiseWrapper<>(capturedSpan, output.getResult()), output.getWorkflowExecution());
95228
} finally {
96229
childWorkflowStartSpan.finish();
97230
}
@@ -104,13 +237,16 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
104237
public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
105238
ExecuteNexusOperationInput<R> input) {
106239
if (!WorkflowUnsafe.isReplaying()) {
240+
Span capturedSpan = tracer.scopeManager().activeSpan();
107241
Span nexusOperationExecuteSpan =
108242
contextAccessor.writeSpanContextToHeader(
109243
() -> createStartNexusOperationSpanBuilder(input).start(),
110244
input.getHeaders(),
111245
tracer);
112246
try (Scope ignored = tracer.scopeManager().activate(nexusOperationExecuteSpan)) {
113-
return super.executeNexusOperation(input);
247+
ExecuteNexusOperationOutput<R> output = super.executeNexusOperation(input);
248+
return new ExecuteNexusOperationOutput<>(
249+
new PromiseWrapper<>(capturedSpan, output.getResult()), output.getOperationExecution());
114250
} finally {
115251
nexusOperationExecuteSpan.finish();
116252
}
@@ -122,6 +258,7 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
122258
@Override
123259
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
124260
if (!WorkflowUnsafe.isReplaying()) {
261+
Span capturedSpan = tracer.scopeManager().activeSpan();
125262
WorkflowInfo workflowInfo = Workflow.getInfo();
126263
Span childWorkflowStartSpan =
127264
contextAccessor.writeSpanContextToHeader(
@@ -136,7 +273,8 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
136273
input.getHeader(),
137274
tracer);
138275
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
139-
return super.signalExternalWorkflow(input);
276+
SignalExternalOutput output = super.signalExternalWorkflow(input);
277+
return new SignalExternalOutput(new PromiseWrapper<>(capturedSpan, output.getResult()));
140278
} finally {
141279
childWorkflowStartSpan.finish();
142280
}
@@ -178,6 +316,12 @@ public Object newChildThread(Runnable runnable, boolean detached, String name) {
178316
return super.newChildThread(wrappedRunnable, detached, name);
179317
}
180318

319+
@Override
320+
public Executor newCallbackExecutor() {
321+
Executor passthrough = super.newCallbackExecutor();
322+
return new SpanTransferringExecutor(passthrough);
323+
}
324+
181325
private Tracer.SpanBuilder createActivityStartSpanBuilder(String activityName) {
182326
WorkflowInfo workflowInfo = Workflow.getInfo();
183327
return spanFactory.createActivityStartSpan(
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.opentracing;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.opentracing.Scope;
26+
import io.opentracing.Span;
27+
import io.opentracing.mock.MockSpan;
28+
import io.opentracing.mock.MockTracer;
29+
import io.opentracing.util.ThreadLocalScopeManager;
30+
import io.temporal.activity.ActivityInterface;
31+
import io.temporal.activity.ActivityMethod;
32+
import io.temporal.activity.ActivityOptions;
33+
import io.temporal.client.WorkflowClient;
34+
import io.temporal.client.WorkflowClientOptions;
35+
import io.temporal.client.WorkflowOptions;
36+
import io.temporal.testing.internal.SDKTestWorkflowRule;
37+
import io.temporal.worker.WorkerFactoryOptions;
38+
import io.temporal.workflow.*;
39+
import java.time.Duration;
40+
import org.junit.After;
41+
import org.junit.Rule;
42+
import org.junit.Test;
43+
44+
public class CallbackContextTest {
45+
46+
private static final MockTracer mockTracer =
47+
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
48+
49+
private final OpenTracingOptions OT_OPTIONS =
50+
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();
51+
52+
@Rule
53+
public SDKTestWorkflowRule testWorkflowRule =
54+
SDKTestWorkflowRule.newBuilder()
55+
.setWorkflowClientOptions(
56+
WorkflowClientOptions.newBuilder()
57+
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
58+
.validateAndBuildWithDefaults())
59+
.setWorkerFactoryOptions(
60+
WorkerFactoryOptions.newBuilder()
61+
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
62+
.validateAndBuildWithDefaults())
63+
.setWorkflowTypes(WorkflowImpl.class)
64+
.setActivityImplementations(new ActivityImpl())
65+
.build();
66+
67+
@After
68+
public void tearDown() {
69+
mockTracer.reset();
70+
}
71+
72+
@ActivityInterface
73+
public interface TestActivity {
74+
@ActivityMethod
75+
boolean activity();
76+
}
77+
78+
@WorkflowInterface
79+
public interface TestWorkflow {
80+
@WorkflowMethod
81+
String workflow();
82+
}
83+
84+
public static class ActivityImpl implements TestActivity {
85+
@Override
86+
public boolean activity() {
87+
try {
88+
Thread.sleep(1000);
89+
} catch (InterruptedException e) {
90+
throw new RuntimeException(e);
91+
}
92+
return true;
93+
}
94+
}
95+
96+
public static class WorkflowImpl implements TestWorkflow {
97+
private final TestActivity activity =
98+
Workflow.newActivityStub(
99+
TestActivity.class,
100+
ActivityOptions.newBuilder()
101+
.setStartToCloseTimeout(Duration.ofMinutes(1))
102+
.validateAndBuildWithDefaults());
103+
104+
@Override
105+
public String workflow() {
106+
String workflowSpanId = mockTracer.activeSpan().context().toSpanId();
107+
return Async.function(activity::activity)
108+
.thenCompose(
109+
(r) -> {
110+
Span activeSpan = mockTracer.activeSpan();
111+
112+
if (activeSpan != null) {
113+
String promiseSpanId = activeSpan.context().toSpanId();
114+
if (promiseSpanId.equals(workflowSpanId)) {
115+
return Workflow.newPromise(activeSpan.context().toTraceId());
116+
} else {
117+
return Workflow.newPromise("bad-span");
118+
}
119+
} else {
120+
return Workflow.newPromise("not-found");
121+
}
122+
})
123+
.get();
124+
}
125+
}
126+
127+
@Test
128+
public void testCallbackContext() {
129+
MockSpan span = mockTracer.buildSpan("ClientFunction").start();
130+
131+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
132+
try (Scope scope = mockTracer.scopeManager().activate(span)) {
133+
TestWorkflow workflow =
134+
client.newWorkflowStub(
135+
TestWorkflow.class,
136+
WorkflowOptions.newBuilder()
137+
.setTaskQueue(testWorkflowRule.getTaskQueue())
138+
.validateBuildWithDefaults());
139+
assertEquals(span.context().toTraceId(), workflow.workflow());
140+
} finally {
141+
span.finish();
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)