Skip to content

[client][test] Add OTel key count metric #1791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 24, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public CompletableFuture<V> get(K key, V reusableValue) {
clientStatsForSingleGet.recordRequestKeyCount(1);
return trackRequest(clientStatsForSingleGet, () -> super.get(key, reusableValue)).whenComplete((v, throwable) -> {
if (throwable == null && v != null) {
clientStatsForSingleGet.recordSuccessRequestKeyCount(1);
clientStatsForSingleGet.recordResponseKeyCount(1);
}
});
}
Expand All @@ -91,8 +91,8 @@ public CompletableFuture<V> get(K key, V reusableValue) {
public CompletableFuture<Map<K, V>> batchGet(Set<K> keys) {
clientStatsForBatchGet.recordRequestKeyCount(keys.size());
return trackRequest(clientStatsForBatchGet, () -> super.batchGet(keys)).whenComplete((v, throwable) -> {
if (throwable == null && v != null) {
clientStatsForBatchGet.recordSuccessRequestKeyCount(v.size());
if (v != null) {
clientStatsForBatchGet.recordResponseKeyCount(v.size());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ private <R> CompletableFuture<R> recordRequestMetrics(
if (requestContext.responseDeserializationTime > 0) {
clientStats.recordResponseDeserializationTime(requestContext.responseDeserializationTime);
}
clientStats.recordSuccessRequestKeyCount(requestContext.successRequestKeyCount.get());
}
// We want to record the response key count number, no matter the request is healthy or unhealthy.
clientStats.recordResponseKeyCount(requestContext.successRequestKeyCount.get());

if (requestContext.noAvailableReplica) {
clientStats.recordNoAvailableReplicaRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ private void validateMetrics(
assertFalse(metrics.get(metricPrefix + "healthy_request_latency.Avg").value() > 0);
assertTrue(metrics.get(metricPrefix + "unhealthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(metricPrefix + "unhealthy_request_latency.Avg").value() > 0);
// as partial healthy request is still considered unhealthy, not incrementing the below metric
assertFalse(metrics.get(metricPrefix + "success_request_key_count.Max").value() > 0);
// as partial healthy request should still report success key count number.
assertTrue(metrics.get(metricPrefix + "success_request_key_count.Max").value() > 0);
if (batchGet) {
assertTrue(metrics.get(metricPrefix + "response_ttfr.Avg").value() > 0);
assertEquals(batchGetRequestContext.successRequestKeyCount.get(), (int) successKeyCount);
Expand Down Expand Up @@ -1294,7 +1294,7 @@ public void testComputeWithExceptionFromTransportLayerForOneRoute() throws IOExc
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("At least one route did not complete"), e.getMessage());
validateComputeRequestMetrics(false, false, RequestType.COMPUTE, false, 2, 1);
validateComputeRequestMetrics(false, true, RequestType.COMPUTE, false, 2, 1);
} finally {
tearDown();
}
Expand Down Expand Up @@ -1525,7 +1525,7 @@ public void testStreamingComputeWithExceptionFromTransportLayerForOneRoute()
assertEquals(
response.get("test_key_1").get("name").toString(),
COMPUTE_REQUEST_VALUE_RESPONSE.get("test_key_1").get("name"));
validateComputeRequestMetrics(false, false, RequestType.COMPUTE_STREAMING, false, 2, 1);
validateComputeRequestMetrics(false, true /*partialHealthyRequest*/, RequestType.COMPUTE_STREAMING, false, 2, 1);
} finally {
tearDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
spark.stop();
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
spark.stop();
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
sparkContext.close();
spark.stop();
if (sparkContext != null) {
sparkContext.close();
}
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import static com.linkedin.venice.stats.dimensions.HttpResponseStatusCodeCategory.getVeniceHttpResponseStatusCodeCategory;
import static com.linkedin.venice.stats.dimensions.HttpResponseStatusEnum.transformIntToHttpResponseStatusEnum;
import static com.linkedin.venice.stats.dimensions.MessageType.REQUEST;
import static com.linkedin.venice.stats.dimensions.MessageType.RESPONSE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE_CATEGORY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_MESSAGE_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REQUEST_METHOD;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_RESPONSE_STATUS_CODE_CATEGORY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
Expand All @@ -25,6 +28,7 @@
import com.linkedin.venice.stats.VeniceOpenTelemetryMetricsRepository;
import com.linkedin.venice.stats.dimensions.HttpResponseStatusCodeCategory;
import com.linkedin.venice.stats.dimensions.HttpResponseStatusEnum;
import com.linkedin.venice.stats.dimensions.MessageType;
import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;
import com.linkedin.venice.stats.dimensions.VeniceResponseStatusCategory;
import com.linkedin.venice.stats.metrics.MetricEntity;
Expand Down Expand Up @@ -72,8 +76,10 @@ public class BasicClientStats extends AbstractVeniceHttpStats {
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> unhealthyRequestMetricForDavinciClient;
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> healthyLatencyMetricForDavinciClient;
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> unhealthyLatencyMetricForDavinciClient;
private final Sensor requestKeyCountSensor;
private final Sensor successRequestKeyCountSensor;
private final MetricEntityStateOneEnum<MessageType> requestKeyCount;
private final MetricEntityStateOneEnum<MessageType> responseKeyCount;
private final MetricEntityStateOneEnum<MessageType> requestKeyCountDvc;
private final MetricEntityStateOneEnum<MessageType> responseKeyCountDvc;
private final Sensor successRequestRatioSensor;
private final Sensor successRequestKeyRatioSensor;
private final Rate requestRate = new OccurrenceRate();
Expand Down Expand Up @@ -127,6 +133,7 @@ protected BasicClientStats(
// requestSensor will be a derived metric in OTel
requestSensor = registerSensor("request", requestRate);
Rate healthyRequestRate = new OccurrenceRate();
Rate requestKeyCountRate = new Rate();

if (clientType.equals(ClientType.DAVINCI_CLIENT)) {
healthyRequestMetricForDavinciClient = MetricEntityStateOneEnum.create(
Expand Down Expand Up @@ -171,10 +178,31 @@ protected BasicClientStats(
getFullMetricName(BasicClientTehutiMetricName.UNHEALTHY_REQUEST_LATENCY.getMetricName()))),
getBaseDimensionsMap(),
VeniceResponseStatusCategory.class);

requestKeyCountDvc = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT_DVC.getMetricEntity(),
getOtelRepository(),
this::registerSensor,
BasicClientTehutiMetricName.REQUEST_KEY_COUNT,
Arrays.asList(requestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
MessageType.class);

responseKeyCountDvc = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT_DVC.getMetricEntity(),
otelRepository,
this::registerSensor,
BasicClientTehutiMetricName.SUCCESS_REQUEST_KEY_COUNT,
Arrays.asList(successRequestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
MessageType.class);

healthyRequestMetric = null;
unhealthyRequestMetric = null;
healthyLatencyMetric = null;
unhealthyLatencyMetric = null;
requestKeyCount = null;
responseKeyCount = null;
} else {
healthyRequestMetric = MetricEntityStateThreeEnums.create(
BasicClientMetricEntity.CALL_COUNT.getMetricEntity(),
Expand Down Expand Up @@ -225,22 +253,38 @@ protected BasicClientStats(
HttpResponseStatusEnum.class,
HttpResponseStatusCodeCategory.class,
VeniceResponseStatusCategory.class);

// request key count
requestKeyCount = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT.getMetricEntity(),
getOtelRepository(),
this::registerSensor,
BasicClientTehutiMetricName.REQUEST_KEY_COUNT,
Arrays.asList(requestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
MessageType.class);

responseKeyCount = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT.getMetricEntity(),
otelRepository,
this::registerSensor,
BasicClientTehutiMetricName.SUCCESS_REQUEST_KEY_COUNT,
Arrays.asList(successRequestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
MessageType.class);

healthyRequestMetricForDavinciClient = null;
unhealthyRequestMetricForDavinciClient = null;
healthyLatencyMetricForDavinciClient = null;
unhealthyLatencyMetricForDavinciClient = null;
requestKeyCountDvc = null;
responseKeyCountDvc = null;
}

// successRequestRatioSensor will be a derived metric in OTel
successRequestRatioSensor =
registerSensor(new TehutiUtils.SimpleRatioStat(healthyRequestRate, requestRate, "success_request_ratio"));

// key count
Rate requestKeyCountRate = new Rate();
requestKeyCountSensor = registerSensor("request_key_count", requestKeyCountRate, new Avg(), new Max());
successRequestKeyCountSensor =
registerSensor("success_request_key_count", successRequestKeyCountRate, new Avg(), new Max());

successRequestKeyRatioSensor = registerSensor(
new TehutiUtils.SimpleRatioStat(successRequestKeyCountRate, requestKeyCountRate, "success_request_key_ratio"));
}
Expand Down Expand Up @@ -295,11 +339,15 @@ public void emitUnhealthyRequestMetricsForDavinciClient(double latency) {
}

public void recordRequestKeyCount(int keyCount) {
requestKeyCountSensor.record(keyCount);
MetricEntityStateOneEnum<MessageType> keyCountMetric =
ClientType.isDavinciClient(this.clientType) ? requestKeyCountDvc : requestKeyCount;
keyCountMetric.record(keyCount, REQUEST);
}

public void recordSuccessRequestKeyCount(int successKeyCount) {
successRequestKeyCountSensor.record(successKeyCount);
public void recordResponseKeyCount(int keyCount) {
MetricEntityStateOneEnum<MessageType> keyCountMetric =
ClientType.isDavinciClient(this.clientType) ? responseKeyCountDvc : responseKeyCount;
keyCountMetric.record(keyCount, RESPONSE);
}

protected final Rate getRequestRate() {
Expand Down Expand Up @@ -360,7 +408,8 @@ private Map<VeniceMetricsDimensions, String> getBaseDimensionsMap() {
* Metric names for tehuti metrics used in this class.
*/
public enum BasicClientTehutiMetricName implements TehutiMetricNameEnum {
HEALTHY_REQUEST, UNHEALTHY_REQUEST, HEALTHY_REQUEST_LATENCY, UNHEALTHY_REQUEST_LATENCY;
HEALTHY_REQUEST, UNHEALTHY_REQUEST, HEALTHY_REQUEST_LATENCY, UNHEALTHY_REQUEST_LATENCY, REQUEST_KEY_COUNT,
SUCCESS_REQUEST_KEY_COUNT;

private final String metricName;

Expand Down Expand Up @@ -400,6 +449,13 @@ public enum BasicClientMetricEntity implements ModuleMetricEntityInterface {
HTTP_RESPONSE_STATUS_CODE_CATEGORY,
VENICE_RESPONSE_STATUS_CODE_CATEGORY)
),
/**
* Count of keys for venice client request and response.
*/
KEY_COUNT(
MetricType.HISTOGRAM, MetricUnit.NUMBER, "Count of keys for venice client request and response",
setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_MESSAGE_TYPE)
),
/**
* Count of all DaVinci requests: as DaVinci is local reads, we do not track HTTP response codes
* But keeping the same name call_count across all clients for consistency
Expand All @@ -415,6 +471,14 @@ public enum BasicClientMetricEntity implements ModuleMetricEntityInterface {
CALL_TIME.name().toLowerCase(), MetricType.HISTOGRAM, MetricUnit.MILLISECOND,
"Latency for all DaVinci Client responses",
setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_RESPONSE_STATUS_CODE_CATEGORY)
),
/**
* Count of keys for all DaVinci Client request and response
*/
KEY_COUNT_DVC(
KEY_COUNT.name().toLowerCase(), MetricType.HISTOGRAM, MetricUnit.NUMBER,
"Count of keys for all DaVinci Client request and response",
setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_MESSAGE_TYPE)
);

private final MetricEntity metricEntity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private static void handleMetricTrackingForStreamingCallback(
} else {
clientStats.emitHealthyRequestMetrics(latency, successKeyCnt);
}
clientStats.recordSuccessRequestKeyCount(successKeyCnt);
clientStats.recordResponseKeyCount(successKeyCnt);
clientStats.recordSuccessDuplicateRequestKeyCount(duplicateEntryCnt);
}

Expand All @@ -289,7 +289,7 @@ public ComputeRequestBuilder<K> compute() throws VeniceClientException {
}

clientStats.emitHealthyRequestMetrics(latency, value);
clientStats.recordSuccessRequestKeyCount(getSuccessfulKeyCount(value));
clientStats.recordResponseKeyCount(getSuccessfulKeyCount(value));
return value;
};
}
Expand Down
Loading
Loading