Skip to content

Commit b7c72a2

Browse files
Add num_pollers metric (#2514)
1 parent 6c961a0 commit b7c72a2

File tree

7 files changed

+104
-0
lines changed

7 files changed

+104
-0
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
1212
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
1313
import io.temporal.internal.common.ProtobufTimeUtils;
14+
import io.temporal.serviceclient.MetricsTag;
1415
import io.temporal.serviceclient.WorkflowServiceStubs;
1516
import io.temporal.worker.MetricsType;
17+
import io.temporal.worker.PollerTypeMetricsTag;
1618
import io.temporal.worker.tuning.*;
1719
import java.util.Objects;
20+
import java.util.concurrent.atomic.AtomicInteger;
1821
import java.util.function.Supplier;
1922
import javax.annotation.Nonnull;
2023
import javax.annotation.Nullable;
@@ -28,6 +31,7 @@ final class ActivityPollTask implements Poller.PollTask<ActivityTask> {
2831
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
2932
private final Scope metricsScope;
3033
private final PollActivityTaskQueueRequest pollRequest;
34+
private final AtomicInteger pollGauge = new AtomicInteger();
3135

3236
@SuppressWarnings("deprecation")
3337
public ActivityPollTask(
@@ -91,6 +95,10 @@ public ActivityTask poll() {
9195
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
9296
if (permit == null) return null;
9397

98+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
99+
.gauge(MetricsType.NUM_POLLERS)
100+
.update(pollGauge.incrementAndGet());
101+
94102
try {
95103
response =
96104
service
@@ -113,6 +121,10 @@ public ActivityTask poll() {
113121
permit,
114122
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
115123
} finally {
124+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
125+
.gauge(MetricsType.NUM_POLLERS)
126+
.update(pollGauge.decrementAndGet());
127+
116128
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
117129
}
118130
}

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
import io.temporal.api.taskqueue.v1.TaskQueue;
99
import io.temporal.api.workflowservice.v1.*;
1010
import io.temporal.internal.common.ProtobufTimeUtils;
11+
import io.temporal.serviceclient.MetricsTag;
1112
import io.temporal.serviceclient.WorkflowServiceStubs;
1213
import io.temporal.worker.MetricsType;
14+
import io.temporal.worker.PollerTypeMetricsTag;
1315
import io.temporal.worker.tuning.*;
1416
import java.util.Objects;
17+
import java.util.concurrent.atomic.AtomicInteger;
1518
import java.util.function.Supplier;
1619
import javax.annotation.Nonnull;
1720
import javax.annotation.Nullable;
@@ -25,6 +28,7 @@ final class NexusPollTask implements Poller.PollTask<NexusTask> {
2528
private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
2629
private final Scope metricsScope;
2730
private final PollNexusTaskQueueRequest pollRequest;
31+
private final AtomicInteger pollGauge = new AtomicInteger();
2832

2933
@SuppressWarnings("deprecation")
3034
public NexusPollTask(
@@ -81,6 +85,10 @@ public NexusTask poll() {
8185
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
8286
if (permit == null) return null;
8387

88+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK)
89+
.gauge(MetricsType.NUM_POLLERS)
90+
.update(pollGauge.incrementAndGet());
91+
8492
try {
8593
response =
8694
service
@@ -106,6 +114,10 @@ public NexusTask poll() {
106114
permit,
107115
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
108116
} finally {
117+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK)
118+
.gauge(MetricsType.NUM_POLLERS)
119+
.update(pollGauge.decrementAndGet());
120+
109121
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
110122
}
111123
}

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
import io.temporal.serviceclient.MetricsTag;
1616
import io.temporal.serviceclient.WorkflowServiceStubs;
1717
import io.temporal.worker.MetricsType;
18+
import io.temporal.worker.PollerTypeMetricsTag;
1819
import io.temporal.worker.tuning.SlotPermit;
1920
import io.temporal.worker.tuning.SlotReleaseReason;
2021
import io.temporal.worker.tuning.SlotSupplierFuture;
2122
import io.temporal.worker.tuning.WorkflowSlotInfo;
2223
import java.util.Objects;
24+
import java.util.concurrent.atomic.AtomicInteger;
2325
import java.util.function.Supplier;
2426
import javax.annotation.Nonnull;
2527
import javax.annotation.Nullable;
@@ -36,6 +38,8 @@ final class WorkflowPollTask implements Poller.PollTask<WorkflowTask> {
3638
private final WorkflowServiceGrpc.WorkflowServiceBlockingStub serviceStub;
3739
private final PollWorkflowTaskQueueRequest pollRequest;
3840
private final PollWorkflowTaskQueueRequest stickyPollRequest;
41+
private final AtomicInteger normalPollGauge = new AtomicInteger();
42+
private final AtomicInteger stickyPollGauge = new AtomicInteger();
3943

4044
@SuppressWarnings("deprecation")
4145
public WorkflowPollTask(
@@ -131,6 +135,16 @@ public WorkflowTask poll() {
131135
Scope scope = isSticky ? stickyMetricsScope : metricsScope;
132136

133137
log.trace("poll request begin: {}", request);
138+
if (isSticky) {
139+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_STICKY_TASK)
140+
.gauge(MetricsType.NUM_POLLERS)
141+
.update(stickyPollGauge.incrementAndGet());
142+
} else {
143+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_TASK)
144+
.gauge(MetricsType.NUM_POLLERS)
145+
.update(normalPollGauge.incrementAndGet());
146+
}
147+
134148
try {
135149
PollWorkflowTaskQueueResponse response = doPoll(request, scope);
136150
if (response == null) {
@@ -141,6 +155,17 @@ public WorkflowTask poll() {
141155
slotSupplier.markSlotUsed(new WorkflowSlotInfo(response, pollRequest), permit);
142156
return new WorkflowTask(response, (rr) -> slotSupplier.releaseSlot(rr, permit));
143157
} finally {
158+
159+
if (isSticky) {
160+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_STICKY_TASK)
161+
.gauge(MetricsType.NUM_POLLERS)
162+
.update(stickyPollGauge.decrementAndGet());
163+
} else {
164+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_TASK)
165+
.gauge(MetricsType.NUM_POLLERS)
166+
.update(normalPollGauge.decrementAndGet());
167+
}
168+
144169
if (!isSuccessful) {
145170
slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
146171
stickyQueueBalancer.finishPoll(taskQueueKind, 0);

temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ private MetricsType() {}
120120
public static final String WORKER_START_COUNTER = TEMPORAL_METRICS_PREFIX + "worker_start";
121121
public static final String POLLER_START_COUNTER = TEMPORAL_METRICS_PREFIX + "poller_start";
122122
// gauge
123+
public static final String NUM_POLLERS = TEMPORAL_METRICS_PREFIX + "num_pollers";
124+
123125
public static final String WORKER_TASK_SLOTS_AVAILABLE =
124126
TEMPORAL_METRICS_PREFIX + "worker_task_slots_available";
125127

temporal-sdk/src/main/java/io/temporal/worker/PollerTypeMetricsTag.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.temporal.worker;
2+
3+
import io.temporal.serviceclient.MetricsTag;
4+
5+
public class PollerTypeMetricsTag {
6+
public enum PollerType implements MetricsTag.TagValue {
7+
WORKFLOW_TASK("workflow_task"),
8+
WORKFLOW_STICKY_TASK("workflow_sticky_task"),
9+
ACTIVITY_TASK("activity_task"),
10+
NEXUS_TASK("nexus_task"),
11+
;
12+
13+
PollerType(String value) {
14+
this.value = value;
15+
}
16+
17+
private final String value;
18+
19+
@Override
20+
public String getTag() {
21+
return MetricsTag.POLLER_TYPE;
22+
}
23+
24+
public String getValue() {
25+
return value;
26+
}
27+
}
28+
}

temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.temporal.testing.TestEnvironmentOptions;
3232
import io.temporal.testing.TestWorkflowEnvironment;
3333
import io.temporal.testing.internal.SDKTestWorkflowRule;
34+
import io.temporal.worker.PollerTypeMetricsTag;
3435
import io.temporal.worker.Worker;
3536
import io.temporal.worker.WorkerFactoryOptions;
3637
import io.temporal.worker.WorkerMetricsTag;
@@ -89,6 +90,26 @@ public class MetricsTest {
8990
.put(MetricsTag.WORKER_TYPE, WorkerMetricsTag.WorkerType.WORKFLOW_WORKER.getValue())
9091
.build();
9192

93+
private static final Map<String, String> TAGS_WORKFLOW_NORMAL_POLLER =
94+
new ImmutableMap.Builder<String, String>()
95+
.putAll(TAGS_WORKFLOW_WORKER)
96+
.put(MetricsTag.POLLER_TYPE, PollerTypeMetricsTag.PollerType.WORKFLOW_TASK.getValue())
97+
.build();
98+
99+
private static final Map<String, String> TAGS_WORKFLOW_STICKY_POLLER =
100+
new ImmutableMap.Builder<String, String>()
101+
.putAll(TAGS_WORKFLOW_WORKER)
102+
.put(
103+
MetricsTag.POLLER_TYPE,
104+
PollerTypeMetricsTag.PollerType.WORKFLOW_STICKY_TASK.getValue())
105+
.build();
106+
107+
private static final Map<String, String> TAGS_ACTIVITY_POLLER =
108+
new ImmutableMap.Builder<String, String>()
109+
.putAll(TAGS_ACTIVITY_WORKER)
110+
.put(MetricsTag.POLLER_TYPE, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK.getValue())
111+
.build();
112+
92113
@Rule
93114
public TestWatcher watchman =
94115
new TestWatcher() {
@@ -150,7 +171,10 @@ public void testWorkerMetrics() throws InterruptedException {
150171
reporter.assertCounter("temporal_worker_start", TAGS_LOCAL_ACTIVITY_WORKER, 1);
151172

152173
reporter.assertCounter("temporal_poller_start", TAGS_WORKFLOW_WORKER, 5);
174+
reporter.assertGauge("temporal_num_pollers", TAGS_WORKFLOW_NORMAL_POLLER, 2);
175+
reporter.assertGauge("temporal_num_pollers", TAGS_WORKFLOW_STICKY_POLLER, 3);
153176
reporter.assertCounter("temporal_poller_start", TAGS_ACTIVITY_WORKER, 5);
177+
reporter.assertGauge("temporal_num_pollers", TAGS_ACTIVITY_POLLER, 5);
154178
}
155179

156180
@Test

temporal-serviceclient/src/main/java/io/temporal/serviceclient/MetricsTag.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class MetricsTag {
2424
public static final String EXCEPTION = "exception";
2525
public static final String OPERATION_NAME = "operation";
2626
public static final String TASK_FAILURE_TYPE = "failure_reason";
27+
public static final String POLLER_TYPE = "poller_type";
2728

2829
/** Used to pass metrics scope to the interceptor */
2930
public static final CallOptions.Key<Scope> METRICS_TAGS_CALL_OPTIONS_KEY =

0 commit comments

Comments
 (0)