Skip to content

Commit cc0a336

Browse files
The Commit call waits Acks (#9761)
1 parent 71f2a5e commit cc0a336

File tree

17 files changed

+503
-163
lines changed

17 files changed

+503
-163
lines changed

ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,16 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
155155
NThreading::TFuture<ui64> GetInitSeqNo() override {
156156
return TryGetImpl()->GetInitSeqNo();
157157
}
158-
void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override {
158+
void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message, NTable::TTransaction* tx = nullptr) override {
159+
if (tx) {
160+
ythrow yexception() << "transactions are not supported";
161+
}
159162
TryGetImpl()->Write(std::move(continuationToken), std::move(message));
160163
}
161-
void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override {
164+
void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params, NTable::TTransaction* tx = nullptr) override {
165+
if (tx) {
166+
ythrow yexception() << "transactions are not supported";
167+
}
162168
TryGetImpl()->WriteEncoded(std::move(continuationToken), std::move(params));
163169
}
164170
void Write(NTopic::TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(),

ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ namespace NTable {
2020

2121
using TSessionInspectorFn = std::function<void(TAsyncCreateSessionResult future)>;
2222

23-
2423
class TSession::TImpl : public TKqpSessionCommon {
2524
friend class TTableClient;
2625
friend class TSession;

ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -646,12 +646,12 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio
646646
return promise.GetFuture();
647647
}
648648

649-
TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx,
649+
TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TString& txId,
650650
const TCommitTxSettings& settings)
651651
{
652652
auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings);
653653
request.set_session_id(session.GetId());
654-
request.set_tx_id(tx.GetId());
654+
request.set_tx_id(txId);
655655
request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));
656656

657657
auto promise = NewPromise<TCommitTransactionResult>();
@@ -684,12 +684,12 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess
684684
return promise.GetFuture();
685685
}
686686

687-
TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx,
687+
TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TString& txId,
688688
const TRollbackTxSettings& settings)
689689
{
690690
auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings);
691691
request.set_session_id(session.GetId());
692-
request.set_tx_id(tx.GetId());
692+
request.set_tx_id(txId);
693693

694694
return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>(
695695
std::move(request),

ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
109109

110110
TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings,
111111
const TBeginTxSettings& settings);
112-
TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx,
112+
TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TString& txId,
113113
const TCommitTxSettings& settings);
114-
TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx,
114+
TAsyncStatus RollbackTransaction(const TSession& session, const TString& txId,
115115
const TRollbackTxSettings& settings);
116116

117117
TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const TString& query,
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#include "transaction.h"
2+
#include "table_client.h"
3+
4+
namespace NYdb::NTable {
5+
6+
TTransaction::TImpl::TImpl(const TSession& session, const TString& txId)
7+
: Session_(session)
8+
, TxId_(txId)
9+
{
10+
}
11+
12+
TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings)
13+
{
14+
ChangesAreAccepted = false;
15+
16+
auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));
17+
18+
for (auto& callback : PrecommitCallbacks) {
19+
auto action = [curr = callback()](const TAsyncStatus& prev) {
20+
if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) {
21+
return prev;
22+
}
23+
24+
return curr;
25+
};
26+
27+
result = result.Apply(action);
28+
}
29+
30+
auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable {
31+
if (const TStatus& status = result.GetValue(); !status.IsSuccess()) {
32+
return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), Nothing()));
33+
}
34+
35+
return Session_.Client_->CommitTransaction(Session_,
36+
TxId_,
37+
settings);
38+
};
39+
40+
return result.Apply(precommitsCompleted);
41+
}
42+
43+
TAsyncStatus TTransaction::TImpl::Rollback(const TRollbackTxSettings& settings)
44+
{
45+
ChangesAreAccepted = false;
46+
return Session_.Client_->RollbackTransaction(Session_, TxId_, settings);
47+
}
48+
49+
void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb)
50+
{
51+
if (!ChangesAreAccepted) {
52+
ythrow TContractViolation("Changes are no longer accepted");
53+
}
54+
55+
PrecommitCallbacks.push_back(std::move(cb));
56+
}
57+
58+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
3+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
4+
5+
namespace NYdb::NTable {
6+
7+
class TTransaction::TImpl {
8+
public:
9+
TImpl(const TSession& session, const TString& txId);
10+
11+
const TString& GetId() const {
12+
return TxId_;
13+
}
14+
15+
bool IsActive() const {
16+
return !TxId_.empty();
17+
}
18+
19+
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
20+
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
21+
22+
TSession GetSession() const {
23+
return Session_;
24+
}
25+
26+
void AddPrecommitCallback(TPrecommitTransactionCallback cb);
27+
28+
private:
29+
TSession Session_;
30+
TString TxId_;
31+
32+
bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
33+
TVector<TPrecommitTransactionCallback> PrecommitCallbacks;
34+
};
35+
36+
}

ydb/public/sdk/cpp/client/ydb_table/impl/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SRCS(
66
readers.cpp
77
request_migrator.cpp
88
table_client.cpp
9+
transaction.cpp
910
)
1011

1112
PEERDIR(

ydb/public/sdk/cpp/client/ydb_table/table.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <ydb/public/sdk/cpp/client/ydb_table/impl/data_query.h>
1919
#include <ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h>
2020
#include <ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h>
21+
#include <ydb/public/sdk/cpp/client/ydb_table/impl/transaction.h>
2122
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
2223

2324
#include <google/protobuf/util/time_util.h>
@@ -1997,16 +1998,35 @@ TTxControl::TTxControl(const TTxSettings& begin)
19971998
////////////////////////////////////////////////////////////////////////////////
19981999

19992000
TTransaction::TTransaction(const TSession& session, const TString& txId)
2000-
: Session_(session)
2001-
, TxId_(txId)
2001+
: TransactionImpl_(new TTransaction::TImpl(session, txId))
20022002
{}
20032003

2004+
const TString& TTransaction::GetId() const
2005+
{
2006+
return TransactionImpl_->GetId();
2007+
}
2008+
2009+
bool TTransaction::IsActive() const
2010+
{
2011+
return TransactionImpl_->IsActive();
2012+
}
2013+
20042014
TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) {
2005-
return Session_.Client_->CommitTransaction(Session_, *this, settings);
2015+
return TransactionImpl_->Commit(settings);
20062016
}
20072017

20082018
TAsyncStatus TTransaction::Rollback(const TRollbackTxSettings& settings) {
2009-
return Session_.Client_->RollbackTransaction(Session_, *this, settings);
2019+
return TransactionImpl_->Rollback(settings);
2020+
}
2021+
2022+
TSession TTransaction::GetSession() const
2023+
{
2024+
return TransactionImpl_->GetSession();
2025+
}
2026+
2027+
void TTransaction::AddPrecommitCallback(TPrecommitTransactionCallback cb)
2028+
{
2029+
TransactionImpl_->AddPrecommitCallback(std::move(cb));
20102030
}
20112031

20122032
////////////////////////////////////////////////////////////////////////////////

ydb/public/sdk/cpp/client/ydb_table/table.h

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1692,6 +1692,8 @@ struct TReadTableSettings : public TRequestSettings<TReadTableSettings> {
16921692
FLUENT_SETTING_OPTIONAL(bool, ReturnNotNullAsOptional);
16931693
};
16941694

1695+
using TPrecommitTransactionCallback = std::function<TAsyncStatus ()>;
1696+
16951697
//! Represents all session operations
16961698
//! Session is transparent logic representation of connection
16971699
class TSession {
@@ -1829,26 +1831,22 @@ TAsyncStatus TTableClient::RetryOperation(
18291831
class TTransaction {
18301832
friend class TTableClient;
18311833
public:
1832-
const TString& GetId() const {
1833-
return TxId_;
1834-
}
1835-
1836-
bool IsActive() const {
1837-
return !TxId_.empty();
1838-
}
1834+
const TString& GetId() const;
1835+
bool IsActive() const;
18391836

18401837
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
18411838
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
18421839

1843-
TSession GetSession() const {
1844-
return Session_;
1845-
}
1840+
TSession GetSession() const;
1841+
1842+
void AddPrecommitCallback(TPrecommitTransactionCallback cb);
18461843

18471844
private:
18481845
TTransaction(const TSession& session, const TString& txId);
18491846

1850-
TSession Session_;
1851-
TString TxId_;
1847+
class TImpl;
1848+
1849+
std::shared_ptr<TImpl> TransactionImpl_;
18521850
};
18531851

18541852
////////////////////////////////////////////////////////////////////////////////

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,21 @@ void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
230230

231231
Y_ABORT_UNLESS(!topics.empty());
232232

233-
auto result = Client->UpdateOffsetsInTransaction(tx,
234-
topics,
235-
Settings.ConsumerName_,
236-
{}).GetValueSync();
237-
Y_ABORT_UNLESS(!result.IsTransportError());
238-
239-
if (!result.IsSuccess()) {
240-
ythrow yexception() << "error on update offsets: " << result;
233+
while (true) {
234+
auto result = Client->UpdateOffsetsInTransaction(tx,
235+
topics,
236+
Settings.ConsumerName_,
237+
{}).GetValueSync();
238+
Y_ABORT_UNLESS(!result.IsTransportError());
239+
240+
if (result.GetStatus() != EStatus::SESSION_BUSY) {
241+
if (!result.IsSuccess()) {
242+
ythrow yexception() << "error on update offsets: " << result;
243+
}
244+
break;
245+
}
246+
247+
Sleep(TDuration::MilliSeconds(1));
241248
}
242249

243250
OffsetRanges.erase(std::make_pair(sessionId, txId));

0 commit comments

Comments
 (0)