Skip to content

Commit 421e268

Browse files
Alek5andr-KotovGazizonoki
authored andcommitted
Moved commit "code EES_WRITTEN_IN_TX" from ydb repo
1 parent 48594d3 commit 421e268

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

include/ydb-cpp-sdk/client/topic/write_events.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ struct TWriteSessionEvent {
5656
enum EEventState {
5757
EES_WRITTEN, //! Successfully written.
5858
EES_ALREADY_WRITTEN, //! Skipped on SeqNo deduplication.
59-
EES_DISCARDED //! In case of destruction of writer or retry policy discarded future retries in this writer.
59+
EES_DISCARDED, //! In case of destruction of writer or retry policy discarded future retries in this writer.
60+
EES_WRITTEN_IN_TX, //! Successfully written in tx.
6061
};
6162
//! Details of successfully written message.
6263
struct TWrittenMessageDetails {

src/client/topic/impl/write_session_impl.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,17 +1002,23 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
10021002
writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time());
10031003
writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time());
10041004

1005-
for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) {
1005+
for (const auto& ack : batchWriteResponse.acks()) {
10061006
// TODO: Fill writer statistics
1007-
auto ack = batchWriteResponse.acks(messageIndex);
10081007
uint64_t sequenceNumber = ack.seq_no();
10091008

1010-
Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped());
1011-
auto msgWriteStatus = ack.has_written()
1012-
? TWriteSessionEvent::TWriteAck::EES_WRITTEN
1013-
: (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN
1014-
? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN
1015-
: TWriteSessionEvent::TWriteAck::EES_DISCARDED);
1009+
Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped() || ack.has_written_in_tx());
1010+
1011+
TWriteSessionEvent::TWriteAck::EEventState msgWriteStatus;
1012+
if (ack.has_written_in_tx()) {
1013+
msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX;
1014+
} else if (ack.has_written()) {
1015+
msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN;
1016+
} else {
1017+
msgWriteStatus =
1018+
(ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN)
1019+
? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN
1020+
: TWriteSessionEvent::TWriteAck::EES_DISCARDED;
1021+
}
10161022

10171023
uint64_t offset = ack.has_written() ? ack.written().offset() : 0;
10181024

0 commit comments

Comments
 (0)