Skip to content

Commit 0a69f7a

Browse files
THardy98mjameswh
authored andcommitted
feat(workflow): Propagate otel context when scheduling local activities (#1577)
(cherry picked from commit e9184bb)
1 parent e61727c commit 0a69f7a

File tree

3 files changed

+48
-1
lines changed

3 files changed

+48
-1
lines changed

packages/interceptors-opentelemetry/src/workflow/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
ContinueAsNewInput,
1010
DisposeInput,
1111
GetLogAttributesInput,
12+
LocalActivityInput,
1213
Next,
1314
SignalInput,
1415
SignalWorkflowInput,
@@ -104,6 +105,23 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
104105
});
105106
}
106107

108+
public async scheduleLocalActivity(
109+
input: LocalActivityInput,
110+
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleLocalActivity'>
111+
): Promise<unknown> {
112+
return await instrument({
113+
tracer: this.tracer,
114+
spanName: `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}${input.activityType}`,
115+
fn: async () => {
116+
const headers = await headersWithContext(input.headers);
117+
return next({
118+
...input,
119+
headers,
120+
});
121+
},
122+
});
123+
}
124+
107125
public async startChildWorkflowExecution(
108126
input: StartChildWorkflowExecutionInput,
109127
next: Next<WorkflowOutboundCallsInterceptor, 'startChildWorkflowExecution'>

packages/test/src/test-otel.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,20 @@ if (RUN_INTEGRATION_TESTS) {
258258
);
259259
t.true(signalChildWithUnblockSpan !== undefined);
260260

261+
const localActivityStartSpan = spans.find(
262+
({ name, parentSpanId }) =>
263+
name === `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}echo` &&
264+
parentSpanId === parentExecuteSpan?.spanContext().spanId
265+
);
266+
t.true(localActivityStartSpan !== undefined);
267+
268+
const localActivityExecuteSpan = spans.find(
269+
({ name, parentSpanId }) =>
270+
name === `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}echo` &&
271+
parentSpanId === localActivityStartSpan?.spanContext().spanId
272+
);
273+
t.true(localActivityExecuteSpan !== undefined);
274+
261275
const activityStartedSignalSpan = spans.find(
262276
({ name, parentSpanId }) =>
263277
name === `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}activityStarted` &&

packages/test/src/workflows/smorgasbord.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
setHandler,
1313
condition,
1414
continueAsNew,
15+
proxyLocalActivities,
1516
} from '@temporalio/workflow';
1617
import * as activities from '../activities';
1718
import { signalTarget } from './signal-target';
@@ -22,6 +23,11 @@ const { fakeProgress, queryOwnWf } = proxyActivities<typeof activities>({
2223
cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
2324
});
2425

26+
const { echo } = proxyLocalActivities<typeof activities>({
27+
startToCloseTimeout: '1m',
28+
cancellationType: ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
29+
});
30+
2531
export const stepQuery = defineQuery<number>('step');
2632

2733
export async function smorgasbord(iteration = 0): Promise<void> {
@@ -42,11 +48,20 @@ export async function smorgasbord(iteration = 0): Promise<void> {
4248
await childWf.result();
4349
})();
4450

51+
const localActivityPromise = echo('local-activity');
52+
4553
if (iteration === 0) {
4654
CancellationScope.current().cancel();
4755
}
4856

49-
await Promise.all([activityPromise, queryActPromise, timerPromise, childWfPromise, condition(() => unblocked)]);
57+
await Promise.all([
58+
activityPromise,
59+
queryActPromise,
60+
timerPromise,
61+
childWfPromise,
62+
localActivityPromise,
63+
condition(() => unblocked),
64+
]);
5065
});
5166
} catch (e) {
5267
if (iteration !== 0 || !isCancellation(e)) {

0 commit comments

Comments
 (0)