diff --git a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java index b7b082f1..7d8b2107 100644 --- a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java +++ b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java @@ -1,5 +1,7 @@ package tech.ydb.topic.write; +import java.time.Duration; + /** * @author Nikolay Perfilov */ @@ -7,11 +9,13 @@ public class WriteAck { private final long seqNo; private final State state; private final Details details; + private final Statistics statistics; - public WriteAck(long seqNo, State state, Details details) { + public WriteAck(long seqNo, State state, Details details, Statistics statistics) { this.seqNo = seqNo; this.state = state; this.details = details; + this.statistics = statistics; } public enum State { @@ -20,18 +24,6 @@ public enum State { WRITTEN_IN_TX } - public static class Details { - private final long offset; - - public Details(long offset) { - this.offset = offset; - } - - public long getOffset() { - return offset; - } - } - public long getSeqNo() { return seqNo; } @@ -47,4 +39,95 @@ public State getState() { public Details getDetails() { return details; } + + /** + * Returns write statistics associated with this write confirmation. + * Note: The statistics may cover multiple messages confirmed together by the server. + * Although this WriteAck corresponds to a single written message, the server may confirm several messages in a single response. + * Therefore, the returned statistics may represent the combined data for all messages included in the same write confirmation from the server. + * @return {@link Statistics} with timings if statistics are available or null otherwise + */ + public Statistics getStatistics() { + return statistics; + } + + public static class Details { + private final long offset; + + public Details(long offset) { + this.offset = offset; + } + + public long getOffset() { + return offset; + } + } + + /** + * Messages batch statistics. + * All messages within the batch are persisted together so write + * statistics is for the whole messages batch. + */ + public static class Statistics { + private final Duration persistingTime; + private final Duration partitionQuotaWaitTime; + private final Duration topicQuotaWaitTime; + private final Duration maxQueueWaitTime; + private final Duration minQueueWaitTime; + + /** + * Create the messages batch statistics object, for a single messages batch. + * + * @param persistingTime + * @param partitionQuotaWaitTime + * @param topicQuotaWaitTime + * @param maxQueueWaitTime + * @param minQueueWaitTime + */ + public Statistics(Duration persistingTime, + Duration partitionQuotaWaitTime, Duration topicQuotaWaitTime, + Duration maxQueueWaitTime, Duration minQueueWaitTime) { + this.persistingTime = persistingTime; + this.partitionQuotaWaitTime = partitionQuotaWaitTime; + this.topicQuotaWaitTime = topicQuotaWaitTime; + this.maxQueueWaitTime = maxQueueWaitTime; + this.minQueueWaitTime = minQueueWaitTime; + } + + /** + * @return Time spent in persisting of data. + */ + public Duration getPersistingTime() { + return persistingTime; + } + + /** + * @return Time spent awaiting for partition write quota. + */ + public Duration getPartitionQuotaWaitTime() { + return partitionQuotaWaitTime; + } + + /** + * @return Time spent awaiting for topic write quota. + */ + public Duration getTopicQuotaWaitTime() { + return topicQuotaWaitTime; + } + + /** + * @return Time spent in queue before persisting, maximal of all messages in response. + */ + public Duration getMaxQueueWaitTime() { + return maxQueueWaitTime; + } + + /** + * @return Time spent in queue before persisting, minimal of all messages in response. + */ + public Duration getMinQueueWaitTime() { + return minQueueWaitTime; + } + } + } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 58733d67..7617fe9f 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -20,6 +20,7 @@ import tech.ydb.core.Issue; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.StatusCodesProtos; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; @@ -421,6 +422,17 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) { private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) { List acks = response.getAcksList(); logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size()); + WriteAck.Statistics statistics = null; + if (response.getWriteStatistics() != null) { + YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src = response.getWriteStatistics(); + statistics = new WriteAck.Statistics( + ProtobufUtils.protoToDuration(src.getPersistingTime()), + ProtobufUtils.protoToDuration(src.getPartitionQuotaWaitTime()), + ProtobufUtils.protoToDuration(src.getTopicQuotaWaitTime()), + ProtobufUtils.protoToDuration(src.getMaxQueueWaitTime()), + ProtobufUtils.protoToDuration(src.getMinQueueWaitTime()) + ); + } int inFlightFreed = 0; long bytesFreed = 0; for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) { @@ -433,7 +445,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) inFlightFreed++; bytesFreed += sentMessage.getSize(); sentMessages.remove(); - processWriteAck(sentMessage, ack); + processWriteAck(sentMessage, statistics, ack); break; } if (sentMessage.getSeqNo() < ack.getSeqNo()) { @@ -474,7 +486,7 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) { } } - private void processWriteAck(EnqueuedMessage message, + private void processWriteAck(EnqueuedMessage message, WriteAck.Statistics statistics, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) { logger.debug("[{}] Received WriteAck with seqNo {} and status {}", streamId, ack.getSeqNo(), ack.getMessageWriteStatusCase()); @@ -482,12 +494,12 @@ private void processWriteAck(EnqueuedMessage message, switch (ack.getMessageWriteStatusCase()) { case WRITTEN: WriteAck.Details details = new WriteAck.Details(ack.getWritten().getOffset()); - resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details); + resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details, statistics); break; case SKIPPED: switch (ack.getSkipped().getReason()) { case REASON_ALREADY_WRITTEN: - resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null); + resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null, statistics); break; case REASON_UNSPECIFIED: default: @@ -497,7 +509,7 @@ private void processWriteAck(EnqueuedMessage message, } break; case WRITTEN_IN_TX: - resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null); + resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null, statistics); break; default: message.getFuture().completeExceptionally( @@ -519,5 +531,6 @@ private void closeDueToError(Status status, Throwable th) { protected void onStop() { logger.debug("[{}] Session {} onStop called", streamId, sessionId); } + } }