Skip to content

[controller] integrate OpenTelemetry into Controller & add log compaction metrics #1665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 92 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
1df9788
ControllerMetricEntity enum class
Apr 1, 2025
1f9509a
[WIP] setup VeniceMetricsRepositoru for controller
Apr 1, 2025
52bc641
MetricsRepository -> VeniceMetricRepository in VeniceController and V…
Apr 18, 2025
635eb84
import otel in controller build.gradle
Apr 18, 2025
80c61f6
[wip] LogCompactionServiceStats.java
Apr 18, 2025
5f4c377
[WIP] Attributes builder
Apr 23, 2025
f178b0f
`LogCompactionService` basic constructor
Apr 25, 2025
f24ff9b
add JOB_RUN_STATUS dimension
Apr 25, 2025
972c64f
add LOG_COMPACTION_SELECTION_REASON dimension
Apr 25, 2025
5b5dab1
add VENICE_CONTROLLER_ENDPOINT dimension
Apr 26, 2025
1e16d89
REPUSH_STORE_ENDPOINT_CALL_COUNT in `ControllerMetricEntity`
May 7, 2025
bf91c0c
rebase: LogCompactionService scoped to cluster level
May 7, 2025
e64f1a0
rename LogCompactionServiceStats -> LogCompactionStats
May 7, 2025
888d88a
rebase
May 8, 2025
9ff8279
LogCompactionStats extends AbstractVeniceStats
May 8, 2025
f81819c
delete ControllerEndpoints.java
May 9, 2025
64addcc
add MetricsRepository to VeniceParentHelixAdmin constructor
May 9, 2025
5edbd05
adding `REPUSH_STORE_ENDPOINT_CALL_COUNT` metric
May 13, 2025
f7954c6
move repushStore endpoint to parent controller
May 16, 2025
5831ea2
add Optional<MetricsRepository> in AbstractTestVeniceParentHelixAdmin…
May 16, 2025
2680725
add metricName in ControllerMetricEntity to streamline access to metr…
May 16, 2025
99b6c64
LogCompactionStatsTest
May 16, 2025
1b1390b
nit delete testing print statement
May 20, 2025
47f633d
pass in `LogCompactionStats` to `CompactionManager`
May 20, 2025
f178b17
add STORE_NOMINATED_FOR_SCHEDULED_COMPACTION metric entity
May 23, 2025
b4bd0ca
add `STORE_NOMINATED_FOR_SCHEDULED_COMPACTION` to `LogCompactionStats`
May 23, 2025
59edfac
cleanup after rebase
May 23, 2025
43768ba
add createGuage()
May 23, 2025
d78489d
nit format
May 23, 2025
4cd1f6d
revert mistake
May 23, 2025
18ea64a
merge conflict
May 23, 2025
a4b95d7
add storeCurrentVersionNumber as metric dimension
May 24, 2025
9f6937b
emit storeNominatedForScheduledCompactionMetric for store nomination …
May 24, 2025
d782e36
[log] log for log compaction service is enabled
Jun 10, 2025
064b8f4
spotbug fix
Jun 12, 2025
7638d6a
[pr] nixed static vars
Jun 12, 2025
ce508bd
[pr] nixed static vars
Jun 12, 2025
ed80feb
[pr] use VeniceConcurrentHashMap
Jun 12, 2025
86319fa
[pr] rename JobRunStatus to ExecutionStatus
Jun 12, 2025
f0aef24
[nit] add Ms suffix in `timeSinceLastLogCompactionThresholdMs` for cl…
Jun 13, 2025
c2e2b7c
add JOB_RUN_STATUS dimension
Apr 25, 2025
bed8c45
rebase LogCompactionStatsTest
May 16, 2025
bc2d446
pass in `LogCompactionStats` to `CompactionManager`
May 20, 2025
37585d7
cleanup after rebase
May 23, 2025
cfc6441
[PR] rename `dimensions.ExecutionStatus` to `dimensions.VeniceExecuti…
Jun 13, 2025
28ad1b1
[PR] use `VeniceExecutionStatus` rather than `VeniceResponseStatusCat…
Jun 13, 2025
734bc55
[PR] putIfAbsent
Jun 13, 2025
681490b
fix for test for new dimensions & metric type
Jun 13, 2025
88e0712
fix test for new RepushJobResponse constructor
Jun 13, 2025
115e335
fix test for new validateLongPointData()
Jun 13, 2025
e333a8e
fix test for new metric name
Jun 13, 2025
4a31333
fix test for new metric name
Jun 13, 2025
fc125bb
fix test for new metric name
Jun 13, 2025
4c3ade9
fix test for new metric name
Jun 13, 2025
36d0aba
gauge
Jun 13, 2025
85a074e
build LongGauge
Jun 14, 2025
2b73206
spotbug propertiesList NPE
Jun 16, 2025
5a03db6
spotbug
Jun 16, 2025
eec3cc7
add log message to bookened log compaction cycles
Jun 17, 2025
6a65910
nit: gaugeMap typo
Jun 17, 2025
66b3e24
fix controller tests: init MetricsRepository rather than mock in test
Jun 17, 2025
b43200d
correct metric emmited for when store is repushed + cont'd refactor f…
Jun 18, 2025
79dc650
delete LogCompactionSelectionReason
Jun 23, 2025
ad619e6
reuse VeniceController final class fields
Jun 23, 2025
ba7de6f
use VeniceResponseStatusCategory instead of VeniceExecutionStatus.jav…
Jun 23, 2025
a3918a3
LongGuage -> DoubleGauge
Jun 23, 2025
56f6b1b
nix VENICE_STORE_VERSION metric dimension + cleanup VENICE_EXECUTION_…
Jun 23, 2025
ff8d84b
gh check: NPE statsMap
Jun 23, 2025
8620696
delete store version from store_nominated_for_scheduled_compaction me…
Jun 24, 2025
d3a703c
[log] Log compaction service start/stop logs with cluster name
Jun 24, 2025
a7d47d0
[log] add [log-compaction] tag to all log compaction related logs for…
Jun 24, 2025
6b09d13
[log] add [log-compaction] tag to LOGGER
Jun 24, 2025
7bef75a
nit: nix duplicate log
Jun 24, 2025
e022227
nix exception printing (automatically printed)
Jun 24, 2025
07b01d2
remove TODO.
Jun 24, 2025
d60585f
add two metrics: `STORE_NOMINATED_FOR_COMPACTION_COUNT` and `STORE_CO…
Jun 24, 2025
0415067
gh check: fix VeniceMetricsRepository init in test
Jun 26, 2025
bc4c4e7
`STORE_NOMINATION_TO_COMPACTION_COMPLETE_DURATION` reset metric to 0 …
Jun 27, 2025
0509b31
[pr] pass in just metricEntity to getFullMetricName() rather than dup…
Jun 27, 2025
1b18714
[pr] gauge metric type description
Jun 27, 2025
2b3bc6e
[pr] ModuleMetricEntityInterface.getUniqueMetricEntities
Jun 27, 2025
5201afe
[pr] ternary operator for propertiesList null check
Jun 27, 2025
c6cbf3b
[pr] clean up `storeNominationToCompactionTriggeredDurationMetric` ->…
Jun 27, 2025
754340e
[pr] clean up `storeNominationToCompactionTriggeredDurationMetric` ->…
Jun 27, 2025
4ab2483
[pr] rename repush.store.trigger.source -> repush.trigger.source
Jun 27, 2025
c362436
[pr] rename `REPUSH_STORE_ENDPOINT_CALL_COUNT` -> `REPUSH_CALL_COUNT`
Jun 27, 2025
874120b
[pr] rename `STORE_NOMINATION_TO_COMPACTION_COMPLETE_DURATION` -> `CO…
Jun 27, 2025
daa3a77
[pr] rename `STORE_NOMINATION_TO_COMPACTION_COMPLETE_DURATION` -> `CO…
Jun 27, 2025
c9f85ab
[pr] `STORE_COMPACTION_TRIGGER_STATUS` records trigger attempt succes…
Jun 27, 2025
874e34f
nit: log message
Jun 27, 2025
1143b85
[PR] toggle off COMPACTION_ELIGIBLE_STATE on successful push only
Jun 27, 2025
08935c0
[PR] reuse cluster name & store name
Jun 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongCounter;
Expand Down Expand Up @@ -142,6 +144,7 @@ public VeniceOpenTelemetryMetricsRepository(VeniceMetricsConfig metricsConfig) {
*/
private final VeniceConcurrentHashMap<String, DoubleHistogram> histogramMap = new VeniceConcurrentHashMap<>();
private final VeniceConcurrentHashMap<String, LongCounter> counterMap = new VeniceConcurrentHashMap<>();
private final VeniceConcurrentHashMap<String, DoubleGauge> gaugeMap = new VeniceConcurrentHashMap<>();

MetricExporter getOtlpHttpMetricExporter(VeniceMetricsConfig metricsConfig) {
OtlpHttpMetricExporterBuilder exporterBuilder =
Expand Down Expand Up @@ -182,7 +185,7 @@ private void setExponentialHistogramAggregation(SdkMeterProviderBuilder builder,

for (MetricEntity metricEntity: metricsConfig.getMetricEntities()) {
if (metricEntity.getMetricType() == MetricType.HISTOGRAM) {
uniqueHistogramMetricNames.add(getFullMetricName(getMetricPrefix(metricEntity), metricEntity.getMetricName()));
uniqueHistogramMetricNames.add(getFullMetricName(metricEntity));
}
}

Expand All @@ -202,8 +205,8 @@ private void setExponentialHistogramAggregation(SdkMeterProviderBuilder builder,
}
}

String getFullMetricName(String metricPrefix, String name) {
String fullMetricName = metricPrefix + "." + name;
String getFullMetricName(MetricEntity metricEntity) {
String fullMetricName = getMetricPrefix(metricEntity) + "." + metricEntity.getMetricName();
validateMetricName(fullMetricName);
return transformMetricName(fullMetricName, getMetricFormat());
}
Expand All @@ -227,7 +230,7 @@ public DoubleHistogram createHistogram(MetricEntity metricEntity) {
return null;
}
return histogramMap.computeIfAbsent(metricEntity.getMetricName(), key -> {
String fullMetricName = getFullMetricName(getMetricPrefix(metricEntity), metricEntity.getMetricName());
String fullMetricName = getFullMetricName(metricEntity);
DoubleHistogramBuilder builder = meter.histogramBuilder(fullMetricName)
.setUnit(metricEntity.getUnit().name())
.setDescription(metricEntity.getDescription());
Expand All @@ -244,14 +247,27 @@ public LongCounter createCounter(MetricEntity metricEntity) {
return null;
}
return counterMap.computeIfAbsent(metricEntity.getMetricName(), key -> {
String fullMetricName = getFullMetricName(getMetricPrefix(metricEntity), metricEntity.getMetricName());
String fullMetricName = getFullMetricName(metricEntity);
LongCounterBuilder builder = meter.counterBuilder(fullMetricName)
.setUnit(metricEntity.getUnit().name())
.setDescription(metricEntity.getDescription());
return builder.build();
});
}

public DoubleGauge createGuage(MetricEntity metricEntity) {
if (!emitOpenTelemetryMetrics()) {
return null;
}
return gaugeMap.computeIfAbsent(metricEntity.getMetricName(), key -> {
String fullMetricName = getFullMetricName(metricEntity);
DoubleGaugeBuilder builder = meter.gaugeBuilder(fullMetricName)
.setUnit(metricEntity.getUnit().name())
.setDescription(metricEntity.getDescription());
return builder.build();
});
}

public Object createInstrument(MetricEntity metricEntity) {
MetricType metricType = metricEntity.getMetricType();
switch (metricType) {
Expand All @@ -261,6 +277,8 @@ public Object createInstrument(MetricEntity metricEntity) {

case COUNTER:
return createCounter(metricEntity);
case GAUGE:
return createGuage(metricEntity);

default:
throw new VeniceException("Unknown metric type: " + metricType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.venice.stats.dimensions;

public enum RepushStoreTriggerSource implements VeniceDimensionInterface {
MANUAL, SCHEDULED;

private final String triggerSource;

RepushStoreTriggerSource() {
this.triggerSource = name().toLowerCase();
}

/**
* All the instances of this Enum should have the same dimension name.
* Refer {@link VeniceDimensionInterface#getDimensionName()} for more details.
*/
@Override
public VeniceMetricsDimensions getDimensionName() {
return VeniceMetricsDimensions.REPUSH_TRIGGER_SOURCE;
}

@Override
public String getDimensionValue() {
return this.triggerSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ public enum VeniceMetricsDimensions {
VENICE_MESSAGE_TYPE("venice.message.type"),

/** {@link RequestRetryAbortReason} */
VENICE_REQUEST_RETRY_ABORT_REASON("venice.request.retry_abort_reason");
VENICE_REQUEST_RETRY_ABORT_REASON("venice.request.retry_abort_reason"),

/** {@link RepushStoreTriggerSource} */
REPUSH_TRIGGER_SOURCE("repush.trigger.source");

private final String[] dimensionName = new String[VeniceOpenTelemetryMetricNamingFormat.SIZE];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.tehuti.metrics.MeasurableStat;
Expand Down Expand Up @@ -106,7 +107,9 @@ public void recordOtelMetric(double value, Attributes attributes) {
case COUNTER:
((LongCounter) otelMetric).add((long) value, attributes);
break;

case GAUGE:
((DoubleGauge) otelMetric).set((long) value, attributes);
break;
default:
throw new IllegalArgumentException("Unsupported metric type: " + metricType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ public enum MetricType {
/**
* For Counter: A simple counter that can be added to.
*/
COUNTER;
COUNTER,

/**
* For Gauge: Emits the absolute value of the metric value.
*/
GAUGE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.venice.stats.metrics.MetricType;
import com.linkedin.venice.stats.metrics.MetricUnit;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
Expand Down Expand Up @@ -111,7 +112,13 @@ public void testValidateMetricNameWithInvalidName() {
@Test
public void testTransformMetricName() {
when(mockMetricsConfig.getMetricNamingFormat()).thenReturn(SNAKE_CASE);
assertEquals(metricsRepository.getFullMetricName("prefix", "metric_name"), "prefix.metric_name");
MetricEntity metricEntity = MetricEntity.createInternalMetricEntityWithoutDimensions(
"metric_name",
MetricType.COUNTER,
MetricUnit.NUMBER,
"Test metric",
"prefix");
assertEquals(metricsRepository.getFullMetricName(metricEntity), "prefix.metric_name");

String transformedName =
transformMetricName("test.test_metric_name", VeniceOpenTelemetryMetricNamingFormat.PASCAL_CASE);
Expand Down Expand Up @@ -169,6 +176,12 @@ public void testCreateAndRecordMetricsForAllMetricTypes() {
"Instrument should be a LongCounter for metric type: " + metricType);
metricEntityState.recordOtelMetric(value, attributes);
break;
case GAUGE:
assertTrue(
instrument instanceof DoubleGauge,
"Instrument should be a DoubleGauge for metric type: " + metricType);
metricEntityState.recordOtelMetric(value, attributes);
break;
default:
fail("Unsupported metric type: " + metricType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ public void testGetDimensionNameInSnakeCase() {
case VENICE_REQUEST_RETRY_TYPE:
assertEquals(dimension.getDimensionName(format), "venice.request.retry_type");
break;
case VENICE_MESSAGE_TYPE:
assertEquals(dimension.getDimensionName(format), "venice.message.type");
break;
case VENICE_REQUEST_RETRY_ABORT_REASON:
assertEquals(dimension.getDimensionName(format), "venice.request.retry_abort_reason");
break;
case VENICE_MESSAGE_TYPE:
assertEquals(dimension.getDimensionName(format), "venice.message.type");
case REPUSH_TRIGGER_SOURCE:
assertEquals(dimension.getDimensionName(format), "repush.trigger.source");
break;
default:
throw new IllegalArgumentException("Unknown dimension: " + dimension);
Expand Down Expand Up @@ -78,6 +81,9 @@ public void testGetDimensionNameInCamelCase() {
case VENICE_MESSAGE_TYPE:
assertEquals(dimension.getDimensionName(format), "venice.message.type");
break;
case REPUSH_TRIGGER_SOURCE:
assertEquals(dimension.getDimensionName(format), "repush.trigger.source");
break;
default:
throw new IllegalArgumentException("Unknown dimension: " + dimension);
}
Expand Down Expand Up @@ -116,6 +122,9 @@ public void testGetDimensionNameInPascalCase() {
case VENICE_MESSAGE_TYPE:
assertEquals(dimension.getDimensionName(format), "Venice.Message.Type");
break;
case REPUSH_TRIGGER_SOURCE:
assertEquals(dimension.getDimensionName(format), "Repush.Trigger.Source");
break;
default:
throw new IllegalArgumentException("Unknown dimension: " + dimension);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.TestUtils;
Expand Down Expand Up @@ -354,7 +354,12 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
}

D2Client d2Client = D2TestUtils.getAndStartD2Client(options.getZkAddress());
MetricsRepository metricsRepository = TehutiUtils.getMetricsRepository(D2_SERVICE_NAME);
MetricsRepository metricsRepository = VeniceMetricsRepository.getVeniceMetricsRepository(
D2_SERVICE_NAME,
VeniceController.CONTROLLER_SERVICE_METRIC_PREFIX,
VeniceController.CONTROLLER_SERVICE_METRIC_ENTITIES,
propertiesList.get(0).getAsMap() // TODO repush otel: not sure if properties is accessed this way
);

Optional<ClientConfig> consumerClientConfig = Optional.empty();
Object clientConfig = options.getExtraProperties().get(VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER);
Expand Down
3 changes: 3 additions & 0 deletions services/venice-controller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {

implementation project(':internal:venice-common')
implementation project(':clients:venice-thin-client')
implementation project(':internal:venice-client-common')

implementation libraries.commonsIo
implementation libraries.fastUtil
Expand All @@ -43,11 +44,13 @@ dependencies {
implementation libraries.spark
// It's necessary to pull in the most recent version of zkclient explicitly, otherwise Helix won't have it...
implementation libraries.zkclient
implementation libraries.opentelemetryApi

testImplementation project(':services:venice-router')
testImplementation libraries.avroUtilFastserde
testImplementation libraries.kafkaClientsTest // TODO: Get rid of Kafka dependency in venice-common (used by TopicCreator)
testImplementation project(':internal:venice-test-common')
testImplementation libraries.openTelemetryTestSdk
}

jar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService;
import com.linkedin.venice.controller.logcompaction.CompactionManager;
import com.linkedin.venice.controller.repush.RepushJobRequest;
import com.linkedin.venice.controller.stats.LogCompactionStats;
import com.linkedin.venice.controllerapi.NodeReplicasReadinessState;
import com.linkedin.venice.controllerapi.RepushInfo;
import com.linkedin.venice.controllerapi.RepushJobResponse;
Expand Down Expand Up @@ -48,6 +49,7 @@
import com.linkedin.venice.utils.LogContext;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriterFactory;
import java.io.Closeable;
import java.io.IOException;
Expand All @@ -72,6 +74,7 @@ class OfflinePushStatusInfo {
private String statusDetails;
private Map<String, String> extraDetails;
private List<UncompletedPartition> uncompletedPartitions;
final Map<String, LogCompactionStats> logCompactionStatsMap = new VeniceConcurrentHashMap<>();

/** N.B.: Test-only constructor ): */
public OfflinePushStatusInfo(ExecutionStatus executionStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ public HelixVeniceClusterResources(
}

if (config.isParent() && config.isLogCompactionSchedulingEnabled()) {
this.logCompactionService = new LogCompactionService(admin, clusterName, config);
LOGGER.info("Log compaction service is enabled for cluster: {}", clusterName);
this.logCompactionService = new LogCompactionService(admin, clusterName, config, metricsRepository);
} else {
this.logCompactionService = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.venice.controller.server.AdminSparkServer;
import com.linkedin.venice.controller.server.VeniceControllerGrpcServiceImpl;
import com.linkedin.venice.controller.server.VeniceControllerRequestHandler;
import com.linkedin.venice.controller.stats.ControllerMetricEntity;
import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
Expand All @@ -35,6 +36,8 @@
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.servicediscovery.AsyncRetryingServiceDiscoveryAnnouncer;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.stats.metrics.MetricEntity;
import com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface;
import com.linkedin.venice.system.store.ControllerClientBackedSystemSchemaInitializer;
import com.linkedin.venice.utils.LogContext;
import com.linkedin.venice.utils.PropertyBuilder;
Expand All @@ -46,6 +49,7 @@
import io.grpc.ServerInterceptor;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand All @@ -60,7 +64,10 @@
public class VeniceController {
private static final Logger LOGGER = LogManager.getLogger(VeniceController.class);
private static final String CONTROLLER_GRPC_SERVER_THREAD_NAME = "ControllerGrpcServer";
static final String CONTROLLER_SERVICE_NAME = "venice-controller";
public static final String CONTROLLER_SERVICE_NAME = "venice-controller";
public static final String CONTROLLER_SERVICE_METRIC_PREFIX = "controller";
public static final Collection<MetricEntity> CONTROLLER_SERVICE_METRIC_ENTITIES =
ModuleMetricEntityInterface.getUniqueMetricEntities(ControllerMetricEntity.class);

// services
private final VeniceControllerService controllerService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.controller.VeniceController.CONTROLLER_SERVICE_METRIC_ENTITIES;
import static com.linkedin.venice.controller.VeniceController.CONTROLLER_SERVICE_METRIC_PREFIX;
import static com.linkedin.venice.controller.VeniceController.CONTROLLER_SERVICE_NAME;

import com.linkedin.d2.balancer.D2Client;
Expand All @@ -10,7 +12,7 @@
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.Collections;
Expand Down Expand Up @@ -154,7 +156,16 @@ public Builder setPubSubClientsFactory(PubSubClientsFactory pubSubClientsFactory

private void addDefaultValues() {
if (metricsRepository == null && !isMetricsRepositorySet) {
metricsRepository = TehutiUtils.getMetricsRepository(CONTROLLER_SERVICE_NAME);

metricsRepository = VeniceMetricsRepository.getVeniceMetricsRepository(
CONTROLLER_SERVICE_NAME,
CONTROLLER_SERVICE_METRIC_PREFIX,
CONTROLLER_SERVICE_METRIC_ENTITIES,
(propertiesList == null || propertiesList.isEmpty())
? new VeniceProperties().getAsMap()
: propertiesList.get(0).getAsMap());
// TODO OTel: today, this gets the properties of the first cluster. This is not ideal.
// We need to figure out how to build a common controller-specific config/properties list
}
if (serviceDiscoveryAnnouncers == null && !isServiceDiscoveryAnnouncerSet) {
serviceDiscoveryAnnouncers = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public VeniceControllerService(
externalSupersetSchemaGenerator,
pubSubTopicRepository,
initRoutineForPushJobDetailsSystemStore,
initRoutineForHeartbeatSystemStore);
initRoutineForHeartbeatSystemStore,
metricsRepository);
LOGGER.info("Controller works as a parent controller.");
} else {
this.admin = internalAdmin;
Expand Down
Loading