Skip to content

Commit 16120ac

Browse files
Alek5andr-Kotov authored and Gazizonoki committed
Moved commits "Optimization of the UpdateOffsetsInTransaction calls" and "Commit offsets in a transaction and out of a transaction" from ydb repo
1 parent 5b6226f commit 16120ac

17 files changed

+394
-152
lines changed

include/ydb-cpp-sdk/client/topic/read_events.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -200,6 +200,10 @@ struct TReadSessionEvent {
200200
return CompressedMessages;
201201
}
202202

203+
//! Marks this event as having been read inside a transaction.
//! Once the flag is set, TDataReceivedEvent::Commit() throws instead of committing offsets explicitly.
void SetReadInTransaction() {
204+
ReadInTransaction = true;
205+
}
206+
203207
//! Commits all messages in batch.
204208
void Commit();
205209

@@ -218,6 +222,7 @@ struct TReadSessionEvent {
218222
std::vector<TMessage> Messages;
219223
std::vector<TCompressedMessage> CompressedMessages;
220224
std::vector<std::pair<uint64_t, uint64_t>> OffsetRanges;
225+
bool ReadInTransaction = false;
221226
};
222227

223228
//! Acknowledgement for commit request.

src/client/table/impl/transaction.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@ TAsyncStatus TTransaction::TImpl::Precommit() const
1414
auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));
1515

1616
for (auto& callback : PrecommitCallbacks) {
17-
auto action = [curr = callback()](const TAsyncStatus& prev) {
17+
if (!callback) {
18+
continue;
19+
}
20+
21+
// If you send multiple requests in parallel, the `KQP` service can respond with `SESSION_BUSY`.
22+
// Therefore, precommit operations are performed sequentially. Here we capture the closure to
23+
// trigger it later.
24+
auto action = [callback = std::move(callback)](const TAsyncStatus& prev) {
1825
if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) {
1926
return prev;
2027
}
2128

22-
return curr;
29+
return callback();
2330
};
2431

2532
result = result.Apply(action);
@@ -39,6 +46,8 @@ TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSetting
3946
return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), std::nullopt));
4047
}
4148

49+
PrecommitCallbacks.clear();
50+
4251
return Session_.Client_->CommitTransaction(Session_,
4352
TxId_,
4453
settings);

src/client/table/impl/transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class TTransaction::TImpl {
3131
std::string TxId_;
3232

3333
bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
34-
std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
34+
mutable std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
3535
};
3636

3737
}

src/client/topic/impl/CMakeLists.txt

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,12 +27,14 @@ target_sources(client-ydb_topic-impl
2727
common.cpp
2828
deferred_commit.cpp
2929
event_handlers.cpp
30+
offsets_collector.cpp
3031
read_session_event.cpp
3132
read_session.cpp
32-
write_session.cpp
33-
write_session_impl.cpp
3433
topic_impl.cpp
3534
topic.cpp
35+
transaction.cpp
36+
write_session_impl.cpp
37+
write_session.cpp
3638
)
3739

3840
_ydb_sdk_install_targets(TARGETS client-ydb_topic-impl)
src/client/topic/impl/offsets_collector.cpp

Lines changed: 72 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,72 @@
1+
#include "offsets_collector.h"
2+
3+
namespace NYdb::NTopic {
4+
5+
// Converts the collected ranges into a flat list of TTopicOffsets.
// The result has one TTopicOffsets per topic path in Ranges, one TPartitionOffsets per
// partition of that topic, and one TOffsetsRange (Start = range.first, End = range.second)
// per interval stored for that partition.
// Returns an empty vector when Ranges is empty.
std::vector<TTopicOffsets> TOffsetsCollector::GetOffsets() const
6+
{
7+
std::vector<TTopicOffsets> topics;
8+
9+
for (auto& [path, partitions] : Ranges) {
10+
TTopicOffsets topic;
11+
topic.Path = path;
12+
13+
// The topic is appended before its partitions are known; the inner loop fills it in through topics.back().
topics.push_back(std::move(topic));
14+
15+
for (auto& [id, ranges] : partitions) {
16+
TPartitionOffsets partition;
17+
partition.PartitionId = id;
18+
19+
TTopicOffsets& t = topics.back();
20+
// Same pattern as above: append the empty partition first, then fill its Offsets in place.
t.Partitions.push_back(std::move(partition));
21+
22+
for (auto& range : ranges) {
23+
TPartitionOffsets& p = t.Partitions.back();
24+
25+
// NOTE(review): CollectOffsets inserts (offset, offset + 1), so End is presumably exclusive — confirm.
TOffsetsRange r;
26+
r.Start = range.first;
27+
r.End = range.second;
28+
29+
p.Offsets.push_back(r);
30+
}
31+
}
32+
}
33+
34+
return topics;
35+
}
36+
37+
// Collects offsets from every TDataReceivedEvent found in `events`.
// Elements holding any other event alternative are skipped.
void TOffsetsCollector::CollectOffsets(const std::vector<TReadSessionEvent::TEvent>& events)
38+
{
39+
for (auto& event : events) {
40+
// std::get_if yields nullptr when the variant holds a different alternative.
if (auto* e = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
41+
CollectOffsets(*e);
42+
}
43+
}
44+
}
45+
46+
// Collects offsets from a single event if it holds a TDataReceivedEvent.
// Any other event alternative is ignored.
void TOffsetsCollector::CollectOffsets(const TReadSessionEvent::TEvent& event)
47+
{
48+
if (auto* e = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
49+
CollectOffsets(*e);
50+
}
51+
}
52+
53+
// Records the offset of every message carried by `event` into Ranges.
// Offsets are keyed by the topic path and partition id of the event's partition session.
// Each offset is inserted as the interval (offset, offset + 1).
// Compressed messages are used when the event has them, otherwise the regular messages.
// NOTE(review): event.GetPartitionSession() is dereferenced without a null check.
void TOffsetsCollector::CollectOffsets(const TReadSessionEvent::TDataReceivedEvent& event)
54+
{
55+
const auto& session = *event.GetPartitionSession();
56+
const std::string& topicPath = session.GetTopicPath();
57+
uint32_t partitionId = session.GetPartitionId();
58+
59+
if (event.HasCompressedMessages()) {
60+
for (auto& message : event.GetCompressedMessages()) {
61+
uint64_t offset = message.GetOffset();
62+
// operator[] creates the topic and partition entries on first use.
Ranges[topicPath][partitionId].InsertInterval(offset, offset + 1);
63+
}
64+
} else {
65+
for (auto& message : event.GetMessages()) {
66+
uint64_t offset = message.GetOffset();
67+
Ranges[topicPath][partitionId].InsertInterval(offset, offset + 1);
68+
}
69+
}
70+
}
71+
72+
}
src/client/topic/impl/offsets_collector.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include "topic_impl.h"
4+
#include "transaction.h"
5+
6+
#include <ydb-cpp-sdk/client/table/table.h>
7+
#include <ydb-cpp-sdk/client/topic/read_events.h>
8+
9+
#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
10+
11+
#include <util/generic/hash.h>
12+
#include <util/generic/string.h>
13+
#include <util/generic/vector.h>
14+
15+
#include <memory>
16+
17+
namespace NYdb::NTopic {
18+
19+
// Accumulates the offsets of messages from received data events, grouped by topic path
// and partition, and exposes them as a list of TTopicOffsets.
class TOffsetsCollector {
20+
public:
21+
// Returns the collected ranges as one TTopicOffsets per topic.
std::vector<TTopicOffsets> GetOffsets() const;
22+
23+
// Collect offsets from the data-received events in `events` / from `event`;
// events of other kinds are ignored.
void CollectOffsets(const std::vector<TReadSessionEvent::TEvent>& events);
24+
void CollectOffsets(const TReadSessionEvent::TEvent& event);
25+
26+
private:
27+
// topic -> partition -> (begin, end)
// NOTE(review): the partition key is uint64_t here, while the implementation passes a uint32_t partition id.
28+
using TOffsetRanges = std::unordered_map<std::string, std::unordered_map<uint64_t, TDisjointIntervalTree<uint64_t>>>;
29+
30+
// Inserts the offset of each message in `event` into Ranges.
void CollectOffsets(const TReadSessionEvent::TDataReceivedEvent& event);
31+
32+
// Offsets collected so far.
TOffsetRanges Ranges;
33+
};
34+
35+
}

src/client/topic/impl/read_session.cpp

Lines changed: 12 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ namespace NYdb::NTopic {
1111

1212
static const std::string DRIVER_IS_STOPPING_DESCRIPTION = "Driver is stopping";
1313

14+
// Marks `event` as read inside a transaction when it holds a TDataReceivedEvent.
// Any other event alternative is left untouched.
// NOTE(review): defined at namespace scope without `static` or an anonymous namespace;
// presumably intended as a file-local helper — confirm.
void SetReadInTransaction(TReadSessionEvent::TEvent& event)
15+
{
16+
if (auto* e = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
17+
e->SetReadInTransaction();
18+
}
19+
}
20+
1421
TReadSession::TReadSession(const TReadSessionSettings& settings,
1522
std::shared_ptr<TTopicClient::TImpl> client,
1623
std::shared_ptr<TGRpcConnectionsImpl> connections,
@@ -136,12 +143,10 @@ std::vector<TReadSessionEvent::TEvent> TReadSession::GetEvents(const TReadSessio
136143
auto events = GetEvents(settings.Block_, settings.MaxEventsCount_, settings.MaxByteSize_);
137144
if (!events.empty() && settings.Tx_) {
138145
auto& tx = settings.Tx_->get();
146+
CbContext->TryGet()->CollectOffsets(tx, events, Client);
139147
for (auto& event : events) {
140-
if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&event)) {
141-
CollectOffsets(tx, *dataEvent);
142-
}
148+
SetReadInTransaction(event);
143149
}
144-
UpdateOffsets(tx);
145150
}
146151
return events;
147152
}
@@ -157,92 +162,14 @@ std::optional<TReadSessionEvent::TEvent> TReadSession::GetEvent(bool block, size
157162
std::optional<TReadSessionEvent::TEvent> TReadSession::GetEvent(const TReadSessionGetEventSettings& settings)
158163
{
159164
auto event = GetEvent(settings.Block_, settings.MaxByteSize_);
160-
if (event) {
165+
if (event && settings.Tx_) {
161166
auto& tx = settings.Tx_->get();
162-
if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
163-
CollectOffsets(tx, *dataEvent);
164-
}
165-
UpdateOffsets(tx);
167+
CbContext->TryGet()->CollectOffsets(tx, *event, Client);
168+
SetReadInTransaction(*event);
166169
}
167170
return event;
168171
}
169172

170-
void TReadSession::CollectOffsets(NTable::TTransaction& tx,
171-
const TReadSessionEvent::TDataReceivedEvent& event)
172-
{
173-
const auto& session = *event.GetPartitionSession();
174-
175-
if (event.HasCompressedMessages()) {
176-
for (auto& message : event.GetCompressedMessages()) {
177-
CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset());
178-
}
179-
} else {
180-
for (auto& message : event.GetMessages()) {
181-
CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset());
182-
}
183-
}
184-
}
185-
186-
void TReadSession::CollectOffsets(NTable::TTransaction& tx,
187-
const std::string& topicPath, ui32 partitionId, ui64 offset)
188-
{
189-
const std::string& sessionId = tx.GetSession().GetId();
190-
const std::string& txId = tx.GetId();
191-
TOffsetRanges& ranges = OffsetRanges[std::make_pair(sessionId, txId)];
192-
ranges[topicPath][partitionId].InsertInterval(offset, offset + 1);
193-
}
194-
195-
void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
196-
{
197-
const std::string& sessionId = tx.GetSession().GetId();
198-
const std::string& txId = tx.GetId();
199-
200-
auto p = OffsetRanges.find(std::make_pair(sessionId, txId));
201-
if (p == OffsetRanges.end()) {
202-
return;
203-
}
204-
205-
std::vector<TTopicOffsets> topics;
206-
for (auto& [path, partitions] : p->second) {
207-
TTopicOffsets topic;
208-
topic.Path = path;
209-
210-
topics.push_back(std::move(topic));
211-
212-
for (auto& [id, ranges] : partitions) {
213-
TPartitionOffsets partition;
214-
partition.PartitionId = id;
215-
216-
TTopicOffsets& t = topics.back();
217-
t.Partitions.push_back(std::move(partition));
218-
219-
for (auto& range : ranges) {
220-
TPartitionOffsets& p = t.Partitions.back();
221-
222-
TOffsetsRange r;
223-
r.Start = range.first;
224-
r.End = range.second;
225-
226-
p.Offsets.push_back(r);
227-
}
228-
}
229-
}
230-
231-
Y_ABORT_UNLESS(!topics.empty());
232-
233-
auto result = Client->UpdateOffsetsInTransaction(tx,
234-
topics,
235-
Settings.ConsumerName_,
236-
{}).GetValueSync();
237-
Y_ABORT_UNLESS(!result.IsTransportError());
238-
239-
if (!result.IsSuccess()) {
240-
ythrow yexception() << "error on update offsets: " << result;
241-
}
242-
243-
OffsetRanges.erase(std::make_pair(sessionId, txId));
244-
}
245-
246173
bool TReadSession::Close(TDuration timeout) {
247174
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
248175
// Log final counters.

src/client/topic/impl/read_session.h

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,6 @@ class TReadSession : public IReadSession {
6363
void AbortImpl(EStatus statusCode, const std::string& message, TDeferredActions<false>& deferred);
6464

6565
private:
66-
using TOffsetRanges = std::unordered_map<std::string, std::unordered_map<ui64, TDisjointIntervalTree<ui64>>>;
67-
68-
void CollectOffsets(NTable::TTransaction& tx,
69-
const TReadSessionEvent::TDataReceivedEvent& event);
70-
void CollectOffsets(NTable::TTransaction& tx,
71-
const std::string& topicPath, ui32 partitionId, ui64 offset);
72-
void UpdateOffsets(const NTable::TTransaction& tx);
73-
74-
//
75-
// (session, tx) -> topic -> partition -> (begin, end)
76-
//
77-
std::unordered_map<std::pair<std::string, std::string>, TOffsetRanges, THash<std::pair<std::string, std::string>>> OffsetRanges;
7866

7967
TReadSessionSettings Settings;
8068
const std::string SessionId;

src/client/topic/impl/read_session_event.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,10 @@ TDataReceivedEvent::TDataReceivedEvent(std::vector<TMessage> messages, std::vect
248248
}
249249

250250
void TDataReceivedEvent::Commit() {
251+
if (ReadInTransaction) {
252+
ythrow yexception() << "Offsets cannot be commited explicitly when reading in a transaction";
253+
}
254+
251255
for (auto [from, to] : OffsetRanges) {
252256
static_cast<TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to);
253257
}

0 commit comments

Comments
 (0)