Skip to content

Commit d9f48a3

Browse files
qyryqGazizonoki
authored andcommitted
Add NFederatedTopic::TDeferredCommit implementation (#17430)
1 parent 35655ff commit d9f48a3

File tree

2 files changed

+143
-3
lines changed

2 files changed

+143
-3
lines changed

include/ydb-cpp-sdk/client/federated_topic/federated_topic.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ using TAsyncDescribeTopicResult = NTopic::TAsyncDescribeTopicResult;
2121
struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFederatedPartitionSession> {
2222
using TPtr = TIntrusivePtr<TFederatedPartitionSession>;
2323

24+
friend class TDeferredCommit;
25+
2426
public:
2527
TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession,
2628
std::shared_ptr<TDbInfo> db,
@@ -223,10 +225,10 @@ class TDeferredCommit {
223225
void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent);
224226

225227
//! Add offsets range to set.
226-
void Add(const TFederatedPartitionSession& partitionSession, ui64 startOffset, ui64 endOffset);
228+
void Add(const TFederatedPartitionSession::TPtr& partitionSession, ui64 startOffset, ui64 endOffset);
227229

228230
//! Add offset to set.
229-
void Add(const TFederatedPartitionSession& partitionSession, ui64 offset);
231+
void Add(const TFederatedPartitionSession::TPtr& partitionSession, ui64 offset);
230232

231233
//! Commit all added offsets.
232234
void Commit();
@@ -399,7 +401,7 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
399401
//! See description in TFederatedEventHandlers class.
400402
FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers);
401403

402-
404+
403405

404406
//! Read policy settings
405407

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
#include <ydb-cpp-sdk/client/federated_topic/federated_topic.h>
2+
#include <src/client/topic/impl/read_session_impl.ipp>
3+
4+
#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
5+
6+
namespace NYdb::inline V3::NFederatedTopic {
7+
8+
std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) {
9+
if (dataReceivedEvent.HasCompressedMessages()) {
10+
const auto& msg = dataReceivedEvent.GetCompressedMessages()[index];
11+
return {msg.GetOffset(), msg.GetOffset() + 1};
12+
}
13+
const auto& msg = dataReceivedEvent.GetMessages()[index];
14+
return {msg.GetOffset(), msg.GetOffset() + 1};
15+
}
16+
17+
18+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
19+
// NFederatedTopic::TDeferredCommit
20+
21+
class TDeferredCommit::TImpl {
22+
public:
23+
24+
void Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset);
25+
void Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset);
26+
27+
void Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message);
28+
void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent);
29+
30+
void Commit();
31+
32+
private:
33+
static void Add(const TFederatedPartitionSession::TPtr& partitionStream, TDisjointIntervalTree<ui64>& offsetSet, ui64 startOffset, ui64 endOffset);
34+
35+
private:
36+
// Partition stream -> offsets set.
37+
std::unordered_map<TFederatedPartitionSession::TPtr, TDisjointIntervalTree<ui64>, THash<TFederatedPartitionSession::TPtr>> Offsets;
38+
};
39+
40+
TDeferredCommit::TDeferredCommit() {
41+
}
42+
43+
TDeferredCommit::TDeferredCommit(TDeferredCommit&&) = default;
44+
45+
TDeferredCommit& TDeferredCommit::operator=(TDeferredCommit&&) = default;
46+
47+
TDeferredCommit::~TDeferredCommit() {
48+
}
49+
50+
#define GET_IMPL() \
51+
if (!Impl) { \
52+
Impl = std::make_unique<TImpl>(); \
53+
} \
54+
Impl
55+
56+
void TDeferredCommit::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) {
57+
GET_IMPL()->Add(partitionStream, startOffset, endOffset);
58+
}
59+
60+
void TDeferredCommit::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset) {
61+
GET_IMPL()->Add(partitionStream, offset);
62+
}
63+
64+
void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
65+
GET_IMPL()->Add(message);
66+
}
67+
68+
void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) {
69+
GET_IMPL()->Add(dataReceivedEvent);
70+
}
71+
72+
#undef GET_IMPL
73+
74+
void TDeferredCommit::Commit() {
75+
if (Impl) {
76+
Impl->Commit();
77+
}
78+
}
79+
80+
void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
81+
Y_ASSERT(message.GetFederatedPartitionSession());
82+
Add(message.GetFederatedPartitionSession(), message.GetOffset());
83+
}
84+
85+
void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, TDisjointIntervalTree<ui64>& offsetSet, ui64 startOffset, ui64 endOffset) {
86+
if (offsetSet.Intersects(startOffset, endOffset)) {
87+
ThrowFatalError(TStringBuilder() << "Commit set already has some offsets from half-interval ["
88+
<< startOffset << "; " << endOffset
89+
<< ") for partition stream with id " << partitionStream->GetPartitionSessionId());
90+
} else {
91+
offsetSet.InsertInterval(startOffset, endOffset);
92+
}
93+
}
94+
95+
void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) {
96+
Y_ASSERT(partitionStream);
97+
Add(partitionStream, Offsets[partitionStream], startOffset, endOffset);
98+
}
99+
100+
void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset) {
101+
Y_ASSERT(partitionStream);
102+
auto& offsetSet = Offsets[partitionStream];
103+
if (offsetSet.Has(offset)) {
104+
ThrowFatalError(TStringBuilder() << "Commit set already has offset " << offset
105+
<< " for partition stream with id " << partitionStream->GetPartitionSessionId());
106+
} else {
107+
offsetSet.Insert(offset);
108+
}
109+
}
110+
111+
void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) {
112+
const TFederatedPartitionSession::TPtr& partitionStream = dataReceivedEvent.GetFederatedPartitionSession();
113+
Y_ASSERT(partitionStream);
114+
auto& offsetSet = Offsets[partitionStream];
115+
auto [startOffset, endOffset] = GetMessageOffsetRange(dataReceivedEvent, 0);
116+
for (size_t i = 1; i < dataReceivedEvent.GetMessagesCount(); ++i) {
117+
auto msgOffsetRange = GetMessageOffsetRange(dataReceivedEvent, i);
118+
if (msgOffsetRange.first == endOffset) {
119+
endOffset= msgOffsetRange.second;
120+
} else {
121+
Add(partitionStream, offsetSet, startOffset, endOffset);
122+
startOffset = msgOffsetRange.first;
123+
endOffset = msgOffsetRange.second;
124+
}
125+
}
126+
Add(partitionStream, offsetSet, startOffset, endOffset);
127+
}
128+
129+
void TDeferredCommit::TImpl::Commit() {
130+
for (auto&& [partitionStream, offsetRanges] : Offsets) {
131+
for (auto&& [startOffset, endOffset] : offsetRanges) {
132+
static_cast<NTopic::TPartitionStreamImpl<false>*>(partitionStream.Get()->PartitionSession.Get())->Commit(startOffset, endOffset);
133+
}
134+
}
135+
Offsets.clear();
136+
}
137+
138+
}

0 commit comments

Comments
 (0)