From 897c775f25350f2f114687885a6237706eb88d9c Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Thu, 10 Jul 2025 12:26:49 +0300 Subject: [PATCH 1/6] support statistics reporting for topic writes --- .../java/tech/ydb/topic/write/WriteAck.java | 84 ++++++++++++++++--- .../tech/ydb/topic/write/impl/WriterImpl.java | 15 ++-- 2 files changed, 81 insertions(+), 18 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java index b7b082f15..eacd640be 100644 --- a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java +++ b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java @@ -1,5 +1,9 @@ package tech.ydb.topic.write; +import java.time.Duration; + +import tech.ydb.proto.topic.YdbTopic; + /** * @author Nikolay Perfilov */ @@ -7,11 +11,13 @@ public class WriteAck { private final long seqNo; private final State state; private final Details details; + private final Statistics statistics; - public WriteAck(long seqNo, State state, Details details) { + public WriteAck(long seqNo, State state, Details details, Statistics statistics) { this.seqNo = seqNo; this.state = state; this.details = details; + this.statistics = statistics; } public enum State { @@ -20,18 +26,6 @@ public enum State { WRITTEN_IN_TX } - public static class Details { - private final long offset; - - public Details(long offset) { - this.offset = offset; - } - - public long getOffset() { - return offset; - } - } - public long getSeqNo() { return seqNo; } @@ -47,4 +41,68 @@ public State getState() { public Details getDetails() { return details; } + + /** + * Obtain message write statistics + * @return {@link Statistics} with timings if statistics are available or null otherwise + */ + public Statistics getStatistics() { + return statistics; + } + + private static Duration convert(com.google.protobuf.Duration d) { + if (d == null) { + return Duration.ZERO; + } + return Duration.ofSeconds(d.getSeconds(), d.getNanos()); + } + + public static class Details { + private final long offset; + + public Details(long offset) { + this.offset = offset; + } + + public long getOffset() { + return offset; + } + } + + public static class Statistics { + private final Duration persistingTime; + private final Duration partitionQuotaWaitTime; + private final Duration topicQuotaWaitTime; + private final Duration maxQueueWaitTime; + private final Duration minQueueWaitTime; + + public Statistics(YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src) { + this.persistingTime = convert(src.getPersistingTime()); + this.partitionQuotaWaitTime = convert(src.getPartitionQuotaWaitTime()); + this.topicQuotaWaitTime = convert(src.getTopicQuotaWaitTime()); + this.maxQueueWaitTime = convert(src.getMaxQueueWaitTime()); + this.minQueueWaitTime = convert(src.getMinQueueWaitTime()); + } + + public Duration getPersistingTime() { + return persistingTime; + } + + public Duration getPartitionQuotaWaitTime() { + return partitionQuotaWaitTime; + } + + public Duration getTopicQuotaWaitTime() { + return topicQuotaWaitTime; + } + + public Duration getMaxQueueWaitTime() { + return maxQueueWaitTime; + } + + public Duration getMinQueueWaitTime() { + return minQueueWaitTime; + } + } + } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 58733d676..13617ab76 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -421,6 +421,10 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) { private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) { List acks = response.getAcksList(); logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size()); + WriteAck.Statistics statistics = null; + if (response.getWriteStatistics() != null) { + statistics = new WriteAck.Statistics(response.getWriteStatistics()); + } int inFlightFreed = 0; long bytesFreed = 0; for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) { @@ -433,7 +437,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) inFlightFreed++; bytesFreed += sentMessage.getSize(); sentMessages.remove(); - processWriteAck(sentMessage, ack); + processWriteAck(sentMessage, statistics, ack); break; } if (sentMessage.getSeqNo() < ack.getSeqNo()) { @@ -474,7 +478,7 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) { } } - private void processWriteAck(EnqueuedMessage message, + private void processWriteAck(EnqueuedMessage message, WriteAck.Statistics statistics, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) { logger.debug("[{}] Received WriteAck with seqNo {} and status {}", streamId, ack.getSeqNo(), ack.getMessageWriteStatusCase()); @@ -482,12 +486,12 @@ private void processWriteAck(EnqueuedMessage message, switch (ack.getMessageWriteStatusCase()) { case WRITTEN: WriteAck.Details details = new WriteAck.Details(ack.getWritten().getOffset()); - resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details); + resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details, statistics); break; case SKIPPED: switch (ack.getSkipped().getReason()) { case REASON_ALREADY_WRITTEN: - resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null); + resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null, statistics); break; case REASON_UNSPECIFIED: default: @@ -497,7 +501,7 @@ private void processWriteAck(EnqueuedMessage message, } break; case WRITTEN_IN_TX: - resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null); + resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null, statistics); break; default: message.getFuture().completeExceptionally( @@ -519,5 +523,6 @@ private void closeDueToError(Status status, Throwable th) { protected void onStop() { logger.debug("[{}] Session {} onStop called", streamId, sessionId); } + } } From 4e117f79a8d1db4f1eb32dd2fd0aec6ce7e5a302 Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Thu, 10 Jul 2025 12:42:39 +0300 Subject: [PATCH 2/6] removed private protobuf dependency from public class WriteAck --- .../java/tech/ydb/topic/write/WriteAck.java | 23 +++++++------------ .../tech/ydb/topic/write/impl/WriterImpl.java | 17 +++++++++++++- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java index eacd640be..2699849e9 100644 --- a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java +++ b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java @@ -2,8 +2,6 @@ import java.time.Duration; -import tech.ydb.proto.topic.YdbTopic; - /** * @author Nikolay Perfilov */ @@ -50,13 +48,6 @@ public Statistics getStatistics() { return statistics; } - private static Duration convert(com.google.protobuf.Duration d) { - if (d == null) { - return Duration.ZERO; - } - return Duration.ofSeconds(d.getSeconds(), d.getNanos()); - } - public static class Details { private final long offset; @@ -76,12 +67,14 @@ public static class Statistics { private final Duration maxQueueWaitTime; private final Duration minQueueWaitTime; - public Statistics(YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src) { - this.persistingTime = convert(src.getPersistingTime()); - this.partitionQuotaWaitTime = convert(src.getPartitionQuotaWaitTime()); - this.topicQuotaWaitTime = convert(src.getTopicQuotaWaitTime()); - this.maxQueueWaitTime = convert(src.getMaxQueueWaitTime()); - this.minQueueWaitTime = convert(src.getMinQueueWaitTime()); + public Statistics(Duration persistingTime, + Duration partitionQuotaWaitTime, Duration topicQuotaWaitTime, + Duration maxQueueWaitTime, Duration minQueueWaitTime) { + this.persistingTime = persistingTime; + this.partitionQuotaWaitTime = partitionQuotaWaitTime; + this.topicQuotaWaitTime = topicQuotaWaitTime; + this.maxQueueWaitTime = maxQueueWaitTime; + this.minQueueWaitTime = minQueueWaitTime; } public Duration getPersistingTime() { diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 13617ab76..137a91adb 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -1,6 +1,7 @@ package tech.ydb.topic.write.impl; import java.io.IOException; +import java.time.Duration; import java.util.Deque; import java.util.LinkedList; import java.util.List; @@ -320,6 +321,13 @@ protected void onShutdown(String reason) { } } + private static Duration convertDuration(com.google.protobuf.Duration d) { + if (d == null) { + return Duration.ZERO; + } + return Duration.ofSeconds(d.getSeconds(), d.getNanos()); + } + private class WriteSessionImpl extends WriteSession { protected String sessionId; private final MessageSender messageSender; @@ -423,7 +431,14 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size()); WriteAck.Statistics statistics = null; if (response.getWriteStatistics() != null) { - statistics = new WriteAck.Statistics(response.getWriteStatistics()); + YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src = response.getWriteStatistics(); + statistics = new WriteAck.Statistics( + convertDuration(src.getPersistingTime()), + convertDuration(src.getPartitionQuotaWaitTime()), + convertDuration(src.getTopicQuotaWaitTime()), + convertDuration(src.getMaxQueueWaitTime()), + convertDuration(src.getMinQueueWaitTime()) + ); } int inFlightFreed = 0; long bytesFreed = 0; From 23cea4a12a21b2b0d4a1ff717cabf9ad74181c4a Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Thu, 10 Jul 2025 12:55:32 +0300 Subject: [PATCH 3/6] re-use ProtobufUtils.protoToDuration() for stats conversion --- .../tech/ydb/topic/write/impl/WriterImpl.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 137a91adb..90ccdbcff 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -21,6 +21,7 @@ import tech.ydb.core.Issue; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.StatusCodesProtos; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; @@ -321,13 +322,6 @@ protected void onShutdown(String reason) { } } - private static Duration convertDuration(com.google.protobuf.Duration d) { - if (d == null) { - return Duration.ZERO; - } - return Duration.ofSeconds(d.getSeconds(), d.getNanos()); - } - private class WriteSessionImpl extends WriteSession { protected String sessionId; private final MessageSender messageSender; @@ -433,11 +427,11 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) if (response.getWriteStatistics() != null) { YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src = response.getWriteStatistics(); statistics = new WriteAck.Statistics( - convertDuration(src.getPersistingTime()), - convertDuration(src.getPartitionQuotaWaitTime()), - convertDuration(src.getTopicQuotaWaitTime()), - convertDuration(src.getMaxQueueWaitTime()), - convertDuration(src.getMinQueueWaitTime()) + ProtobufUtils.protoToDuration(src.getPersistingTime()), + ProtobufUtils.protoToDuration(src.getPartitionQuotaWaitTime()), + ProtobufUtils.protoToDuration(src.getTopicQuotaWaitTime()), + ProtobufUtils.protoToDuration(src.getMaxQueueWaitTime()), + ProtobufUtils.protoToDuration(src.getMinQueueWaitTime()) ); } int inFlightFreed = 0; From 312bb07d911ec8494942de47513e606f10805a17 Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Thu, 10 Jul 2025 12:59:15 +0300 Subject: [PATCH 4/6] removed unused import --- topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 90ccdbcff..7617fe9f9 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -1,7 +1,6 @@ package tech.ydb.topic.write.impl; import java.io.IOException; -import java.time.Duration; import java.util.Deque; import java.util.LinkedList; import java.util.List; From 7a490e184c6810ca396affe7e2d3f9f109f4e8ee Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Thu, 10 Jul 2025 15:06:47 +0300 Subject: [PATCH 5/6] added comments on statistics in WriteAck --- .../java/tech/ydb/topic/write/WriteAck.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java index 2699849e9..6a401494d 100644 --- a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java +++ b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java @@ -41,7 +41,10 @@ public Details getDetails() { } /** - * Obtain message write statistics + * Returns write statistics associated with this write confirmation. + * Note: The statistics may cover multiple messages confirmed together by the server. + * Although this WriteAck corresponds to a single written message, the server may confirm several messages in a single response. + * Therefore, the returned statistics may represent the combined data for all messages included in the same write confirmation from the server. * @return {@link Statistics} with timings if statistics are available or null otherwise */ public Statistics getStatistics() { @@ -60,6 +63,11 @@ public long getOffset() { } } + /** + * Messages batch statistics. + * All messages within the batch are persisted together so write + * statistics is for the whole messages batch. + */ public static class Statistics { private final Duration persistingTime; private final Duration partitionQuotaWaitTime; @@ -67,6 +75,15 @@ public static class Statistics { private final Duration maxQueueWaitTime; private final Duration minQueueWaitTime; + /** + * Create the messages batch statistics object, for a single messages batch. + * + * @param persistingTime + * @param partitionQuotaWaitTime + * @param topicQuotaWaitTime + * @param maxQueueWaitTime + * @param minQueueWaitTime + */ public Statistics(Duration persistingTime, Duration partitionQuotaWaitTime, Duration topicQuotaWaitTime, Duration maxQueueWaitTime, Duration minQueueWaitTime) { @@ -77,22 +94,37 @@ public Statistics(Duration persistingTime, this.minQueueWaitTime = minQueueWaitTime; } + /** + * @return Time spent in persisting of data. + */ public Duration getPersistingTime() { return persistingTime; } + /** + * @return Time spent awaiting for partition write quota. + */ public Duration getPartitionQuotaWaitTime() { return partitionQuotaWaitTime; } + /** + * @return Time spent awaiting for topic write quota. + */ public Duration getTopicQuotaWaitTime() { return topicQuotaWaitTime; } + /** + * @return Time spent in queue before persisting, maximal of all messages in response. + */ public Duration getMaxQueueWaitTime() { return maxQueueWaitTime; } + /** + * @return Time spent in queue before persisting, minimal of all messages in response. + */ public Duration getMinQueueWaitTime() { return minQueueWaitTime; } From 12b2b03a1a95bbd49dedcb2d5ee1292620691449 Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Thu, 10 Jul 2025 15:59:42 +0300 Subject: [PATCH 6/6] make linter happy --- topic/src/main/java/tech/ydb/topic/write/WriteAck.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java index 6a401494d..7d8b21075 100644 --- a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java +++ b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java @@ -77,12 +77,12 @@ public static class Statistics { /** * Create the messages batch statistics object, for a single messages batch. - * + * * @param persistingTime * @param partitionQuotaWaitTime * @param topicQuotaWaitTime * @param maxQueueWaitTime - * @param minQueueWaitTime + * @param minQueueWaitTime */ public Statistics(Duration persistingTime, Duration partitionQuotaWaitTime, Duration topicQuotaWaitTime,