Skip to content

[controller] integrate OpenTelemetry into Controller & add log compaction metrics #1665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
08af724
ControllerMetricEntity enum class
Apr 1, 2025
0e02387
[WIP] setup VeniceMetricsRepositoru for controller
Apr 1, 2025
7f976b6
MetricsRepository -> VeniceMetricRepository in VeniceController and V…
Apr 18, 2025
afa726d
import otel in controller build.gradle
Apr 18, 2025
c979a9f
[wip] LogCompactionServiceStats.java
Apr 18, 2025
7655f14
[WIP] Attributes builder
Apr 23, 2025
8f3d55b
`LogCompactionService` basic constructor
Apr 25, 2025
2a86ec0
add JOB_RUN_STATUS dimension
Apr 25, 2025
6b679d4
add LOG_COMPACTION_SELECTION_REASON dimension
Apr 25, 2025
d415a71
add VENICE_CONTROLLER_ENDPOINT dimension
Apr 26, 2025
69c1743
REPUSH_STORE_ENDPOINT_CALL_COUNT in `ControllerMetricEntity`
May 7, 2025
e08f1e8
rebase: LogCompactionService scoped to cluster level
May 7, 2025
55fd989
rename LogCompactionServiceStats -> LogCompactionStats
May 7, 2025
1a39ced
rebase
May 8, 2025
301ce62
LogCompactionStats extends AbstractVeniceStats
May 8, 2025
e9f3afa
delete ControllerEndpoints.java
May 9, 2025
36d69bc
add MetricsRepository to VeniceParentHelixAdmin constructor
May 9, 2025
77eb063
adding `REPUSH_STORE_ENDPOINT_CALL_COUNT` metric
May 13, 2025
7e92b84
move repushStore endpoint to parent controller
May 16, 2025
55f6247
add Optional<MetricsRepository> in AbstractTestVeniceParentHelixAdmin…
May 16, 2025
4915107
add metricName in ControllerMetricEntity to streamline access to metr…
May 16, 2025
86e4f32
LogCompactionStatsTest
May 16, 2025
1caf418
nit delete testing print statement
May 20, 2025
80032f0
pass in `LogCompactionStats` to `CompactionManager`
May 20, 2025
6d11f0b
add STORE_NOMINATED_FOR_SCHEDULED_COMPACTION metric entity
May 23, 2025
2062312
add `STORE_NOMINATED_FOR_SCHEDULED_COMPACTION` to `LogCompactionStats`
May 23, 2025
a78dff5
cleanup after rebase
May 23, 2025
76be48e
add createGuage()
May 23, 2025
dccf750
nit format
May 23, 2025
3b918b0
revert mistake
May 23, 2025
d48b396
merge conflict
May 23, 2025
d1c0a2f
add storeCurrentVersionNumber as metric dimension
May 24, 2025
522ff6f
emit storeNominatedForScheduledCompactionMetric for store nomination …
May 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public VeniceOpenTelemetryMetricsRepository(VeniceMetricsConfig metricsConfig) {
*/
private final VeniceConcurrentHashMap<String, DoubleHistogram> histogramMap = new VeniceConcurrentHashMap<>();
private final VeniceConcurrentHashMap<String, LongCounter> counterMap = new VeniceConcurrentHashMap<>();
private final VeniceConcurrentHashMap<String, LongCounter> guageMap = new VeniceConcurrentHashMap<>();

MetricExporter getOtlpHttpMetricExporter(VeniceMetricsConfig metricsConfig) {
OtlpHttpMetricExporterBuilder exporterBuilder =
Expand Down Expand Up @@ -252,6 +253,19 @@ public LongCounter createCounter(MetricEntity metricEntity) {
});
}

public LongCounter createGuage(MetricEntity metricEntity) {
if (!emitOpenTelemetryMetrics()) {
return null;
}
return guageMap.computeIfAbsent(metricEntity.getMetricName(), key -> {
String fullMetricName = getFullMetricName(getMetricPrefix(metricEntity), metricEntity.getMetricName());
LongCounterBuilder builder = meter.counterBuilder(fullMetricName)
.setUnit(metricEntity.getUnit().name())
.setDescription(metricEntity.getDescription());
return builder.build();
});
}

public Object createInstrument(MetricEntity metricEntity) {
MetricType metricType = metricEntity.getMetricType();
switch (metricType) {
Expand All @@ -261,6 +275,8 @@ public Object createInstrument(MetricEntity metricEntity) {

case COUNTER:
return createCounter(metricEntity);
case GAUGE:
return createGuage(metricEntity);

default:
throw new VeniceException("Unknown metric type: " + metricType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.venice.stats.dimensions;

public enum JobRunStatus implements VeniceDimensionInterface {
FAILED, SUCCESS;

private final String status;

JobRunStatus() {
this.status = name().toLowerCase();
}

/**
* All the instances of this Enum should have the same dimension name.
* Refer {@link VeniceDimensionInterface#getDimensionName()} for more details.
*/
@Override
public VeniceMetricsDimensions getDimensionName() {
return VeniceMetricsDimensions.JOB_EXECUTION_STATUS;
}

@Override
public String getDimensionValue() {
return this.status;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.venice.stats.dimensions;

public enum LogCompactionSelectionReason implements VeniceDimensionInterface {
TIME_SINCE_LAST_VERSION_CREATION_EXCEEDS_THRESHOLD;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there another reason which we are thinking to add in the future?


private final String reason;

LogCompactionSelectionReason() {
this.reason = name().toLowerCase();
}

/**
* All the instances of this Enum should have the same dimension name.
* Refer {@link VeniceDimensionInterface#getDimensionName()} for more details.
*/
@Override
public VeniceMetricsDimensions getDimensionName() {
return VeniceMetricsDimensions.LOG_COMPACTION_SELECTION_REASON;
}

@Override
public String getDimensionValue() {
return this.reason;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.venice.stats.dimensions;

public enum RepushStoreTriggerSource implements VeniceDimensionInterface {
MANUAL, SCHEDULED;

private final String triggerSource;

RepushStoreTriggerSource() {
this.triggerSource = name().toLowerCase();
}

/**
* All the instances of this Enum should have the same dimension name.
* Refer {@link VeniceDimensionInterface#getDimensionName()} for more details.
*/
@Override
public VeniceMetricsDimensions getDimensionName() {
return VeniceMetricsDimensions.REPUSH_STORE_TRIGGER_SOURCE;
}

@Override
public String getDimensionValue() {
return this.triggerSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

public enum VeniceMetricsDimensions {
VENICE_STORE_NAME("venice.store.name"), VENICE_CLUSTER_NAME("venice.cluster.name"),
VENICE_STORE_VERSION("venice.store.version"),

/** {@link com.linkedin.venice.read.RequestType} */
VENICE_REQUEST_METHOD("venice.request.method"),
Expand All @@ -28,7 +29,16 @@ public enum VeniceMetricsDimensions {
VENICE_REQUEST_RETRY_TYPE("venice.request.retry_type"),

/** {@link RequestRetryAbortReason} */
VENICE_REQUEST_RETRY_ABORT_REASON("venice.request.retry_abort_reason");
VENICE_REQUEST_RETRY_ABORT_REASON("venice.request.retry_abort_reason"),

/** {@link JobRunStatus} */
JOB_EXECUTION_STATUS("job.execution.status"),

/** {@link RepushStoreTriggerSource} */
REPUSH_STORE_TRIGGER_SOURCE("repush.store.trigger.source"),

/** {@link LogCompactionSelectionReason} */
LOG_COMPACTION_SELECTION_REASON("log.compaction.selection.reason");

private final String[] dimensionName = new String[VeniceOpenTelemetryMetricNamingFormat.SIZE];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ public enum MetricType {
/**
* For Counter: A simple counter that can be added to.
*/
COUNTER;
COUNTER,

/**
* For UpDownCounter: A counter that can be added to or subtracted from. Emits the absolute value of the metric value.
*/
GAUGE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@
import com.linkedin.venice.controller.VeniceControllerContext;
import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService;
import com.linkedin.venice.controller.stats.ControllerMetricEntity;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
import com.linkedin.venice.d2.D2Server;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.stats.metrics.MetricEntity;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.TestUtils;
Expand All @@ -77,6 +79,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -100,6 +103,12 @@ public class VeniceControllerWrapper extends ProcessWrapper {
public static final String PARENT_D2_CLUSTER_NAME = "ParentControllerD2Cluster";
public static final String PARENT_D2_SERVICE_NAME = "ParentController";

public static final String CONTROLLER_SERVICE_METRIC_PREFIX = "controller";
public static final Collection<MetricEntity> CONTROLLER_SERVICE_METRIC_ENTITIES = Collections.unmodifiableList(
Arrays.stream(ControllerMetricEntity.values())
.map(ControllerMetricEntity::getMetricEntity)
.collect(Collectors.toList()));

public static final String SUPERSET_SCHEMA_GENERATOR = "SupersetSchemaGenerator";

public static final double DEFAULT_STORAGE_ENGINE_OVERHEAD_RATIO = 0.85d;
Expand Down Expand Up @@ -354,7 +363,12 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
}

D2Client d2Client = D2TestUtils.getAndStartD2Client(options.getZkAddress());
MetricsRepository metricsRepository = TehutiUtils.getMetricsRepository(D2_SERVICE_NAME);
MetricsRepository metricsRepository = VeniceMetricsRepository.getVeniceMetricsRepository(
D2_SERVICE_NAME,
CONTROLLER_SERVICE_METRIC_PREFIX,
CONTROLLER_SERVICE_METRIC_ENTITIES,
propertiesList.get(0).getAsMap() // TODO repush otel: not sure if properties is accessed this way
);

Optional<ClientConfig> consumerClientConfig = Optional.empty();
Object clientConfig = options.getExtraProperties().get(VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER);
Expand Down
3 changes: 3 additions & 0 deletions services/venice-controller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {

implementation project(':internal:venice-common')
implementation project(':clients:venice-thin-client')
implementation project(':internal:venice-client-common')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not under venice-common rather than under venice-client-common. We can fix this if needed separately, no AI for you on this.


implementation libraries.commonsIo
implementation libraries.fastUtil
Expand All @@ -43,11 +44,13 @@ dependencies {
implementation libraries.spark
// It's necessary to pull in the most recent version of zkclient explicitly, otherwise Helix won't have it...
implementation libraries.zkclient
implementation libraries.opentelemetryApi

testImplementation project(':services:venice-router')
testImplementation libraries.avroUtilFastserde
testImplementation libraries.kafkaClientsTest // TODO: Get rid of Kafka dependency in venice-common (used by TopicCreator)
testImplementation project(':internal:venice-test-common')
testImplementation libraries.openTelemetryTestSdk
}

jar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public HelixVeniceClusterResources(
}

if (config.isParent() && config.isLogCompactionSchedulingEnabled()) {
this.logCompactionService = new LogCompactionService(admin, clusterName, config);
this.logCompactionService = new LogCompactionService(admin, clusterName, config, metricsRepository);
} else {
this.logCompactionService = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.venice.controller.server.AdminSparkServer;
import com.linkedin.venice.controller.server.VeniceControllerGrpcServiceImpl;
import com.linkedin.venice.controller.server.VeniceControllerRequestHandler;
import com.linkedin.venice.controller.stats.ControllerMetricEntity;
import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
Expand All @@ -35,6 +36,7 @@
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.servicediscovery.AsyncRetryingServiceDiscoveryAnnouncer;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.stats.metrics.MetricEntity;
import com.linkedin.venice.system.store.ControllerClientBackedSystemSchemaInitializer;
import com.linkedin.venice.utils.LogContext;
import com.linkedin.venice.utils.PropertyBuilder;
Expand All @@ -46,10 +48,13 @@
import io.grpc.ServerInterceptor;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -61,6 +66,11 @@ public class VeniceController {
private static final Logger LOGGER = LogManager.getLogger(VeniceController.class);
private static final String CONTROLLER_GRPC_SERVER_THREAD_NAME = "ControllerGrpcServer";
static final String CONTROLLER_SERVICE_NAME = "venice-controller";
public static final String CONTROLLER_SERVICE_METRIC_PREFIX = "controller";
public static final Collection<MetricEntity> CONTROLLER_SERVICE_METRIC_ENTITIES = Collections.unmodifiableList(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why duplicated?

Arrays.stream(ControllerMetricEntity.values())
.map(ControllerMetricEntity::getMetricEntity)
.collect(Collectors.toList()));

// services
private final VeniceControllerService controllerService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.controller.VeniceController.CONTROLLER_SERVICE_METRIC_ENTITIES;
import static com.linkedin.venice.controller.VeniceController.CONTROLLER_SERVICE_METRIC_PREFIX;
import static com.linkedin.venice.controller.VeniceController.CONTROLLER_SERVICE_NAME;

import com.linkedin.d2.balancer.D2Client;
Expand All @@ -10,7 +12,7 @@
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.Collections;
Expand Down Expand Up @@ -154,7 +156,13 @@ public Builder setPubSubClientsFactory(PubSubClientsFactory pubSubClientsFactory

private void addDefaultValues() {
if (metricsRepository == null && !isMetricsRepositorySet) {
metricsRepository = TehutiUtils.getMetricsRepository(CONTROLLER_SERVICE_NAME);
metricsRepository = VeniceMetricsRepository.getVeniceMetricsRepository(
CONTROLLER_SERVICE_NAME,
CONTROLLER_SERVICE_METRIC_PREFIX,
CONTROLLER_SERVICE_METRIC_ENTITIES,
propertiesList.get(0).getAsMap());
// TODO OTel: today, this gets the properties of the first cluster. This is not ideal.
// We need to figure out how to build a common controller-specific config/properties list
}
if (serviceDiscoveryAnnouncers == null && !isServiceDiscoveryAnnouncerSet) {
serviceDiscoveryAnnouncers = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public VeniceControllerService(
externalSupersetSchemaGenerator,
pubSubTopicRepository,
initRoutineForPushJobDetailsSystemStore,
initRoutineForHeartbeatSystemStore);
initRoutineForHeartbeatSystemStore,
metricsRepository);
LOGGER.info("Controller works as a parent controller.");
} else {
this.admin = internalAdmin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.annotation.VisibleForTesting;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
Expand Down Expand Up @@ -71,6 +72,7 @@
import com.linkedin.venice.controller.stats.AddVersionLatencyStats;
import com.linkedin.venice.controller.stats.DeadStoreStats;
import com.linkedin.venice.controller.stats.DisabledPartitionStats;
import com.linkedin.venice.controller.stats.LogCompactionStats;
import com.linkedin.venice.controller.stats.PushJobStatusStats;
import com.linkedin.venice.controllerapi.AdminOperationProtocolVersionControllerResponse;
import com.linkedin.venice.controllerapi.ControllerClient;
Expand Down Expand Up @@ -267,6 +269,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -627,16 +630,22 @@ public VeniceHelixAdmin(
new DataRecoveryManager(this, icProvider, pubSubTopicRepository, participantStoreClientsManager);

if (multiClusterConfigs.isLogCompactionEnabled()) {
// TODO LC: extends interchangeable with implements?
Class<? extends RepushOrchestrator> repushOrchestratorClass =
ReflectUtils.loadClass(multiClusterConfigs.getRepushOrchestratorClassName());
try {
RepushOrchestrator repushOrchestrator = ReflectUtils.callConstructor(
repushOrchestratorClass,
new Class[] { VeniceProperties.class },
new Object[] { multiClusterConfigs.getRepushOrchestratorConfigs() });
compactionManager =
new CompactionManager(repushOrchestrator, multiClusterConfigs.getTimeSinceLastLogCompactionThresholdMS());
compactionManager = new CompactionManager(
repushOrchestrator,
multiClusterConfigs.getTimeSinceLastLogCompactionThresholdMS(),
multiClusterConfigs.getClusters()
.stream()
.collect(
Collectors.toMap(
Function.identity(),
clusterName -> new LogCompactionStats(metricsRepository, clusterName))));
} catch (Exception e) {
LOGGER.error("Failed to enable " + LogCompactionService.class.getSimpleName(), e);
throw new VeniceException(e);
Expand Down Expand Up @@ -9230,8 +9239,8 @@ public boolean isClusterWipeAllowed(String clusterName) {
return multiClusterConfigs.getControllerConfig(clusterName).isClusterWipeAllowed();
}

// Visible for testing
VeniceControllerMultiClusterConfig getMultiClusterConfigs() {
@VisibleForTesting
public VeniceControllerMultiClusterConfig getMultiClusterConfigs() {
return multiClusterConfigs;
}

Expand Down
Loading
Loading