Skip to content

feature: statistics reporting for topic writes #517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 96 additions & 13 deletions topic/src/main/java/tech/ydb/topic/write/WriteAck.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package tech.ydb.topic.write;

import java.time.Duration;

/**
* @author Nikolay Perfilov
*/
public class WriteAck {
private final long seqNo;
private final State state;
private final Details details;
private final Statistics statistics;

public WriteAck(long seqNo, State state, Details details) {
public WriteAck(long seqNo, State state, Details details, Statistics statistics) {
this.seqNo = seqNo;
this.state = state;
this.details = details;
this.statistics = statistics;
}

public enum State {
Expand All @@ -20,18 +24,6 @@ public enum State {
WRITTEN_IN_TX
}

public static class Details {
private final long offset;

public Details(long offset) {
this.offset = offset;
}

public long getOffset() {
return offset;
}
}

public long getSeqNo() {
return seqNo;
}
Expand All @@ -47,4 +39,95 @@ public State getState() {
public Details getDetails() {
return details;
}

/**
* Returns write statistics associated with this write confirmation.
* Note: The statistics may cover multiple messages confirmed together by the server.
* Although this WriteAck corresponds to a single written message, the server may confirm several messages in a single response.
* Therefore, the returned statistics may represent the combined data for all messages included in the same write confirmation from the server.
* @return {@link Statistics} with timings if statistics are available or null otherwise
*/
public Statistics getStatistics() {
return statistics;
}

public static class Details {
private final long offset;

public Details(long offset) {
this.offset = offset;
}

public long getOffset() {
return offset;
}
}

/**
* Messages batch statistics.
* All messages within the batch are persisted together so write
* statistics is for the whole messages batch.
*/
public static class Statistics {
private final Duration persistingTime;
private final Duration partitionQuotaWaitTime;
private final Duration topicQuotaWaitTime;
private final Duration maxQueueWaitTime;
private final Duration minQueueWaitTime;

/**
* Create the messages batch statistics object, for a single messages batch.
*
* @param persistingTime
* @param partitionQuotaWaitTime
* @param topicQuotaWaitTime
* @param maxQueueWaitTime
* @param minQueueWaitTime
*/
public Statistics(Duration persistingTime,
Duration partitionQuotaWaitTime, Duration topicQuotaWaitTime,
Duration maxQueueWaitTime, Duration minQueueWaitTime) {
this.persistingTime = persistingTime;
this.partitionQuotaWaitTime = partitionQuotaWaitTime;
this.topicQuotaWaitTime = topicQuotaWaitTime;
this.maxQueueWaitTime = maxQueueWaitTime;
this.minQueueWaitTime = minQueueWaitTime;
}

/**
* @return Time spent in persisting of data.
*/
public Duration getPersistingTime() {
return persistingTime;
}

/**
* @return Time spent awaiting for partition write quota.
*/
public Duration getPartitionQuotaWaitTime() {
return partitionQuotaWaitTime;
}

/**
* @return Time spent awaiting for topic write quota.
*/
public Duration getTopicQuotaWaitTime() {
return topicQuotaWaitTime;
}

/**
* @return Time spent in queue before persisting, maximal of all messages in response.
*/
public Duration getMaxQueueWaitTime() {
return maxQueueWaitTime;
}

/**
* @return Time spent in queue before persisting, minimal of all messages in response.
*/
public Duration getMinQueueWaitTime() {
return minQueueWaitTime;
}
}

}
23 changes: 18 additions & 5 deletions topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
Expand Down Expand Up @@ -421,6 +422,17 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) {
List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = response.getAcksList();
logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size());
WriteAck.Statistics statistics = null;
if (response.getWriteStatistics() != null) {
YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src = response.getWriteStatistics();
statistics = new WriteAck.Statistics(
ProtobufUtils.protoToDuration(src.getPersistingTime()),
ProtobufUtils.protoToDuration(src.getPartitionQuotaWaitTime()),
ProtobufUtils.protoToDuration(src.getTopicQuotaWaitTime()),
ProtobufUtils.protoToDuration(src.getMaxQueueWaitTime()),
ProtobufUtils.protoToDuration(src.getMinQueueWaitTime())
);
}
int inFlightFreed = 0;
long bytesFreed = 0;
for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) {
Expand All @@ -433,7 +445,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
inFlightFreed++;
bytesFreed += sentMessage.getSize();
sentMessages.remove();
processWriteAck(sentMessage, ack);
processWriteAck(sentMessage, statistics, ack);
break;
}
if (sentMessage.getSeqNo() < ack.getSeqNo()) {
Expand Down Expand Up @@ -474,20 +486,20 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
}
}

private void processWriteAck(EnqueuedMessage message,
private void processWriteAck(EnqueuedMessage message, WriteAck.Statistics statistics,
YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
logger.debug("[{}] Received WriteAck with seqNo {} and status {}", streamId, ack.getSeqNo(),
ack.getMessageWriteStatusCase());
WriteAck resultAck;
switch (ack.getMessageWriteStatusCase()) {
case WRITTEN:
WriteAck.Details details = new WriteAck.Details(ack.getWritten().getOffset());
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details);
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details, statistics);
break;
case SKIPPED:
switch (ack.getSkipped().getReason()) {
case REASON_ALREADY_WRITTEN:
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null);
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null, statistics);
break;
case REASON_UNSPECIFIED:
default:
Expand All @@ -497,7 +509,7 @@ private void processWriteAck(EnqueuedMessage message,
}
break;
case WRITTEN_IN_TX:
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null);
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null, statistics);
break;
default:
message.getFuture().completeExceptionally(
Expand All @@ -519,5 +531,6 @@ private void closeDueToError(Status status, Throwable th) {
protected void onStop() {
logger.debug("[{}] Session {} onStop called", streamId, sessionId);
}

}
}
Loading