Skip to content

Commit e598567

Browse files
Optimization of the UpdateOffsetsInTransaction calls (#10273)
1 parent a689229 commit e598567

15 files changed

+341
-157
lines changed

ydb/public/sdk/cpp/client/ydb_table/impl/transaction.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@ TAsyncStatus TTransaction::TImpl::Precommit() const
1414
auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));
1515

1616
for (auto& callback : PrecommitCallbacks) {
17-
auto action = [curr = callback()](const TAsyncStatus& prev) {
17+
if (!callback) {
18+
continue;
19+
}
20+
21+
// If you send multiple requests in parallel, the `KQP` service can respond with `SESSION_BUSY`.
22+
// Therefore, precommit operations are performed sequentially. Here we capture the closure to
23+
// trigger it later.
24+
auto action = [callback = std::move(callback)](const TAsyncStatus& prev) {
1825
if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) {
1926
return prev;
2027
}
2128

22-
return curr;
29+
return callback();
2330
};
2431

2532
result = result.Apply(action);
@@ -39,6 +46,8 @@ TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSetting
3946
return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), Nothing()));
4047
}
4148

49+
PrecommitCallbacks.clear();
50+
4251
return Session_.Client_->CommitTransaction(Session_,
4352
TxId_,
4453
settings);

ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class TTransaction::TImpl {
3131
TString TxId_;
3232

3333
bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
34-
TVector<TPrecommitTransactionCallback> PrecommitCallbacks;
34+
mutable TVector<TPrecommitTransactionCallback> PrecommitCallbacks;
3535
};
3636

3737
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#include "offsets_collector.h"
2+
3+
namespace NYdb::NTopic {
4+
5+
TVector<TTopicOffsets> TOffsetsCollector::GetOffsets() const
6+
{
7+
TVector<TTopicOffsets> topics;
8+
9+
for (auto& [path, partitions] : Ranges) {
10+
TTopicOffsets topic;
11+
topic.Path = path;
12+
13+
topics.push_back(std::move(topic));
14+
15+
for (auto& [id, ranges] : partitions) {
16+
TPartitionOffsets partition;
17+
partition.PartitionId = id;
18+
19+
TTopicOffsets& t = topics.back();
20+
t.Partitions.push_back(std::move(partition));
21+
22+
for (auto& range : ranges) {
23+
TPartitionOffsets& p = t.Partitions.back();
24+
25+
TOffsetsRange r;
26+
r.Start = range.first;
27+
r.End = range.second;
28+
29+
p.Offsets.push_back(r);
30+
}
31+
}
32+
}
33+
34+
return topics;
35+
}
36+
37+
void TOffsetsCollector::CollectOffsets(const TVector<TReadSessionEvent::TEvent>& events)
38+
{
39+
for (auto& event : events) {
40+
if (auto* e = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
41+
CollectOffsets(*e);
42+
}
43+
}
44+
}
45+
46+
void TOffsetsCollector::CollectOffsets(const TReadSessionEvent::TEvent& event)
47+
{
48+
if (auto* e = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
49+
CollectOffsets(*e);
50+
}
51+
}
52+
53+
void TOffsetsCollector::CollectOffsets(const TReadSessionEvent::TDataReceivedEvent& event)
54+
{
55+
const auto& session = *event.GetPartitionSession();
56+
const TString& topicPath = session.GetTopicPath();
57+
ui32 partitionId = session.GetPartitionId();
58+
59+
if (event.HasCompressedMessages()) {
60+
for (auto& message : event.GetCompressedMessages()) {
61+
ui64 offset = message.GetOffset();
62+
Ranges[topicPath][partitionId].InsertInterval(offset, offset + 1);
63+
}
64+
} else {
65+
for (auto& message : event.GetMessages()) {
66+
ui64 offset = message.GetOffset();
67+
Ranges[topicPath][partitionId].InsertInterval(offset, offset + 1);
68+
}
69+
}
70+
}
71+
72+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include "topic_impl.h"
4+
#include "transaction.h"
5+
6+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
7+
#include <ydb/public/sdk/cpp/client/ydb_topic/include/read_events.h>
8+
9+
#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
10+
11+
#include <util/generic/hash.h>
12+
#include <util/generic/string.h>
13+
#include <util/generic/vector.h>
14+
15+
#include <memory>
16+
17+
namespace NYdb::NTopic {
18+
19+
class TOffsetsCollector {
20+
public:
21+
TVector<TTopicOffsets> GetOffsets() const;
22+
23+
void CollectOffsets(const TVector<TReadSessionEvent::TEvent>& events);
24+
void CollectOffsets(const TReadSessionEvent::TEvent& event);
25+
26+
private:
27+
// topic -> partition -> (begin, end)
28+
using TOffsetRanges = THashMap<TString, THashMap<ui64, TDisjointIntervalTree<ui64>>>;
29+
30+
void CollectOffsets(const TReadSessionEvent::TDataReceivedEvent& event);
31+
32+
TOffsetRanges Ranges;
33+
};
34+
35+
}

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp

Lines changed: 3 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,7 @@ TVector<TReadSessionEvent::TEvent> TReadSession::GetEvents(const TReadSessionGet
136136
auto events = GetEvents(settings.Block_, settings.MaxEventsCount_, settings.MaxByteSize_);
137137
if (!events.empty() && settings.Tx_) {
138138
auto& tx = settings.Tx_->get();
139-
for (auto& event : events) {
140-
if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
141-
CollectOffsets(tx, *dataEvent);
142-
}
143-
}
144-
UpdateOffsets(tx);
139+
CbContext->TryGet()->CollectOffsets(tx, events, Client);
145140
}
146141
return events;
147142
}
@@ -157,99 +152,13 @@ TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size_t maxB
157152
TMaybe<TReadSessionEvent::TEvent> TReadSession::GetEvent(const TReadSessionGetEventSettings& settings)
158153
{
159154
auto event = GetEvent(settings.Block_, settings.MaxByteSize_);
160-
if (event) {
155+
if (event && settings.Tx_) {
161156
auto& tx = settings.Tx_->get();
162-
if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
163-
CollectOffsets(tx, *dataEvent);
164-
}
165-
UpdateOffsets(tx);
157+
CbContext->TryGet()->CollectOffsets(tx, *event, Client);
166158
}
167159
return event;
168160
}
169161

170-
void TReadSession::CollectOffsets(NTable::TTransaction& tx,
171-
const TReadSessionEvent::TDataReceivedEvent& event)
172-
{
173-
const auto& session = *event.GetPartitionSession();
174-
175-
if (event.HasCompressedMessages()) {
176-
for (auto& message : event.GetCompressedMessages()) {
177-
CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset());
178-
}
179-
} else {
180-
for (auto& message : event.GetMessages()) {
181-
CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset());
182-
}
183-
}
184-
}
185-
186-
void TReadSession::CollectOffsets(NTable::TTransaction& tx,
187-
const TString& topicPath, ui32 partitionId, ui64 offset)
188-
{
189-
const TString& sessionId = tx.GetSession().GetId();
190-
const TString& txId = tx.GetId();
191-
TOffsetRanges& ranges = OffsetRanges[std::make_pair(sessionId, txId)];
192-
ranges[topicPath][partitionId].InsertInterval(offset, offset + 1);
193-
}
194-
195-
void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
196-
{
197-
const TString& sessionId = tx.GetSession().GetId();
198-
const TString& txId = tx.GetId();
199-
200-
auto p = OffsetRanges.find(std::make_pair(sessionId, txId));
201-
if (p == OffsetRanges.end()) {
202-
return;
203-
}
204-
205-
TVector<TTopicOffsets> topics;
206-
for (auto& [path, partitions] : p->second) {
207-
TTopicOffsets topic;
208-
topic.Path = path;
209-
210-
topics.push_back(std::move(topic));
211-
212-
for (auto& [id, ranges] : partitions) {
213-
TPartitionOffsets partition;
214-
partition.PartitionId = id;
215-
216-
TTopicOffsets& t = topics.back();
217-
t.Partitions.push_back(std::move(partition));
218-
219-
for (auto& range : ranges) {
220-
TPartitionOffsets& p = t.Partitions.back();
221-
222-
TOffsetsRange r;
223-
r.Start = range.first;
224-
r.End = range.second;
225-
226-
p.Offsets.push_back(r);
227-
}
228-
}
229-
}
230-
231-
Y_ABORT_UNLESS(!topics.empty());
232-
233-
while (true) {
234-
auto result = Client->UpdateOffsetsInTransaction(tx,
235-
topics,
236-
Settings.ConsumerName_,
237-
{}).GetValueSync();
238-
Y_ABORT_UNLESS(!result.IsTransportError());
239-
240-
if (result.GetStatus() != EStatus::SESSION_BUSY) {
241-
if (!result.IsSuccess()) {
242-
ythrow yexception() << "error on update offsets: " << result;
243-
}
244-
break;
245-
}
246-
247-
Sleep(TDuration::MilliSeconds(1));
248-
}
249-
250-
OffsetRanges.erase(std::make_pair(sessionId, txId));
251-
}
252-
253162
bool TReadSession::Close(TDuration timeout) {
254163
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
255164
// Log final counters.

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,6 @@ class TReadSession : public IReadSession {
6363
void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<false>& deferred);
6464

6565
private:
66-
using TOffsetRanges = THashMap<TString, THashMap<ui64, TDisjointIntervalTree<ui64>>>;
67-
68-
void CollectOffsets(NTable::TTransaction& tx,
69-
const TReadSessionEvent::TDataReceivedEvent& event);
70-
void CollectOffsets(NTable::TTransaction& tx,
71-
const TString& topicPath, ui32 partitionId, ui64 offset);
72-
void UpdateOffsets(const NTable::TTransaction& tx);
73-
74-
//
75-
// (session, tx) -> topic -> partition -> (begin, end)
76-
//
77-
THashMap<std::pair<TString, TString>, TOffsetRanges> OffsetRanges;
78-
7966
TReadSessionSettings Settings;
8067
const TString SessionId;
8168
const TInstant StartSessionTime = TInstant::Now();

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,15 @@
66

77
#include "common.h"
88
#include "counters_logger.h"
9+
#include "offsets_collector.h"
10+
#include "transaction.h"
911

1012
#include <ydb/public/sdk/cpp/client/ydb_topic/include/read_session.h>
1113
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/include/read_session.h>
1214
#include <ydb/public/sdk/cpp/client/ydb_topic/common/callback_context.h>
15+
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
16+
17+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
1318

1419
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
1520

@@ -1159,6 +1164,13 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
11591164
EventsQueue->SetCallbackContext(TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SelfContext);
11601165
}
11611166

1167+
void CollectOffsets(NTable::TTransaction& tx,
1168+
const TVector<TReadSessionEvent::TEvent>& events,
1169+
std::shared_ptr<TTopicClient::TImpl> client);
1170+
void CollectOffsets(NTable::TTransaction& tx,
1171+
const TReadSessionEvent::TEvent& event,
1172+
std::shared_ptr<TTopicClient::TImpl> client);
1173+
11621174
private:
11631175
void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred);
11641176

@@ -1303,6 +1315,22 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
13031315
};
13041316

13051317
private:
1318+
struct TTransactionInfo {
1319+
TSpinLock Lock;
1320+
bool IsActive = false;
1321+
bool Subscribed = false;
1322+
bool CommitCalled = false;
1323+
TOffsetsCollector OffsetsCollector;
1324+
};
1325+
1326+
using TTransactionInfoPtr = std::shared_ptr<TTransactionInfo>;
1327+
using TTransactionMap = THashMap<TTransactionId, TTransactionInfoPtr>;
1328+
1329+
void TrySubscribeOnTransactionCommit(NTable::TTransaction& tx,
1330+
std::shared_ptr<TTopicClient::TImpl> client);
1331+
TTransactionInfoPtr GetOrCreateTxInfo(const TTransactionId& txId);
1332+
void DeleteTx(const TTransactionId& txId);
1333+
13061334
const TAReadSessionSettings<UseMigrationProtocol> Settings;
13071335
const TString Database;
13081336
const TString SessionId;
@@ -1350,6 +1378,8 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
13501378

13511379
std::unordered_map<ui32, std::vector<TParentInfo>> HierarchyData;
13521380
std::unordered_set<ui64> ReadingFinishedData;
1381+
1382+
TTransactionMap Txs;
13531383
};
13541384

13551385
} // namespace NYdb::NTopic

0 commit comments

Comments
 (0)