Skip to content

[client][test] Add OTel key count metric #1791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 24, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public CompletableFuture<V> get(K key, V reusableValue) {
return trackRequest(clientStatsForSingleGet, () -> super.get(key, reusableValue)).whenComplete((v, throwable) -> {
if (throwable == null && v != null) {
clientStatsForSingleGet.recordSuccessRequestKeyCount(1);
clientStatsForSingleGet.recordFailedRequestKeyCount(0);
} else {
clientStatsForSingleGet.recordSuccessRequestKeyCount(0);
clientStatsForSingleGet.recordFailedRequestKeyCount(1);
}
});
}
Expand All @@ -91,8 +95,12 @@ public CompletableFuture<V> get(K key, V reusableValue) {
public CompletableFuture<Map<K, V>> batchGet(Set<K> keys) {
clientStatsForBatchGet.recordRequestKeyCount(keys.size());
return trackRequest(clientStatsForBatchGet, () -> super.batchGet(keys)).whenComplete((v, throwable) -> {
if (throwable == null && v != null) {
if (v != null) {
clientStatsForBatchGet.recordSuccessRequestKeyCount(v.size());
clientStatsForBatchGet.recordFailedRequestKeyCount(keys.size() - v.size());
} else {
clientStatsForBatchGet.recordSuccessRequestKeyCount(0);
clientStatsForBatchGet.recordFailedRequestKeyCount(keys.size());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testGet(boolean reuseApi) throws ExecutionException, InterruptedExce
assertTrue(metrics.get(".test_store--healthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--unhealthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--healthy_request_latency.Avg").value() > 0);
assertEquals(metrics.get(".test_store--success_request_key_count.Avg").value(), 1.0);
assertEquals(metrics.get(".test_store--success_request_key_count.Avg").value(), 1.0 / 2);
assertEquals(metrics.get(".test_store--success_request_key_count.Max").value(), 1.0);
assertTrue(metrics.get(".test_store--success_request_ratio.SimpleRatioStat").value() < 1.0);
assertTrue(metrics.get(".test_store--success_request_key_ratio.SimpleRatioStat").value() < 1.0);
Expand Down Expand Up @@ -105,7 +105,7 @@ public void testBatchGet() throws ExecutionException, InterruptedException {
assertTrue(metrics.get(".test_store--multiget_healthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--multiget_unhealthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(".test_store--multiget_healthy_request_latency.Avg").value() > 0);
assertEquals(metrics.get(".test_store--multiget_success_request_key_count.Avg").value(), 2.0);
assertEquals(metrics.get(".test_store--multiget_success_request_key_count.Avg").value(), 1.0); // round (2.0 / 3)
assertEquals(metrics.get(".test_store--multiget_success_request_key_count.Max").value(), 2.0);
assertTrue(metrics.get(".test_store--multiget_success_request_ratio.SimpleRatioStat").value() < 1.0);
assertTrue(metrics.get(".test_store--multiget_success_request_key_ratio.SimpleRatioStat").value() < 1.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ private <R> CompletableFuture<R> recordRequestMetrics(
}

if (exceptionReceived) {
// We still want to record the partial success key count number even if the request is unhealthy.
clientStats.emitUnhealthyRequestMetrics(latency, throwable);
} else {
clientStats.emitHealthyRequestMetrics(latency, requestContext.successRequestKeyCount.get());
Expand All @@ -176,13 +177,15 @@ private <R> CompletableFuture<R> recordRequestMetrics(
if (requestContext.responseDeserializationTime > 0) {
clientStats.recordResponseDeserializationTime(requestContext.responseDeserializationTime);
}
clientStats.recordSuccessRequestKeyCount(requestContext.successRequestKeyCount.get());
}

clientStats.recordSuccessRequestKeyCount(requestContext.successRequestKeyCount.get());

if (requestContext.noAvailableReplica) {
clientStats.recordNoAvailableReplicaRequest();
}

int retrySuccessKeyCount = 0;
if (requestContext instanceof GetRequestContext) {
GetRequestContext getRequestContext = (GetRequestContext) requestContext;

Expand All @@ -198,7 +201,8 @@ private <R> CompletableFuture<R> recordRequestMetrics(
if (!exceptionReceived) {
if (getRequestContext.retryContext.retryWin) {
clientStats.recordRetryRequestWin();
clientStats.recordRetryRequestSuccessKeyCount(1);
retrySuccessKeyCount = 1;
clientStats.recordRetryRequestSuccessKeyCount(retrySuccessKeyCount);
}
}
}
Expand All @@ -213,14 +217,20 @@ private <R> CompletableFuture<R> recordRequestMetrics(
clientStats.recordRetryRequestKeyCount(retryRequestContext.numKeysInRequest);
clientStats.recordRetryFanoutSize(retryRequestContext.getFanoutSize());
if (!exceptionReceived) {
clientStats.recordRetryRequestSuccessKeyCount(retryRequestContext.numKeysCompleted.get());
retrySuccessKeyCount = retryRequestContext.numKeysCompleted.get();
clientStats.recordRetryRequestSuccessKeyCount(retrySuccessKeyCount);
if (retryRequestContext.numKeysCompleted.get() > 0) {
clientStats.recordRetryRequestWin();
}
}
}
}

// numberOfKeys = successKeyCount + retrySuccessKeyCount + failedKeyCount.
clientStats.recordFailedRequestKeyCount(
numberOfKeys - requestContext.successRequestKeyCount.get() - retrySuccessKeyCount,
throwable);

if (exceptionReceived) {
// throw an exception after incrementing some error related metrics
if (throwable instanceof VeniceClientException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ private void validateMetrics(
assertFalse(metrics.get(metricPrefix + "healthy_request_latency.Avg").value() > 0);
assertTrue(metrics.get(metricPrefix + "unhealthy_request.OccurrenceRate").value() > 0);
assertTrue(metrics.get(metricPrefix + "unhealthy_request_latency.Avg").value() > 0);
// as partial healthy request is still considered unhealthy, not incrementing the below metric
assertFalse(metrics.get(metricPrefix + "success_request_key_count.Max").value() > 0);
// as partial healthy request is should still report success key count.
assertTrue(metrics.get(metricPrefix + "success_request_key_count.Max").value() > 0);
if (batchGet) {
assertTrue(metrics.get(metricPrefix + "response_ttfr.Avg").value() > 0);
assertEquals(batchGetRequestContext.successRequestKeyCount.get(), (int) successKeyCount);
Expand Down Expand Up @@ -1294,7 +1294,7 @@ public void testComputeWithExceptionFromTransportLayerForOneRoute() throws IOExc
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("At least one route did not complete"), e.getMessage());
validateComputeRequestMetrics(false, false, RequestType.COMPUTE, false, 2, 1);
validateComputeRequestMetrics(false, true, RequestType.COMPUTE, false, 2, 1);
} finally {
tearDown();
}
Expand Down Expand Up @@ -1525,7 +1525,7 @@ public void testStreamingComputeWithExceptionFromTransportLayerForOneRoute()
assertEquals(
response.get("test_key_1").get("name").toString(),
COMPUTE_REQUEST_VALUE_RESPONSE.get("test_key_1").get("name"));
validateComputeRequestMetrics(false, false, RequestType.COMPUTE_STREAMING, false, 2, 1);
validateComputeRequestMetrics(false, true /*partialHealthyRequest*/, RequestType.COMPUTE_STREAMING, false, 2, 1);
} finally {
tearDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
spark.stop();
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
spark.stop();
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ public void setUp() {

@AfterClass(alwaysRun = true)
public void tearDown() {
sparkContext.close();
spark.stop();
if (sparkContext != null) {
sparkContext.close();
}
if (spark != null) {
spark.stop();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface.getUniqueMetricEntities;
import static com.linkedin.venice.utils.CollectionUtils.setOf;
import static org.apache.hc.core5.http.HttpStatus.SC_NOT_FOUND;
import static org.apache.hc.core5.http.HttpStatus.SC_NO_CONTENT;
import static org.apache.hc.core5.http.HttpStatus.SC_OK;

import com.linkedin.venice.client.exceptions.VeniceClientHttpException;
Expand Down Expand Up @@ -73,6 +74,10 @@ public class BasicClientStats extends AbstractVeniceHttpStats {
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> healthyLatencyMetricForDavinciClient;
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> unhealthyLatencyMetricForDavinciClient;
private final Sensor requestKeyCountSensor;
private final MetricEntityStateThreeEnums<HttpResponseStatusEnum, HttpResponseStatusCodeCategory, VeniceResponseStatusCategory> healthyKeyCountMetric;
private final MetricEntityStateThreeEnums<HttpResponseStatusEnum, HttpResponseStatusCodeCategory, VeniceResponseStatusCategory> unhealthyKeyCountMetric;
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> healthyKeyCountMetricForDavinciClient;
private final MetricEntityStateOneEnum<VeniceResponseStatusCategory> unhealthyKeyCountMetricForDavinciClient;
private final Sensor successRequestKeyCountSensor;
private final Sensor successRequestRatioSensor;
private final Sensor successRequestKeyRatioSensor;
Expand Down Expand Up @@ -171,10 +176,31 @@ protected BasicClientStats(
getFullMetricName(BasicClientTehutiMetricName.UNHEALTHY_REQUEST_LATENCY.getMetricName()))),
getBaseDimensionsMap(),
VeniceResponseStatusCategory.class);

healthyKeyCountMetricForDavinciClient = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT_DVC.getMetricEntity(),
getOtelRepository(),
this::registerSensor,
BasicClientTehutiMetricName.SUCCESS_REQUEST_KEY_COUNT,
Arrays.asList(successRequestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);

unhealthyKeyCountMetricForDavinciClient = MetricEntityStateOneEnum.create(
BasicClientMetricEntity.KEY_COUNT_DVC.getMetricEntity(),
otelRepository,
this::registerSensor,
BasicClientTehutiMetricName.FAILED_REQUEST_KEY_COUNT,
Arrays.asList(new Rate(), new Avg(), new Max()),
baseDimensionsMap,
VeniceResponseStatusCategory.class);

healthyRequestMetric = null;
unhealthyRequestMetric = null;
healthyLatencyMetric = null;
unhealthyLatencyMetric = null;
healthyKeyCountMetric = null;
unhealthyKeyCountMetric = null;
} else {
healthyRequestMetric = MetricEntityStateThreeEnums.create(
BasicClientMetricEntity.CALL_COUNT.getMetricEntity(),
Expand Down Expand Up @@ -225,10 +251,34 @@ protected BasicClientStats(
HttpResponseStatusEnum.class,
HttpResponseStatusCodeCategory.class,
VeniceResponseStatusCategory.class);
// key count
healthyKeyCountMetric = MetricEntityStateThreeEnums.create(
BasicClientMetricEntity.KEY_COUNT.getMetricEntity(),
getOtelRepository(),
this::registerSensor,
BasicClientTehutiMetricName.SUCCESS_REQUEST_KEY_COUNT,
Arrays.asList(successRequestKeyCountRate, new Avg(), new Max()),
baseDimensionsMap,
HttpResponseStatusEnum.class,
HttpResponseStatusCodeCategory.class,
VeniceResponseStatusCategory.class);
unhealthyKeyCountMetric = MetricEntityStateThreeEnums.create(
BasicClientMetricEntity.KEY_COUNT.getMetricEntity(),
otelRepository,
this::registerSensor,
BasicClientTehutiMetricName.FAILED_REQUEST_KEY_COUNT,
Arrays.asList(new Rate(), new Avg(), new Max()),
baseDimensionsMap,
HttpResponseStatusEnum.class,
HttpResponseStatusCodeCategory.class,
VeniceResponseStatusCategory.class);

healthyRequestMetricForDavinciClient = null;
unhealthyRequestMetricForDavinciClient = null;
healthyLatencyMetricForDavinciClient = null;
unhealthyLatencyMetricForDavinciClient = null;
healthyKeyCountMetricForDavinciClient = null;
unhealthyKeyCountMetricForDavinciClient = null;
}

// successRequestRatioSensor will be a derived metric in OTel
Expand Down Expand Up @@ -300,6 +350,34 @@ public void recordRequestKeyCount(int keyCount) {

public void recordSuccessRequestKeyCount(int successKeyCount) {
successRequestKeyCountSensor.record(successKeyCount);
if (ClientType.isDavinciClient(this.clientType)) {
healthyKeyCountMetricForDavinciClient.record(successKeyCount, SUCCESS);
} else {
int httpStatus = getHealthyRequestHttpStatus(successKeyCount);
HttpResponseStatusEnum statusEnum = transformIntToHttpResponseStatusEnum(httpStatus);
HttpResponseStatusCodeCategory httpCategory = getVeniceHttpResponseStatusCodeCategory(httpStatus);
healthyKeyCountMetric.record(successKeyCount, statusEnum, httpCategory, SUCCESS);
}
}

public void recordFailedRequestKeyCount(int failedKeyCount, Throwable throwable) {
if (ClientType.isDavinciClient(this.clientType)) {
unhealthyKeyCountMetricForDavinciClient.record(failedKeyCount, FAIL);
} else {
/**
* When throwable is null and the failed key count is 0, it means that the request was successful. However,
* we still need to record the failed key count as 0, and thus we use a default http status of SC_NO_CONTENT
* to indicate success.
*/
int httpStatus = throwable != null ? getUnhealthyRequestHttpStatus(throwable) : SC_NO_CONTENT;
HttpResponseStatusEnum statusEnum = transformIntToHttpResponseStatusEnum(httpStatus);
HttpResponseStatusCodeCategory httpCategory = getVeniceHttpResponseStatusCodeCategory(httpStatus);
unhealthyKeyCountMetric.record(failedKeyCount, statusEnum, httpCategory, FAIL);
}
}

public void recordFailedRequestKeyCount(int failedKeyCount) {
recordFailedRequestKeyCount(failedKeyCount, null);
}

protected final Rate getRequestRate() {
Expand Down Expand Up @@ -360,7 +438,8 @@ private Map<VeniceMetricsDimensions, String> getBaseDimensionsMap() {
* Metric names for tehuti metrics used in this class.
*/
public enum BasicClientTehutiMetricName implements TehutiMetricNameEnum {
HEALTHY_REQUEST, UNHEALTHY_REQUEST, HEALTHY_REQUEST_LATENCY, UNHEALTHY_REQUEST_LATENCY;
HEALTHY_REQUEST, UNHEALTHY_REQUEST, HEALTHY_REQUEST_LATENCY, UNHEALTHY_REQUEST_LATENCY, SUCCESS_REQUEST_KEY_COUNT,
FAILED_REQUEST_KEY_COUNT;

private final String metricName;

Expand Down Expand Up @@ -400,6 +479,18 @@ public enum BasicClientMetricEntity implements ModuleMetricEntityInterface {
HTTP_RESPONSE_STATUS_CODE_CATEGORY,
VENICE_RESPONSE_STATUS_CODE_CATEGORY)
),
/**
* Count of keys during response handling along with response codes
*/
KEY_COUNT(
MetricType.HISTOGRAM, MetricUnit.NUMBER, "Count of keys during response handling along with response codes",
setOf(
VENICE_STORE_NAME,
VENICE_REQUEST_METHOD,
HTTP_RESPONSE_STATUS_CODE,
HTTP_RESPONSE_STATUS_CODE_CATEGORY,
VENICE_RESPONSE_STATUS_CODE_CATEGORY)
),
/**
* Count of all DaVinci requests: as DaVinci is local reads, we do not track HTTP response codes
* But keeping the same name call_count across all clients for consistency
Expand All @@ -415,6 +506,14 @@ public enum BasicClientMetricEntity implements ModuleMetricEntityInterface {
CALL_TIME.name().toLowerCase(), MetricType.HISTOGRAM, MetricUnit.MILLISECOND,
"Latency for all DaVinci Client responses",
setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_RESPONSE_STATUS_CODE_CATEGORY)
),
/**
* Count of keys during response handling along with response codes
*/
KEY_COUNT_DVC(
KEY_COUNT.name().toLowerCase(), MetricType.HISTOGRAM, MetricUnit.NUMBER,
"Count of keys for all DaVinci Client responses",
setOf(VENICE_STORE_NAME, VENICE_REQUEST_METHOD, VENICE_RESPONSE_STATUS_CODE_CATEGORY)
);

private final MetricEntity metricEntity;
Expand Down
Loading
Loading