Skip to content

feature: statistics reporting for topic writes #517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 64 additions & 13 deletions topic/src/main/java/tech/ydb/topic/write/WriteAck.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package tech.ydb.topic.write;

import java.time.Duration;

/**
* @author Nikolay Perfilov
*/
public class WriteAck {
private final long seqNo;
private final State state;
private final Details details;
private final Statistics statistics;

public WriteAck(long seqNo, State state, Details details) {
public WriteAck(long seqNo, State state, Details details, Statistics statistics) {
this.seqNo = seqNo;
this.state = state;
this.details = details;
this.statistics = statistics;
}

public enum State {
Expand All @@ -20,18 +24,6 @@ public enum State {
WRITTEN_IN_TX
}

public static class Details {
private final long offset;

public Details(long offset) {
this.offset = offset;
}

public long getOffset() {
return offset;
}
}

public long getSeqNo() {
return seqNo;
}
Expand All @@ -47,4 +39,63 @@ public State getState() {
public Details getDetails() {
return details;
}

/**
* Obtain message write statistics
* @return {@link Statistics} with timings if statistics are available or null otherwise
*/
public Statistics getStatistics() {
return statistics;
}

public static class Details {
private final long offset;

public Details(long offset) {
this.offset = offset;
}

public long getOffset() {
return offset;
}
}

public static class Statistics {
private final Duration persistingTime;
private final Duration partitionQuotaWaitTime;
private final Duration topicQuotaWaitTime;
private final Duration maxQueueWaitTime;
private final Duration minQueueWaitTime;

public Statistics(Duration persistingTime,
Duration partitionQuotaWaitTime, Duration topicQuotaWaitTime,
Duration maxQueueWaitTime, Duration minQueueWaitTime) {
this.persistingTime = persistingTime;
this.partitionQuotaWaitTime = partitionQuotaWaitTime;
this.topicQuotaWaitTime = topicQuotaWaitTime;
this.maxQueueWaitTime = maxQueueWaitTime;
this.minQueueWaitTime = minQueueWaitTime;
}

public Duration getPersistingTime() {
return persistingTime;
}

public Duration getPartitionQuotaWaitTime() {
return partitionQuotaWaitTime;
}

public Duration getTopicQuotaWaitTime() {
return topicQuotaWaitTime;
}

public Duration getMaxQueueWaitTime() {
return maxQueueWaitTime;
}

public Duration getMinQueueWaitTime() {
return minQueueWaitTime;
}
}

}
23 changes: 18 additions & 5 deletions topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
Expand Down Expand Up @@ -421,6 +422,17 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) {
List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = response.getAcksList();
logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size());
WriteAck.Statistics statistics = null;
if (response.getWriteStatistics() != null) {
YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src = response.getWriteStatistics();
statistics = new WriteAck.Statistics(
ProtobufUtils.protoToDuration(src.getPersistingTime()),
ProtobufUtils.protoToDuration(src.getPartitionQuotaWaitTime()),
ProtobufUtils.protoToDuration(src.getTopicQuotaWaitTime()),
ProtobufUtils.protoToDuration(src.getMaxQueueWaitTime()),
ProtobufUtils.protoToDuration(src.getMinQueueWaitTime())
);
}
int inFlightFreed = 0;
long bytesFreed = 0;
for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) {
Expand All @@ -433,7 +445,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
inFlightFreed++;
bytesFreed += sentMessage.getSize();
sentMessages.remove();
processWriteAck(sentMessage, ack);
processWriteAck(sentMessage, statistics, ack);
break;
}
if (sentMessage.getSeqNo() < ack.getSeqNo()) {
Expand Down Expand Up @@ -474,20 +486,20 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
}
}

private void processWriteAck(EnqueuedMessage message,
private void processWriteAck(EnqueuedMessage message, WriteAck.Statistics statistics,
YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
logger.debug("[{}] Received WriteAck with seqNo {} and status {}", streamId, ack.getSeqNo(),
ack.getMessageWriteStatusCase());
WriteAck resultAck;
switch (ack.getMessageWriteStatusCase()) {
case WRITTEN:
WriteAck.Details details = new WriteAck.Details(ack.getWritten().getOffset());
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details);
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details, statistics);
break;
case SKIPPED:
switch (ack.getSkipped().getReason()) {
case REASON_ALREADY_WRITTEN:
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null);
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null, statistics);
break;
case REASON_UNSPECIFIED:
default:
Expand All @@ -497,7 +509,7 @@ private void processWriteAck(EnqueuedMessage message,
}
break;
case WRITTEN_IN_TX:
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null);
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null, statistics);
break;
default:
message.getFuture().completeExceptionally(
Expand All @@ -519,5 +531,6 @@ private void closeDueToError(Status status, Throwable th) {
protected void onStop() {
logger.debug("[{}] Session {} onStop called", streamId, sessionId);
}

}
}