Skip to content

Commit 557f1e0

Browse files
authored
Merge pull request #517 from zinal/feature-pq_write_stats
feature: statistics reporting for topic writes
2 parents 6e8133b + 12b2b03 commit 557f1e0

File tree

2 files changed

+114
-18
lines changed

2 files changed

+114
-18
lines changed
Lines changed: 96 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
package tech.ydb.topic.write;
22

3+
import java.time.Duration;
4+
35
/**
46
* @author Nikolay Perfilov
57
*/
68
public class WriteAck {
79
private final long seqNo;
810
private final State state;
911
private final Details details;
12+
private final Statistics statistics;
1013

11-
public WriteAck(long seqNo, State state, Details details) {
14+
public WriteAck(long seqNo, State state, Details details, Statistics statistics) {
1215
this.seqNo = seqNo;
1316
this.state = state;
1417
this.details = details;
18+
this.statistics = statistics;
1519
}
1620

1721
public enum State {
@@ -20,18 +24,6 @@ public enum State {
2024
WRITTEN_IN_TX
2125
}
2226

23-
public static class Details {
24-
private final long offset;
25-
26-
public Details(long offset) {
27-
this.offset = offset;
28-
}
29-
30-
public long getOffset() {
31-
return offset;
32-
}
33-
}
34-
3527
public long getSeqNo() {
3628
return seqNo;
3729
}
@@ -47,4 +39,95 @@ public State getState() {
4739
public Details getDetails() {
4840
return details;
4941
}
42+
43+
/**
44+
* Returns write statistics associated with this write confirmation.
45+
* Note: The statistics may cover multiple messages confirmed together by the server.
46+
* Although this WriteAck corresponds to a single written message, the server may confirm several messages in a single response.
47+
* Therefore, the returned statistics may represent the combined data for all messages included in the same write confirmation from the server.
48+
* @return {@link Statistics} with timings if statistics are available or null otherwise
49+
*/
50+
public Statistics getStatistics() {
51+
return statistics;
52+
}
53+
54+
public static class Details {
55+
private final long offset;
56+
57+
public Details(long offset) {
58+
this.offset = offset;
59+
}
60+
61+
public long getOffset() {
62+
return offset;
63+
}
64+
}
65+
66+
/**
67+
* Messages batch statistics.
68+
* All messages within the batch are persisted together so write
69+
* statistics is for the whole messages batch.
70+
*/
71+
public static class Statistics {
72+
private final Duration persistingTime;
73+
private final Duration partitionQuotaWaitTime;
74+
private final Duration topicQuotaWaitTime;
75+
private final Duration maxQueueWaitTime;
76+
private final Duration minQueueWaitTime;
77+
78+
/**
79+
* Create the messages batch statistics object, for a single messages batch.
80+
*
81+
* @param persistingTime
82+
* @param partitionQuotaWaitTime
83+
* @param topicQuotaWaitTime
84+
* @param maxQueueWaitTime
85+
* @param minQueueWaitTime
86+
*/
87+
public Statistics(Duration persistingTime,
88+
Duration partitionQuotaWaitTime, Duration topicQuotaWaitTime,
89+
Duration maxQueueWaitTime, Duration minQueueWaitTime) {
90+
this.persistingTime = persistingTime;
91+
this.partitionQuotaWaitTime = partitionQuotaWaitTime;
92+
this.topicQuotaWaitTime = topicQuotaWaitTime;
93+
this.maxQueueWaitTime = maxQueueWaitTime;
94+
this.minQueueWaitTime = minQueueWaitTime;
95+
}
96+
97+
/**
98+
* @return Time spent in persisting of data.
99+
*/
100+
public Duration getPersistingTime() {
101+
return persistingTime;
102+
}
103+
104+
/**
105+
* @return Time spent awaiting for partition write quota.
106+
*/
107+
public Duration getPartitionQuotaWaitTime() {
108+
return partitionQuotaWaitTime;
109+
}
110+
111+
/**
112+
* @return Time spent awaiting for topic write quota.
113+
*/
114+
public Duration getTopicQuotaWaitTime() {
115+
return topicQuotaWaitTime;
116+
}
117+
118+
/**
119+
* @return Time spent in queue before persisting, maximal of all messages in response.
120+
*/
121+
public Duration getMaxQueueWaitTime() {
122+
return maxQueueWaitTime;
123+
}
124+
125+
/**
126+
* @return Time spent in queue before persisting, minimal of all messages in response.
127+
*/
128+
public Duration getMinQueueWaitTime() {
129+
return minQueueWaitTime;
130+
}
131+
}
132+
50133
}

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import tech.ydb.core.Issue;
2121
import tech.ydb.core.Status;
2222
import tech.ydb.core.StatusCode;
23+
import tech.ydb.core.utils.ProtobufUtils;
2324
import tech.ydb.proto.StatusCodesProtos;
2425
import tech.ydb.proto.topic.YdbTopic;
2526
import tech.ydb.topic.TopicRpc;
@@ -421,6 +422,17 @@ private void onInitResponse(YdbTopic.StreamWriteMessage.InitResponse response) {
421422
private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response) {
422423
List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = response.getAcksList();
423424
logger.debug("[{}] Received WriteResponse with {} WriteAcks", streamId, acks.size());
425+
WriteAck.Statistics statistics = null;
426+
if (response.getWriteStatistics() != null) {
427+
YdbTopic.StreamWriteMessage.WriteResponse.WriteStatistics src = response.getWriteStatistics();
428+
statistics = new WriteAck.Statistics(
429+
ProtobufUtils.protoToDuration(src.getPersistingTime()),
430+
ProtobufUtils.protoToDuration(src.getPartitionQuotaWaitTime()),
431+
ProtobufUtils.protoToDuration(src.getTopicQuotaWaitTime()),
432+
ProtobufUtils.protoToDuration(src.getMaxQueueWaitTime()),
433+
ProtobufUtils.protoToDuration(src.getMinQueueWaitTime())
434+
);
435+
}
424436
int inFlightFreed = 0;
425437
long bytesFreed = 0;
426438
for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack : acks) {
@@ -433,7 +445,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
433445
inFlightFreed++;
434446
bytesFreed += sentMessage.getSize();
435447
sentMessages.remove();
436-
processWriteAck(sentMessage, ack);
448+
processWriteAck(sentMessage, statistics, ack);
437449
break;
438450
}
439451
if (sentMessage.getSeqNo() < ack.getSeqNo()) {
@@ -474,20 +486,20 @@ private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
474486
}
475487
}
476488

477-
private void processWriteAck(EnqueuedMessage message,
489+
private void processWriteAck(EnqueuedMessage message, WriteAck.Statistics statistics,
478490
YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
479491
logger.debug("[{}] Received WriteAck with seqNo {} and status {}", streamId, ack.getSeqNo(),
480492
ack.getMessageWriteStatusCase());
481493
WriteAck resultAck;
482494
switch (ack.getMessageWriteStatusCase()) {
483495
case WRITTEN:
484496
WriteAck.Details details = new WriteAck.Details(ack.getWritten().getOffset());
485-
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details);
497+
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN, details, statistics);
486498
break;
487499
case SKIPPED:
488500
switch (ack.getSkipped().getReason()) {
489501
case REASON_ALREADY_WRITTEN:
490-
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null);
502+
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null, statistics);
491503
break;
492504
case REASON_UNSPECIFIED:
493505
default:
@@ -497,7 +509,7 @@ private void processWriteAck(EnqueuedMessage message,
497509
}
498510
break;
499511
case WRITTEN_IN_TX:
500-
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null);
512+
resultAck = new WriteAck(ack.getSeqNo(), WriteAck.State.WRITTEN_IN_TX, null, statistics);
501513
break;
502514
default:
503515
message.getFuture().completeExceptionally(
@@ -519,5 +531,6 @@ private void closeDueToError(Status status, Throwable th) {
519531
protected void onStop() {
520532
logger.debug("[{}] Session {} onStop called", streamId, sessionId);
521533
}
534+
522535
}
523536
}

0 commit comments

Comments
 (0)