Skip to content

Commit 0e58687

Browse files
Add getMetricsScope interceptor (#2224)
Add getMetricsScope interceptor
1 parent 97322ec commit 0e58687

File tree

7 files changed

+95
-1
lines changed

7 files changed

+95
-1
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package io.temporal.common.interceptors;
2222

23+
import com.uber.m3.tally.Scope;
2324
import io.temporal.activity.ActivityOptions;
2425
import io.temporal.activity.LocalActivityOptions;
2526
import io.temporal.api.common.v1.WorkflowExecution;
@@ -621,6 +622,9 @@ <R> R mutableSideEffect(
621622

622623
void upsertMemo(Map<String, Object> memo);
623624

625+
/** Intercepts call to get the metric scope in a workflow. */
626+
Scope getMetricsScope();
627+
624628
/**
625629
* Intercepts creation of a workflow child thread.
626630
*

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package io.temporal.common.interceptors;
2222

23+
import com.uber.m3.tally.Scope;
2324
import io.temporal.common.SearchAttributeUpdate;
2425
import io.temporal.workflow.Functions.Func;
2526
import io.temporal.workflow.Promise;
@@ -167,6 +168,11 @@ public void upsertMemo(Map<String, Object> memo) {
167168
next.upsertMemo(memo);
168169
}
169170

171+
@Override
172+
public Scope getMetricsScope() {
173+
return next.getMetricsScope();
174+
}
175+
170176
@Override
171177
public Object newChildThread(Runnable runnable, boolean detached, String name) {
172178
return next.newChildThread(runnable, detached, name);

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,7 @@ public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
12101210
return new CancelWorkflowOutput(result);
12111211
}
12121212

1213+
@Override
12131214
public Scope getMetricsScope() {
12141215
return replayContext.getMetricsScope();
12151216
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ public static Optional<UpdateInfo> getCurrentUpdateInfo() {
661661
}
662662

663663
public static Scope getMetricsScope() {
664-
return getRootWorkflowContext().getMetricsScope();
664+
return getWorkflowOutboundInterceptor().getMetricsScope();
665665
}
666666

667667
private static boolean isLoggingEnabledInReplay() {

temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import io.temporal.workflow.shared.TestWorkflows.ReceiveSignalObjectWorkflow;
6262
import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString;
6363
import java.time.Duration;
64+
import java.util.Collections;
6465
import java.util.HashMap;
6566
import java.util.LinkedHashMap;
6667
import java.util.Map;
@@ -77,6 +78,7 @@ public class MetricsTest {
7778

7879
private static final long REPORTING_FLUSH_TIME = 600;
7980
private static final String TASK_QUEUE = "metrics_test";
81+
private static final String TEST_TAG = "test_tag";
8082
private TestWorkflowEnvironment testEnvironment;
8183
private TestStatsReporter reporter;
8284

@@ -243,6 +245,44 @@ public void testWorkflowMetrics() throws InterruptedException {
243245
reporter.assertCounter(TEMPORAL_REQUEST, workflowTaskCompletionTags, 4);
244246
}
245247

248+
@Test
249+
public void testWorkflowMetricsInterceptor() throws InterruptedException {
250+
setUp(
251+
WorkerFactoryOptions.getDefaultInstance().toBuilder()
252+
.setWorkerInterceptors(new WorkerInterceptor())
253+
.build());
254+
255+
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
256+
worker.registerWorkflowImplementationTypes(
257+
TestCustomMetricsInWorkflow.class, TestMetricsInChildWorkflow.class);
258+
worker.registerActivitiesImplementations(new TestActivityImpl());
259+
testEnvironment.start();
260+
261+
WorkflowClient workflowClient = testEnvironment.getWorkflowClient();
262+
WorkflowOptions options =
263+
WorkflowOptions.newBuilder()
264+
.setWorkflowRunTimeout(Duration.ofSeconds(1000))
265+
.setTaskQueue(TASK_QUEUE)
266+
.build();
267+
NoArgsWorkflow workflow = workflowClient.newWorkflowStub(NoArgsWorkflow.class, options);
268+
workflow.execute();
269+
270+
Thread.sleep(REPORTING_FLUSH_TIME);
271+
272+
Map<String, String> workflowTags = new LinkedHashMap<>(TAGS_TASK_QUEUE);
273+
// Assert the interceptor added the extra tag
274+
workflowTags.put(TEST_TAG, NAMESPACE);
275+
276+
workflowTags.put(MetricsTag.WORKFLOW_TYPE, "NoArgsWorkflow");
277+
reporter.assertCounter("test_started", workflowTags, 1);
278+
reporter.assertCounter("test_done", workflowTags, 1);
279+
280+
workflowTags.put(MetricsTag.WORKFLOW_TYPE, "TestChildWorkflow");
281+
reporter.assertCounter("test_child_started", workflowTags, 1);
282+
reporter.assertCounter("test_child_done", workflowTags, 1);
283+
reporter.assertTimerMinDuration("test_timer", workflowTags, Duration.ofSeconds(3));
284+
}
285+
246286
@Test
247287
public void testCorruptedSignalMetrics() throws InterruptedException {
248288
setUp(
@@ -600,4 +640,33 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
600640
overrideArgs.apply(args)));
601641
}
602642
}
643+
644+
private static class WorkerInterceptor extends WorkerInterceptorBase {
645+
@Override
646+
public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
647+
return new WorkflowInboundCallsInterceptorBase(next) {
648+
@Override
649+
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
650+
next.init(new OutboundCallsInterceptor(outboundCalls));
651+
}
652+
};
653+
}
654+
}
655+
656+
private static class OutboundCallsInterceptor extends WorkflowOutboundCallsInterceptorBase {
657+
WorkflowOutboundCallsInterceptor next;
658+
659+
public OutboundCallsInterceptor(WorkflowOutboundCallsInterceptor next) {
660+
super(next);
661+
this.next = next;
662+
}
663+
664+
@Override
665+
public Scope getMetricsScope() {
666+
return next.getMetricsScope()
667+
.tagged(
668+
Collections.singletonMap(
669+
TEST_TAG, String.valueOf(Workflow.getInfo().getNamespace())));
670+
}
671+
}
603672
}

temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,11 @@ public void upsertMemo(Map<String, Object> memo) {
481481
throw new UnsupportedOperationException("not implemented");
482482
}
483483

484+
@Override
485+
public Scope getMetricsScope() {
486+
throw new UnsupportedOperationException("not implemented");
487+
}
488+
484489
@Override
485490
public Object newChildThread(Runnable runnable, boolean detached, String name) {
486491
throw new UnsupportedOperationException("not implemented");

temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.junit.Assert.assertNotNull;
2424
import static org.junit.Assert.assertTrue;
2525

26+
import com.uber.m3.tally.Scope;
2627
import io.temporal.activity.ActivityExecutionContext;
2728
import io.temporal.client.ActivityCompletionException;
2829
import io.temporal.common.SearchAttributeUpdate;
@@ -398,6 +399,14 @@ public void upsertMemo(Map<String, Object> memo) {
398399
next.upsertMemo(memo);
399400
}
400401

402+
@Override
403+
public Scope getMetricsScope() {
404+
if (!WorkflowUnsafe.isReplaying()) {
405+
trace.add("getMetricsScope");
406+
}
407+
return next.getMetricsScope();
408+
}
409+
401410
@Override
402411
public Object newChildThread(Runnable runnable, boolean detached, String name) {
403412
if (!WorkflowUnsafe.isReplaying()) {

0 commit comments

Comments
 (0)