Skip to content

[client][test] Add OTel key count metric #1791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,18 @@ public CompletableFuture<V> get(K key) {
public CompletableFuture<V> get(K key, V reusableValue) {
clientStatsForSingleGet.recordRequestKeyCount(1);
return trackRequest(clientStatsForSingleGet, () -> super.get(key, reusableValue)).whenComplete((v, throwable) -> {
if (throwable == null && v != null) {
clientStatsForSingleGet.recordSuccessRequestKeyCount(1);
}
int responseKeyCount = (throwable == null && v != null) ? 1 : 0;
clientStatsForSingleGet.recordResponseKeyCount(responseKeyCount);
});
}

@Override
public CompletableFuture<Map<K, V>> batchGet(Set<K> keys) {
clientStatsForBatchGet.recordRequestKeyCount(keys.size());
return trackRequest(clientStatsForBatchGet, () -> super.batchGet(keys)).whenComplete((v, throwable) -> {
if (throwable == null && v != null) {
clientStatsForBatchGet.recordSuccessRequestKeyCount(v.size());
}
// Always record the response key count, regardless of whether the request is healthy.
int responseKeyCount = (v != null) ? v.size() : 0;
clientStatsForBatchGet.recordResponseKeyCount(responseKeyCount);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void testGet(boolean reuseApi) throws ExecutionException, InterruptedExce
assertTrue(metrics.get(".test_store--healthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--unhealthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--healthy_request_latency.Avg").value() > 0);
assertEquals(metrics.get(".test_store--success_request_key_count.Avg").value(), 1.0);
// There are 2 requests: one success and one failure. The key count is recorded as 1 for the successful request
// and as 0 for the failed request.
assertEquals(metrics.get(".test_store--success_request_key_count.Avg").value(), 1.0 / 2);
assertEquals(metrics.get(".test_store--success_request_key_count.Max").value(), 1.0);
assertTrue(metrics.get(".test_store--success_request_ratio.SimpleRatioStat").value() < 1.0);
assertTrue(metrics.get(".test_store--success_request_key_ratio.SimpleRatioStat").value() < 1.0);
Expand Down Expand Up @@ -105,7 +107,9 @@ public void testBatchGet() throws ExecutionException, InterruptedException {
assertTrue(metrics.get(".test_store--multiget_healthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--multiget_unhealthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--multiget_healthy_request_latency.Avg").value() > 0);
assertEquals(metrics.get(".test_store--multiget_success_request_key_count.Avg").value(), 2.0);
// There are 3 batch get requests: one success with 2 keys, one failure, and one that hits a runtime exception.
// The key count is 2 for the success and 0 for the failure; the runtime-exception request is never recorded.
assertEquals(metrics.get(".test_store--multiget_success_request_key_count.Avg").value(), 2.0 / 2);
assertEquals(metrics.get(".test_store--multiget_success_request_key_count.Max").value(), 2.0);
assertTrue(metrics.get(".test_store--multiget_success_request_ratio.SimpleRatioStat").value() < 1.0);
assertTrue(metrics.get(".test_store--multiget_success_request_key_ratio.SimpleRatioStat").value() < 1.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ private <R> CompletableFuture<R> recordRequestMetrics(
if (requestContext.responseDeserializationTime > 0) {
clientStats.recordResponseDeserializationTime(requestContext.responseDeserializationTime);
}
clientStats.recordSuccessRequestKeyCount(requestContext.successRequestKeyCount.get());
}
// Record the response key count regardless of whether the request is healthy or unhealthy.
clientStats.recordResponseKeyCount(requestContext.successRequestKeyCount.get());

if (requestContext.noAvailableReplica) {
clientStats.recordNoAvailableReplicaRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ private void validateMetrics(
assertFalse(metrics.get(metricPrefix + "healthy_request_latency.Avg").value() > 0);
assertTrue(metrics.get(metricPrefix + "unhealthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(metricPrefix + "unhealthy_request_latency.Avg").value() > 0);
// as partial healthy request is still considered unhealthy, not incrementing the below metric
assertFalse(metrics.get(metricPrefix + "success_request_key_count.Max").value() > 0);
// A partially healthy request should still report its successful key count.
assertTrue(metrics.get(metricPrefix + "success_request_key_count.Max").value() > 0);
if (batchGet) {
assertTrue(metrics.get(metricPrefix + "response_ttfr.Avg").value() > 0);
assertEquals(batchGetRequestContext.successRequestKeyCount.get(), (int) successKeyCount);
Expand Down Expand Up @@ -1294,7 +1294,7 @@ public void testComputeWithExceptionFromTransportLayerForOneRoute() throws IOExc
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("At least one route did not complete"), e.getMessage());
validateComputeRequestMetrics(false, false, RequestType.COMPUTE, false, 2, 1);
validateComputeRequestMetrics(false, true, RequestType.COMPUTE, false, 2, 1);
} finally {
tearDown();
}
Expand Down Expand Up @@ -1525,7 +1525,7 @@ public void testStreamingComputeWithExceptionFromTransportLayerForOneRoute()
assertEquals(
response.get("test_key_1").get("name").toString(),
COMPUTE_REQUEST_VALUE_RESPONSE.get("test_key_1").get("name"));
validateComputeRequestMetrics(false, false, RequestType.COMPUTE_STREAMING, false, 2, 1);
validateComputeRequestMetrics(false, true /*partialHealthyRequest*/, RequestType.COMPUTE_STREAMING, false, 2, 1);
} finally {
tearDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
spark.stop();
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
spark.stop();
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
sparkContext.close();
spark.stop();
if (sparkContext != null) {
sparkContext.close();
}
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import static com.linkedin.venice.stats.dimensions.HttpResponseStatusCodeCategory.getVeniceHttpResponseStatusCodeCategory;
import static com.linkedin.venice.stats.dimensions.HttpResponseStatusEnum.transformIntToHttpResponseStatusEnum;
import static com.linkedin.venice.stats.dimensions.MessageType.REQUEST;
import static com.linkedin.venice.stats.dimensions.MessageType.RESPONSE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE_CATEGORY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_MESSAGE_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REQUEST_METHOD;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_RESPONSE_STATUS_CODE_CATEGORY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
Expand All @@ -25,6 +28,7 @@
import com.linkedin.venice.stats.VeniceOpenTelemetryMetricsRepository;
import com.linkedin.venice.stats.dimensions.HttpResponseStatusCodeCategory;
import com.linkedin.venice.stats.dimensions.HttpResponseStatusEnum;
import com.linkedin.venice.stats.dimensions.MessageType;
import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;
import com.linkedin.venice.stats.dimensions.VeniceResponseStatusCategory;
import com.linkedin.venice.stats.metrics.MetricEntity;
Expand Down Expand Up @@ -72,8 +76,8 @@ public class BasicClientStats extends AbstractVeniceHttpStats {
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> unhealthyRequestMetricForDavinciClient;
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> healthyLatencyMetricForDavinciClient;
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> unhealthyLatencyMetricForDavinciClient;
private final Sensor requestKeyCountSensor;
private final Sensor successRequestKeyCountSensor;
private final MetricEntityStateOneEnum<MessageType> requestKeyCount;
private final MetricEntityStateOneEnum<MessageType> successResponseKeyCount;
private final Sensor successRequestRatioSensor;
private final Sensor successRequestKeyRatioSensor;
private final Rate requestRate = new OccurrenceRate();
Expand Down Expand Up @@ -127,8 +131,9 @@ protected BasicClientStats(
// requestSensor will be a derived metric in OTel
requestSensor = registerSensor("request", requestRate);
Rate healthyRequestRate = new OccurrenceRate();
Rate requestKeyCountRate = new Rate();

if (clientType.equals(ClientType.DAVINCI_CLIENT)) {
if (ClientType.isDavinciClient(clientType)) {
healthyRequestMetricForDavinciClient = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.CALL_COUNT_DVC.getMetricEntity(),
otelRepository,
Expand Down Expand Up @@ -171,6 +176,7 @@ protected BasicClientStats(
getFullMetricName(BasicClientTehutiMetricName.UNHEALTHY_REQUEST_LATENCY.getMetricName()))),
getBaseDimensionsMap(),
VeniceResponseStatusCategory.class);

healthyRequestMetric = null;
unhealthyRequestMetric = null;
healthyLatencyMetric = null;
Expand Down Expand Up @@ -231,16 +237,29 @@ protected BasicClientStats(
unhealthyLatencyMetricForDavinciClient = null;
}

// request key count
requestKeyCount = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT.getMetricEntity(),
getOtelRepository(),
this::registerSensor,
BasicClientTehutiMetricName.REQUEST_KEY_COUNT,
Arrays.asList(requestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
MessageType.class);

successResponseKeyCount = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT.getMetricEntity(),
otelRepository,
this::registerSensor,
BasicClientTehutiMetricName.SUCCESS_REQUEST_KEY_COUNT,
Arrays.asList(successRequestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
MessageType.class);

// successRequestRatioSensor will be a derived metric in OTel
successRequestRatioSensor =
registerSensor(new TehutiUtils.SimpleRatioStat(healthyRequestRate, requestRate, "success_request_ratio"));

// key count
Rate requestKeyCountRate = new Rate();
requestKeyCountSensor = registerSensor("request_key_count", requestKeyCountRate, new Avg(), new Max());
successRequestKeyCountSensor =
registerSensor("success_request_key_count", successRequestKeyCountRate, new Avg(), new Max());

successRequestKeyRatioSensor = registerSensor(
new TehutiUtils.SimpleRatioStat(successRequestKeyCountRate, requestKeyCountRate, "success_request_key_ratio"));
}
Expand Down Expand Up @@ -295,11 +314,11 @@ public void emitUnhealthyRequestMetricsForDavinciClient(double latency) {
}

public void recordRequestKeyCount(int keyCount) {
requestKeyCountSensor.record(keyCount);
requestKeyCount.record(keyCount, REQUEST);
}

public void recordSuccessRequestKeyCount(int successKeyCount) {
successRequestKeyCountSensor.record(successKeyCount);
public void recordResponseKeyCount(int keyCount) {
successResponseKeyCount.record(keyCount, RESPONSE);
}

protected final Rate getRequestRate() {
Expand Down Expand Up @@ -360,7 +379,8 @@ private Map<VeniceMetricsDimensions, String> getBaseDimensionsMap() {
* Metric names for tehuti metrics used in this class.
*/
public enum BasicClientTehutiMetricName implements TehutiMetricNameEnum {
HEALTHY_REQUEST, UNHEALTHY_REQUEST, HEALTHY_REQUEST_LATENCY, UNHEALTHY_REQUEST_LATENCY;
HEALTHY_REQUEST, UNHEALTHY_REQUEST, HEALTHY_REQUEST_LATENCY, UNHEALTHY_REQUEST_LATENCY, REQUEST_KEY_COUNT,
SUCCESS_REQUEST_KEY_COUNT;

private final String metricName;

Expand Down Expand Up @@ -400,6 +420,13 @@ public enum BasicClientMetricEntity implements ModuleMetricEntityInterface {
HTTP_RESPONSE_STATUS_CODE_CATEGORY,
VENICE_RESPONSE_STATUS_CODE_CATEGORY)
),
/**
* Count of keys for venice client request and response.
*/
KEY_COUNT(
MetricType.HISTOGRAM, MetricUnit.NUMBER, "Count of keys for venice client request and response",
setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_MESSAGE_TYPE)
),
/**
* Count of all DaVinci requests: as DaVinci is local reads, we do not track HTTP response codes
* But keeping the same name call_count across all clients for consistency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private static void handleMetricTrackingForStreamingCallback(
} else {
clientStats.emitHealthyRequestMetrics(latency, successKeyCnt);
}
clientStats.recordSuccessRequestKeyCount(successKeyCnt);
clientStats.recordResponseKeyCount(successKeyCnt);
clientStats.recordSuccessDuplicateRequestKeyCount(duplicateEntryCnt);
}

Expand All @@ -289,7 +289,7 @@ public ComputeRequestBuilder<K> compute() throws VeniceClientException {
}

clientStats.emitHealthyRequestMetrics(latency, value);
clientStats.recordSuccessRequestKeyCount(getSuccessfulKeyCount(value));
clientStats.recordResponseKeyCount(getSuccessfulKeyCount(value));
return value;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
import static com.linkedin.venice.stats.VeniceMetricsRepository.getVeniceMetricsRepository;
import static com.linkedin.venice.stats.dimensions.HttpResponseStatusCodeCategory.getVeniceHttpResponseStatusCodeCategory;
import static com.linkedin.venice.stats.dimensions.HttpResponseStatusEnum.transformIntToHttpResponseStatusEnum;
import static com.linkedin.venice.stats.dimensions.MessageType.*;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE_CATEGORY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_MESSAGE_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REQUEST_METHOD;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REQUEST_RETRY_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_RESPONSE_STATUS_CODE_CATEGORY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceResponseStatusCategory.SUCCESS;
import static com.linkedin.venice.utils.OpenTelemetryDataPointTestUtils.getExponentialHistogramPointData;
import static com.linkedin.venice.utils.OpenTelemetryDataPointTestUtils.getLongPointData;
import static com.linkedin.venice.utils.OpenTelemetryDataPointTestUtils.validateExponentialHistogramPointData;
Expand All @@ -25,12 +28,14 @@
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.stats.ClientType;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.stats.dimensions.MessageType;
import com.linkedin.venice.stats.dimensions.RequestRetryType;
import com.linkedin.venice.stats.dimensions.VeniceResponseStatusCategory;
import com.linkedin.venice.stats.metrics.MetricEntity;
import com.linkedin.venice.stats.metrics.MetricType;
import com.linkedin.venice.stats.metrics.MetricUnit;
import com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Utils;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
Expand Down Expand Up @@ -86,13 +91,7 @@ public void testEmitHealthyMetrics() {
stats.emitHealthyRequestMetrics(90.0, 2);

validateTehutiMetrics(stats.getMetricsRepository(), ".test_store", true, 90.0);
validateOtelMetrics(
inMemoryMetricReader,
"test_store",
SC_OK,
VeniceResponseStatusCategory.SUCCESS,
90.0,
THIN_CLIENT.getMetricsPrefix());
validateOtelMetrics(inMemoryMetricReader, "test_store", SC_OK, SUCCESS, 90.0, THIN_CLIENT.getMetricsPrefix());
}

@Test
Expand All @@ -102,12 +101,7 @@ public void testEmitHealthyRequestMetricsForDavinciClient() {
stats.emitHealthyRequestMetricsForDavinciClient(90.0);

validateTehutiMetrics(stats.getMetricsRepository(), ".test_store", true, 90.0);
validateOtelMetrics(
inMemoryMetricReader,
"test_store",
VeniceResponseStatusCategory.SUCCESS,
90.0,
DAVINCI_CLIENT.getMetricsPrefix());
validateOtelMetrics(inMemoryMetricReader, "test_store", SUCCESS, 90.0, DAVINCI_CLIENT.getMetricsPrefix());
}

@Test
Expand Down Expand Up @@ -159,6 +153,42 @@ public void testEmitUnhealthyRequestMetricsForDavinciClientWithWrongClientType()
Assert.assertFalse(metrics.get(".test_store--request.OccurrenceRate").value() > 0.0);
}

/**
 * For every {@link ClientType}, records a single key count through either the request or the response path and
 * checks that the value is visible both in the Tehuti metrics and in the OpenTelemetry {@code key_count} histogram.
 *
 * @param isRequest {@code true} to exercise {@code recordRequestKeyCount}, {@code false} to exercise
 *        {@code recordResponseKeyCount}
 */
@Test(dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False")
public void testKeyCountMetrics(boolean isRequest) {
  final int recordedKeys = 10;
  final String store = "test_store";
  // The Tehuti sensor name and the OTel message-type dimension both depend on the direction under test.
  final String tehutiMaxFormat = isRequest ? ".%s--request_key_count.Max" : ".%s--success_request_key_count.Max";
  final MessageType messageType = isRequest ? REQUEST : RESPONSE;

  // The same expectations must hold for every client type.
  for (ClientType clientType: ClientType.values()) {
    InMemoryMetricReader reader = InMemoryMetricReader.create();
    BasicClientStats clientStats = createStats(reader, clientType);

    if (isRequest) {
      clientStats.recordRequestKeyCount(recordedKeys);
    } else {
      clientStats.recordResponseKeyCount(recordedKeys);
    }

    // Tehuti side: the Max stat of the matching sensor equals the single recorded value.
    Map<String, ? extends Metric> tehutiMetrics = clientStats.getMetricsRepository().metrics();
    Assert.assertEquals((int) tehutiMetrics.get(String.format(tehutiMaxFormat, store)).value(), recordedKeys);

    // OpenTelemetry side: the key_count histogram point carries the expected values and attributes.
    Collection<MetricData> collected = reader.collectAllMetrics();
    Attributes expectedAttributes = getAttributes(store, messageType);
    ExponentialHistogramPointData point =
        getExponentialHistogramPointData(collected, "key_count", clientType.getMetricsPrefix());
    validateExponentialHistogramPointData(point, recordedKeys, recordedKeys, 1, recordedKeys, expectedAttributes);
  }
}

@Test
public void testEmitRequestRetryMetrics() {
InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create();
Expand Down Expand Up @@ -299,6 +329,14 @@ public void testClientMetricEntities() {
MetricUnit.MILLISECOND,
"Latency for all DaVinci Client responses",
Utils.setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_RESPONSE_STATUS_CODE_CATEGORY)));
expectedMetrics.put(
BasicClientStats.BasicClientMetricEntity.KEY_COUNT,
new MetricEntity(
"key_count",
MetricType.HISTOGRAM,
MetricUnit.NUMBER,
"Count of keys for venice client request and response",
Utils.setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_MESSAGE_TYPE)));
expectedMetrics.put(
ClientMetricEntity.RETRY_COUNT,
new MetricEntity(
Expand Down Expand Up @@ -405,4 +443,12 @@ private Attributes getExpectedAttributes(
}
return builder.build();
}

/**
 * Builds the attribute set expected on a key count data point: the store name, the request method (always
 * {@code SINGLE_GET} here), and the given message type.
 *
 * @param storeName value for the store-name dimension
 * @param type message type whose dimension value is attached
 * @return the assembled attributes
 */
private Attributes getAttributes(String storeName, MessageType type) {
  return Attributes.builder()
      .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), storeName)
      .put(VENICE_REQUEST_METHOD.getDimensionNameInDefaultFormat(), SINGLE_GET.getDimensionValue())
      .put(VENICE_MESSAGE_TYPE.getDimensionNameInDefaultFormat(), type.getDimensionValue())
      .build();
}
}
Loading
Loading