Skip to content

Commit 12b392e

Browse files
Alek5andr-KotovGazizonoki
authored andcommitted
Moved commit "The Commit call waits Acks" from ydb repo
1 parent cd37952 commit 12b392e

File tree

17 files changed

+496
-168
lines changed

17 files changed

+496
-168
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include <ydb-cpp-sdk/client/topic/client.h>
2+
#include <ydb-cpp-sdk/client/table/table.h>
3+
4+
void ThrowOnError(const NYdb::TStatus& status)
5+
{
6+
if (status.IsSuccess()) {
7+
return;
8+
}
9+
10+
ythrow yexception() << status;
11+
}
12+
13+
int main()
14+
{
15+
const std::string ENDPOINT = "HOST:PORT";
16+
const std::string DATABASE = "DATABASE";
17+
const std::string TOPIC = "PATH/TO/TOPIC";
18+
19+
NYdb::TDriverConfig config;
20+
config.SetEndpoint(ENDPOINT);
21+
config.SetDatabase(DATABASE);
22+
NYdb::TDriver driver(config);
23+
24+
NYdb::NTable::TTableClient tableClient(driver);
25+
auto getTableSessionResult = tableClient.GetSession().GetValueSync();
26+
ThrowOnError(getTableSessionResult);
27+
auto tableSession = getTableSessionResult.GetSession();
28+
29+
NYdb::NTopic::TTopicClient topicClient(driver);
30+
auto topicSessionSettings = NYdb::NTopic::TWriteSessionSettings()
31+
.Path(TOPIC)
32+
.DeduplicationEnabled(true);
33+
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(topicSessionSettings);
34+
35+
auto beginTransactionResult = tableSession.BeginTransaction().GetValueSync();
36+
ThrowOnError(beginTransactionResult);
37+
auto transaction = beginTransactionResult.GetTransaction();
38+
39+
NYdb::NTopic::TWriteMessage writeMessage("message");
40+
41+
topicSession->Write(std::move(writeMessage), &transaction);
42+
43+
transaction.Commit().GetValueSync();
44+
}

include/ydb-cpp-sdk/client/table/table.h

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1694,6 +1694,8 @@ struct TReadTableSettings : public TRequestSettings<TReadTableSettings> {
16941694
FLUENT_SETTING_OPTIONAL(bool, ReturnNotNullAsOptional);
16951695
};
16961696

1697+
using TPrecommitTransactionCallback = std::function<TAsyncStatus ()>;
1698+
16971699
//! Represents all session operations
16981700
//! Session is transparent logic representation of connection
16991701
class TSession {
@@ -1831,26 +1833,21 @@ TAsyncStatus TTableClient::RetryOperation(
18311833
class TTransaction {
18321834
friend class TTableClient;
18331835
public:
1834-
const std::string& GetId() const {
1835-
return TxId_;
1836-
}
1837-
1838-
bool IsActive() const {
1839-
return !TxId_.empty();
1840-
}
1836+
const std::string& GetId() const;
1837+
bool IsActive() const;
18411838

18421839
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
18431840
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
18441841

1845-
TSession GetSession() const {
1846-
return Session_;
1847-
}
1842+
TSession GetSession() const;
1843+
void AddPrecommitCallback(TPrecommitTransactionCallback cb);
18481844

18491845
private:
18501846
TTransaction(const TSession& session, const std::string& txId);
18511847

1852-
TSession Session_;
1853-
std::string TxId_;
1848+
class TImpl;
1849+
1850+
std::shared_ptr<TImpl> TransactionImpl_;
18541851
};
18551852

18561853
////////////////////////////////////////////////////////////////////////////////

include/ydb-cpp-sdk/client/topic/write_session.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ namespace NYdb::NTable {
1717

1818
namespace NYdb::NTopic {
1919

20+
using TTransaction = NTable::TTransaction;
21+
2022
//! Settings for write session.
2123
struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
2224
using TSelf = TWriteSessionSettings;
@@ -190,9 +192,9 @@ struct TWriteMessage {
190192
FLUENT_SETTING(TMessageMeta, MessageMeta);
191193

192194
//! Transaction id
193-
FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx);
195+
FLUENT_SETTING_OPTIONAL(std::reference_wrapper<TTransaction>, Tx);
194196

195-
const NTable::TTransaction* GetTxPtr() const
197+
TTransaction* GetTxPtr() const
196198
{
197199
return Tx_ ? &Tx_->get() : nullptr;
198200
}
@@ -204,7 +206,9 @@ class ISimpleBlockingWriteSession : public TThrRefBase {
204206
//! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded;
205207
//! return - true if write succeeded, false if message was not enqueued for write within blockTimeout.
206208
//! no Ack is provided.
207-
virtual bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) = 0;
209+
virtual bool Write(TWriteMessage&& message,
210+
NTable::TTransaction* tx = nullptr,
211+
const TDuration& blockTimeout = TDuration::Max()) = 0;
208212

209213

210214
//! Write single message. Deprecated method with only basic message options.
@@ -249,15 +253,17 @@ class IWriteSession {
249253

250254
//! Write single message.
251255
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
252-
virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) = 0;
256+
virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message,
257+
NTable::TTransaction* tx = nullptr) = 0;
253258

254259
//! Write single message. Old method with only basic message options.
255260
virtual void Write(TContinuationToken&& continuationToken, std::string_view data, std::optional<uint64_t> seqNo = std::nullopt,
256261
std::optional<TInstant> createTimestamp = std::nullopt) = 0;
257262

258263
//! Write single message that is already coded by codec.
259264
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
260-
virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params) = 0;
265+
virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params,
266+
NTable::TTransaction* tx = nullptr) = 0;
261267

262268
//! Write single message that is already compressed by codec. Old method with only basic message options.
263269
virtual void WriteEncoded(TContinuationToken&& continuationToken, std::string_view data, ECodec codec, uint32_t originalSize,

src/client/federated_topic/impl/federated_write_session.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,16 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
154154
NThreading::TFuture<uint64_t> GetInitSeqNo() override {
155155
return TryGetImpl()->GetInitSeqNo();
156156
}
157-
void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override {
157+
void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message, NTable::TTransaction* tx = nullptr) override {
158+
if (tx) {
159+
ythrow yexception() << "transactions are not supported";
160+
}
158161
TryGetImpl()->Write(std::move(continuationToken), std::move(message));
159162
}
160-
void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override {
163+
void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params, NTable::TTransaction* tx = nullptr) override {
164+
if (tx) {
165+
ythrow yexception() << "transactions are not supported";
166+
}
161167
TryGetImpl()->WriteEncoded(std::move(continuationToken), std::move(params));
162168
}
163169
void Write(NTopic::TContinuationToken&& continuationToken, std::string_view data, std::optional<uint64_t> seqNo = std::nullopt,

src/client/table/impl/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ target_sources(client-ydb_table-impl PRIVATE
2121
readers.cpp
2222
request_migrator.cpp
2323
table_client.cpp
24+
transaction.cpp
2425
)
2526

2627
target_compile_options(client-ydb_table-impl PRIVATE

src/client/table/impl/client_session.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ namespace NTable {
2020

2121
using TSessionInspectorFn = std::function<void(TAsyncCreateSessionResult future)>;
2222

23-
2423
class TSession::TImpl : public TKqpSessionCommon {
2524
friend class TTableClient;
2625
friend class TSession;

src/client/table/impl/table_client.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -646,12 +646,12 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio
646646
return promise.GetFuture();
647647
}
648648

649-
TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx,
649+
TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const std::string& txId,
650650
const TCommitTxSettings& settings)
651651
{
652652
auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings);
653653
request.set_session_id(TStringType{session.GetId()});
654-
request.set_tx_id(TStringType{tx.GetId()});
654+
request.set_tx_id(TStringType{txId});
655655
request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
656656

657657
auto promise = NewPromise<TCommitTransactionResult>();
@@ -684,12 +684,12 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess
684684
return promise.GetFuture();
685685
}
686686

687-
TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx,
687+
TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const std::string& txId,
688688
const TRollbackTxSettings& settings)
689689
{
690690
auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings);
691691
request.set_session_id(TStringType{session.GetId()});
692-
request.set_tx_id(TStringType{tx.GetId()});
692+
request.set_tx_id(TStringType{txId});
693693

694694
return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>(
695695
std::move(request),

src/client/table/impl/table_client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
109109

110110
TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings,
111111
const TBeginTxSettings& settings);
112-
TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx,
112+
TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const std::string& txId,
113113
const TCommitTxSettings& settings);
114-
TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx,
114+
TAsyncStatus RollbackTransaction(const TSession& session, const std::string& txId,
115115
const TRollbackTxSettings& settings);
116116

117117
TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const std::string& query,

src/client/table/impl/transaction.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#include "transaction.h"
2+
#include "table_client.h"
3+
4+
namespace NYdb::NTable {
5+
6+
TTransaction::TImpl::TImpl(const TSession& session, const std::string& txId)
7+
: Session_(session)
8+
, TxId_(txId)
9+
{
10+
}
11+
12+
TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings)
13+
{
14+
ChangesAreAccepted = false;
15+
16+
auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));
17+
18+
for (auto& callback : PrecommitCallbacks) {
19+
auto action = [curr = callback()](const TAsyncStatus& prev) {
20+
if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) {
21+
return prev;
22+
}
23+
24+
return curr;
25+
};
26+
27+
result = result.Apply(action);
28+
}
29+
30+
auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable {
31+
if (const TStatus& status = result.GetValue(); !status.IsSuccess()) {
32+
return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), std::nullopt));
33+
}
34+
35+
return Session_.Client_->CommitTransaction(Session_,
36+
TxId_,
37+
settings);
38+
};
39+
40+
return result.Apply(precommitsCompleted);
41+
}
42+
43+
TAsyncStatus TTransaction::TImpl::Rollback(const TRollbackTxSettings& settings)
44+
{
45+
ChangesAreAccepted = false;
46+
return Session_.Client_->RollbackTransaction(Session_, TxId_, settings);
47+
}
48+
49+
void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb)
50+
{
51+
if (!ChangesAreAccepted) {
52+
ythrow TContractViolation("Changes are no longer accepted");
53+
}
54+
55+
PrecommitCallbacks.push_back(std::move(cb));
56+
}
57+
58+
}

src/client/table/impl/transaction.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
3+
#include <ydb-cpp-sdk/client/table/table.h>
4+
5+
namespace NYdb::NTable {
6+
7+
class TTransaction::TImpl {
8+
public:
9+
TImpl(const TSession& session, const std::string& txId);
10+
11+
const std::string& GetId() const {
12+
return TxId_;
13+
}
14+
15+
bool IsActive() const {
16+
return !TxId_.empty();
17+
}
18+
19+
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
20+
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
21+
22+
TSession GetSession() const {
23+
return Session_;
24+
}
25+
26+
void AddPrecommitCallback(TPrecommitTransactionCallback cb);
27+
28+
private:
29+
TSession Session_;
30+
std::string TxId_;
31+
32+
bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
33+
std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
34+
};
35+
36+
}

src/client/table/table.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <src/client/table/impl/data_query.h>
1919
#include <src/client/table/impl/request_migrator.h>
2020
#include <src/client/table/impl/table_client.h>
21+
#include <src/client/table/impl/transaction.h>
2122
#include <ydb-cpp-sdk/client/resources/ydb_resources.h>
2223

2324
#include <google/protobuf/util/time_util.h>
@@ -1998,16 +1999,35 @@ TTxControl::TTxControl(const TTxSettings& begin)
19981999
////////////////////////////////////////////////////////////////////////////////
19992000

20002001
TTransaction::TTransaction(const TSession& session, const std::string& txId)
2001-
: Session_(session)
2002-
, TxId_(txId)
2002+
: TransactionImpl_(new TTransaction::TImpl(session, txId))
20032003
{}
20042004

2005+
const std::string& TTransaction::GetId() const
2006+
{
2007+
return TransactionImpl_->GetId();
2008+
}
2009+
2010+
bool TTransaction::IsActive() const
2011+
{
2012+
return TransactionImpl_->IsActive();
2013+
}
2014+
20052015
TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) {
2006-
return Session_.Client_->CommitTransaction(Session_, *this, settings);
2016+
return TransactionImpl_->Commit(settings);
20072017
}
20082018

20092019
TAsyncStatus TTransaction::Rollback(const TRollbackTxSettings& settings) {
2010-
return Session_.Client_->RollbackTransaction(Session_, *this, settings);
2020+
return TransactionImpl_->Rollback(settings);
2021+
}
2022+
2023+
TSession TTransaction::GetSession() const
2024+
{
2025+
return TransactionImpl_->GetSession();
2026+
}
2027+
2028+
void TTransaction::AddPrecommitCallback(TPrecommitTransactionCallback cb)
2029+
{
2030+
TransactionImpl_->AddPrecommitCallback(std::move(cb));
20112031
}
20122032

20132033
////////////////////////////////////////////////////////////////////////////////

src/client/topic/impl/read_session.cpp

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -230,19 +230,14 @@ void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
230230

231231
Y_ABORT_UNLESS(!topics.empty());
232232

233-
while (true) {
234-
auto result = Client->UpdateOffsetsInTransaction(tx,
235-
topics,
236-
Settings.ConsumerName_,
237-
{}).GetValueSync();
238-
Y_ABORT_UNLESS(!result.IsTransportError());
239-
if (result.GetStatus() != EStatus::SESSION_BUSY) {
240-
if (!result.IsSuccess()) {
241-
ythrow yexception() << "error on update offsets: " << result;
242-
}
243-
break;
244-
}
245-
Sleep(TDuration::MilliSeconds(1));
233+
auto result = Client->UpdateOffsetsInTransaction(tx,
234+
topics,
235+
Settings.ConsumerName_,
236+
{}).GetValueSync();
237+
Y_ABORT_UNLESS(!result.IsTransportError());
238+
239+
if (!result.IsSuccess()) {
240+
ythrow yexception() << "error on update offsets: " << result;
246241
}
247242

248243
OffsetRanges.erase(std::make_pair(sessionId, txId));

0 commit comments

Comments
 (0)