diff --git a/examples/topic_writer/transaction/main.cpp b/examples/topic_writer/transaction/main.cpp new file mode 100644 index 0000000000..23ffdbbb1c --- /dev/null +++ b/examples/topic_writer/transaction/main.cpp @@ -0,0 +1,44 @@ +#include +#include + +void ThrowOnError(const NYdb::TStatus& status) +{ + if (status.IsSuccess()) { + return; + } + + ythrow yexception() << status; +} + +int main() +{ + const std::string ENDPOINT = "HOST:PORT"; + const std::string DATABASE = "DATABASE"; + const std::string TOPIC = "PATH/TO/TOPIC"; + + NYdb::TDriverConfig config; + config.SetEndpoint(ENDPOINT); + config.SetDatabase(DATABASE); + NYdb::TDriver driver(config); + + NYdb::NTable::TTableClient tableClient(driver); + auto getTableSessionResult = tableClient.GetSession().GetValueSync(); + ThrowOnError(getTableSessionResult); + auto tableSession = getTableSessionResult.GetSession(); + + NYdb::NTopic::TTopicClient topicClient(driver); + auto topicSessionSettings = NYdb::NTopic::TWriteSessionSettings() + .Path(TOPIC) + .DeduplicationEnabled(true); + auto topicSession = topicClient.CreateSimpleBlockingWriteSession(topicSessionSettings); + + auto beginTransactionResult = tableSession.BeginTransaction().GetValueSync(); + ThrowOnError(beginTransactionResult); + auto transaction = beginTransactionResult.GetTransaction(); + + NYdb::NTopic::TWriteMessage writeMessage("message"); + + topicSession->Write(std::move(writeMessage), &transaction); + + transaction.Commit().GetValueSync(); +} diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 7f98bc6a2c..9214c2254e 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -1694,6 +1694,8 @@ struct TReadTableSettings : public TRequestSettings { FLUENT_SETTING_OPTIONAL(bool, ReturnNotNullAsOptional); }; +using TPrecommitTransactionCallback = std::function; + //! Represents all session operations //! Session is transparent logic representation of connection class TSession { @@ -1831,26 +1833,21 @@ TAsyncStatus TTableClient::RetryOperation( class TTransaction { friend class TTableClient; public: - const std::string& GetId() const { - return TxId_; - } - - bool IsActive() const { - return !TxId_.empty(); - } + const std::string& GetId() const; + bool IsActive() const; TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings()); TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings()); - TSession GetSession() const { - return Session_; - } + TSession GetSession() const; + void AddPrecommitCallback(TPrecommitTransactionCallback cb); private: TTransaction(const TSession& session, const std::string& txId); - TSession Session_; - std::string TxId_; + class TImpl; + + std::shared_ptr TransactionImpl_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/include/ydb-cpp-sdk/client/topic/write_session.h b/include/ydb-cpp-sdk/client/topic/write_session.h index aed5cc9a8c..45e995a235 100644 --- a/include/ydb-cpp-sdk/client/topic/write_session.h +++ b/include/ydb-cpp-sdk/client/topic/write_session.h @@ -17,6 +17,8 @@ namespace NYdb::NTable { namespace NYdb::NTopic { +using TTransaction = NTable::TTransaction; + //! Settings for write session. struct TWriteSessionSettings : public TRequestSettings { using TSelf = TWriteSessionSettings; @@ -190,9 +192,9 @@ struct TWriteMessage { FLUENT_SETTING(TMessageMeta, MessageMeta); //! Transaction id - FLUENT_SETTING_OPTIONAL(std::reference_wrapper, Tx); + FLUENT_SETTING_OPTIONAL(std::reference_wrapper, Tx); - const NTable::TTransaction* GetTxPtr() const + TTransaction* GetTxPtr() const { return Tx_ ? &Tx_->get() : nullptr; } @@ -204,7 +206,9 @@ class ISimpleBlockingWriteSession : public TThrRefBase { //! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded; //! return - true if write succeeded, false if message was not enqueued for write within blockTimeout. //! no Ack is provided. - virtual bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) = 0; + virtual bool Write(TWriteMessage&& message, + NTable::TTransaction* tx = nullptr, + const TDuration& blockTimeout = TDuration::Max()) = 0; //! Write single message. Deprecated method with only basic message options. @@ -249,7 +253,8 @@ class IWriteSession { //! Write single message. //! continuationToken - a token earlier provided to client with ReadyToAccept event. - virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) = 0; + virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message, + NTable::TTransaction* tx = nullptr) = 0; //! Write single message. Old method with only basic message options. virtual void Write(TContinuationToken&& continuationToken, std::string_view data, std::optional seqNo = std::nullopt, @@ -257,7 +262,8 @@ class IWriteSession { //! Write single message that is already coded by codec. //! continuationToken - a token earlier provided to client with ReadyToAccept event. - virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params) = 0; + virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params, + NTable::TTransaction* tx = nullptr) = 0; //! Write single message that is already compressed by codec. Old method with only basic message options. virtual void WriteEncoded(TContinuationToken&& continuationToken, std::string_view data, ECodec codec, uint32_t originalSize, diff --git a/src/client/federated_topic/impl/federated_write_session.h b/src/client/federated_topic/impl/federated_write_session.h index 121eb63fbd..efe85fbef0 100644 --- a/src/client/federated_topic/impl/federated_write_session.h +++ b/src/client/federated_topic/impl/federated_write_session.h @@ -154,10 +154,16 @@ class TFederatedWriteSession : public NTopic::IWriteSession, NThreading::TFuture GetInitSeqNo() override { return TryGetImpl()->GetInitSeqNo(); } - void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override { + void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message, NTable::TTransaction* tx = nullptr) override { + if (tx) { + ythrow yexception() << "transactions are not supported"; + } TryGetImpl()->Write(std::move(continuationToken), std::move(message)); } - void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override { + void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params, NTable::TTransaction* tx = nullptr) override { + if (tx) { + ythrow yexception() << "transactions are not supported"; + } TryGetImpl()->WriteEncoded(std::move(continuationToken), std::move(params)); } void Write(NTopic::TContinuationToken&& continuationToken, std::string_view data, std::optional seqNo = std::nullopt, diff --git a/src/client/table/impl/CMakeLists.txt b/src/client/table/impl/CMakeLists.txt index 343b00fcde..b93290b046 100644 --- a/src/client/table/impl/CMakeLists.txt +++ b/src/client/table/impl/CMakeLists.txt @@ -21,6 +21,7 @@ target_sources(client-ydb_table-impl PRIVATE readers.cpp request_migrator.cpp table_client.cpp + transaction.cpp ) target_compile_options(client-ydb_table-impl PRIVATE diff --git a/src/client/table/impl/client_session.h b/src/client/table/impl/client_session.h index 3de2f03211..4e4a9aeb93 100644 --- a/src/client/table/impl/client_session.h +++ b/src/client/table/impl/client_session.h @@ -20,7 +20,6 @@ namespace NTable { using TSessionInspectorFn = std::function; - class TSession::TImpl : public TKqpSessionCommon { friend class TTableClient; friend class TSession; diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index b5863abf8f..c75479503d 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -646,12 +646,12 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio return promise.GetFuture(); } -TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx, +TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const std::string& txId, const TCommitTxSettings& settings) { auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session.GetId()}); - request.set_tx_id(TStringType{tx.GetId()}); + request.set_tx_id(TStringType{txId}); request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); auto promise = NewPromise(); @@ -684,12 +684,12 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess return promise.GetFuture(); } -TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx, +TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const std::string& txId, const TRollbackTxSettings& settings) { auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session.GetId()}); - request.set_tx_id(TStringType{tx.GetId()}); + request.set_tx_id(TStringType{txId}); return RunSimple( std::move(request), diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index c44d907dd1..7f66e4193f 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -109,9 +109,9 @@ class TTableClient::TImpl: public TClientImplCommon, public TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings, const TBeginTxSettings& settings); - TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx, + TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const std::string& txId, const TCommitTxSettings& settings); - TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx, + TAsyncStatus RollbackTransaction(const TSession& session, const std::string& txId, const TRollbackTxSettings& settings); TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const std::string& query, diff --git a/src/client/table/impl/transaction.cpp b/src/client/table/impl/transaction.cpp new file mode 100644 index 0000000000..ccd56f1dde --- /dev/null +++ b/src/client/table/impl/transaction.cpp @@ -0,0 +1,58 @@ +#include "transaction.h" +#include "table_client.h" + +namespace NYdb::NTable { + +TTransaction::TImpl::TImpl(const TSession& session, const std::string& txId) + : Session_(session) + , TxId_(txId) +{ +} + +TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings) +{ + ChangesAreAccepted = false; + + auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {})); + + for (auto& callback : PrecommitCallbacks) { + auto action = [curr = callback()](const TAsyncStatus& prev) { + if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) { + return prev; + } + + return curr; + }; + + result = result.Apply(action); + } + + auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable { + if (const TStatus& status = result.GetValue(); !status.IsSuccess()) { + return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), std::nullopt)); + } + + return Session_.Client_->CommitTransaction(Session_, + TxId_, + settings); + }; + + return result.Apply(precommitsCompleted); +} + +TAsyncStatus TTransaction::TImpl::Rollback(const TRollbackTxSettings& settings) +{ + ChangesAreAccepted = false; + return Session_.Client_->RollbackTransaction(Session_, TxId_, settings); +} + +void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb) +{ + if (!ChangesAreAccepted) { + ythrow TContractViolation("Changes are no longer accepted"); + } + + PrecommitCallbacks.push_back(std::move(cb)); +} + +} diff --git a/src/client/table/impl/transaction.h b/src/client/table/impl/transaction.h new file mode 100644 index 0000000000..32b336af4b --- /dev/null +++ b/src/client/table/impl/transaction.h @@ -0,0 +1,36 @@ +#pragma once + +#include + +namespace NYdb::NTable { + +class TTransaction::TImpl { +public: + TImpl(const TSession& session, const std::string& txId); + + const std::string& GetId() const { + return TxId_; + } + + bool IsActive() const { + return !TxId_.empty(); + } + + TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings()); + TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings()); + + TSession GetSession() const { + return Session_; + } + + void AddPrecommitCallback(TPrecommitTransactionCallback cb); + +private: + TSession Session_; + std::string TxId_; + + bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet + std::vector PrecommitCallbacks; +}; + +} diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 1d13e5dddf..c597b2ddf9 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -1998,16 +1999,35 @@ TTxControl::TTxControl(const TTxSettings& begin) //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction(const TSession& session, const std::string& txId) - : Session_(session) - , TxId_(txId) + : TransactionImpl_(new TTransaction::TImpl(session, txId)) {} +const std::string& TTransaction::GetId() const +{ + return TransactionImpl_->GetId(); +} + +bool TTransaction::IsActive() const +{ + return TransactionImpl_->IsActive(); +} + TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) { - return Session_.Client_->CommitTransaction(Session_, *this, settings); + return TransactionImpl_->Commit(settings); } TAsyncStatus TTransaction::Rollback(const TRollbackTxSettings& settings) { - return Session_.Client_->RollbackTransaction(Session_, *this, settings); + return TransactionImpl_->Rollback(settings); +} + +TSession TTransaction::GetSession() const +{ + return TransactionImpl_->GetSession(); +} + +void TTransaction::AddPrecommitCallback(TPrecommitTransactionCallback cb) +{ + TransactionImpl_->AddPrecommitCallback(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/client/topic/impl/common.h b/src/client/topic/impl/common.h index 026bee1818..88c7f858f6 100644 --- a/src/client/topic/impl/common.h +++ b/src/client/topic/impl/common.h @@ -383,6 +383,7 @@ class TBaseSessionEventsQueue : public ISignalable { while (!HasEventsImpl()) { std::unique_lock lk(Mutex, std::adopt_lock); CondVar.wait(lk); + lk.release(); } } diff --git a/src/client/topic/impl/read_session.cpp b/src/client/topic/impl/read_session.cpp index cc5383313a..541481b5d0 100644 --- a/src/client/topic/impl/read_session.cpp +++ b/src/client/topic/impl/read_session.cpp @@ -309,9 +309,10 @@ bool TReadSession::Close(TDuration timeout) { issues.AddIssue(TStringBuilder() << "Session was closed after waiting " << timeout); EventsQueue->Close(TSessionClosedEvent(EStatus::TIMEOUT, std::move(issues)), deferred); } - - std::lock_guard guard(Lock); - Aborting = true; // Set abort flag for doing nothing on destructor. + { + std::lock_guard guard(Lock); + Aborting = true; // Set abort flag for doing nothing on destructor. + } return result; } diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 5e7d976f68..1645142f10 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -2184,22 +2184,23 @@ TReadSessionEventsQueue::GetEvents(bool block, std::option std::vector> eventInfos; const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits::max(); TUserRetrievedEventsInfoAccumulator accumulator; + { + std::lock_guard guard(TParent::Mutex); + eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.has_value(), maxCount)); + do { + if (block) { + TParent::WaitEventsImpl(); + } - std::lock_guard guard(TParent::Mutex); - eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.has_value(), maxCount)); - do { - if (block) { - TParent::WaitEventsImpl(); - } - - while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { - TReadSessionEventInfo event = GetEventImpl(maxByteSize, accumulator); - eventInfos.emplace_back(std::move(event)); - if (eventInfos.back().IsSessionClosedEvent()) { - break; + while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { + TReadSessionEventInfo event = GetEventImpl(maxByteSize, accumulator); + eventInfos.emplace_back(std::move(event)); + if (eventInfos.back().IsSessionClosedEvent()) { + break; + } } - } - } while (block && eventInfos.empty()); + } while (block && eventInfos.empty()); + } accumulator.OnUserRetrievedEvent(); @@ -2222,18 +2223,19 @@ TReadSessionEventsQueue::GetEvent(bool block, size_t maxBy std::optional> eventInfo; TUserRetrievedEventsInfoAccumulator accumulator; + { + std::lock_guard guard(TParent::Mutex); + do { + if (block) { + TParent::WaitEventsImpl(); + } - std::lock_guard guard(TParent::Mutex); - do { - if (block) { - TParent::WaitEventsImpl(); - } - - if (TParent::HasEventsImpl()) { - eventInfo = GetEventImpl(maxByteSize, accumulator); - } + if (TParent::HasEventsImpl()) { + eventInfo = GetEventImpl(maxByteSize, accumulator); + } - } while (block && !eventInfo); + } while (block && !eventInfo); + } accumulator.OnUserRetrievedEvent(); diff --git a/src/client/topic/impl/write_session.cpp b/src/client/topic/impl/write_session.cpp index c741c47773..c8edb06021 100644 --- a/src/client/topic/impl/write_session.cpp +++ b/src/client/topic/impl/write_session.cpp @@ -45,8 +45,12 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, std::string_view da TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } -void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message) +void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message, + NTable::TTransaction* tx) { + if (tx) { + message.Tx(*tx); + } TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } @@ -60,7 +64,11 @@ void TWriteSession::Write(TContinuationToken&& token, std::string_view data, std TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } -void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message) { +void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message, + NTable::TTransaction* tx) { + if (tx) { + message.Tx(*tx); + } TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } @@ -112,15 +120,15 @@ bool TSimpleBlockingWriteSession::Write( auto message = TWriteMessage(std::move(data)) .SeqNo(seqNo) .CreateTimestamp(createTimestamp); - return Write(std::move(message), blockTimeout); + return Write(std::move(message), nullptr, blockTimeout); } bool TSimpleBlockingWriteSession::Write( - TWriteMessage&& message, const TDuration& blockTimeout + TWriteMessage&& message, NTable::TTransaction* tx, const TDuration& blockTimeout ) { auto continuationToken = WaitForToken(blockTimeout); if (continuationToken.has_value()) { - Writer->Write(std::move(*continuationToken), std::move(message)); + Writer->Write(std::move(*continuationToken), std::move(message), tx); return true; } return false; diff --git a/src/client/topic/impl/write_session.h b/src/client/topic/impl/write_session.h index 1922c43068..1a0c918338 100644 --- a/src/client/topic/impl/write_session.h +++ b/src/client/topic/impl/write_session.h @@ -36,9 +36,11 @@ class TWriteSession : public IWriteSession, void WriteEncoded(TContinuationToken&& continuationToken, std::string_view data, ECodec codec, ui32 originalSize, std::optional seqNo = std::nullopt, std::optional createTimestamp = std::nullopt) override; - void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void Write(TContinuationToken&& continuationToken, TWriteMessage&& message, + NTable::TTransaction* tx = nullptr) override; - void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message, + NTable::TTransaction* tx = nullptr) override; NThreading::TFuture WaitEvent() override; @@ -67,7 +69,9 @@ class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession { bool Write(std::string_view data, std::optional seqNo = std::nullopt, std::optional createTimestamp = std::nullopt, const TDuration& blockTimeout = TDuration::Max()) override; - bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) override; + bool Write(TWriteMessage&& message, + NTable::TTransaction* tx = nullptr, + const TDuration& blockTimeout = TDuration::Max()) override; uint64_t GetInitSeqNo() override; diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 35acf14452..164dfff1a4 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -520,14 +520,132 @@ NThreading::TFuture TWriteSessionImpl::WaitEvent() { return EventsQueue->WaitEvent(); } +void TWriteSessionImpl::OnTransactionCommit() +{ +} + +TStatus MakeStatus(EStatus code, NYql::TIssues&& issues) +{ + return {code, std::move(issues)}; +} + +TStatus MakeSessionExpiredError() +{ + return MakeStatus(EStatus::SESSION_EXPIRED, {}); +} + +TStatus MakeCommitTransactionSuccess() +{ + return MakeStatus(EStatus::SUCCESS, {}); +} + +std::pair MakeTransactionId(const TTransaction& tx) +{ + return {tx.GetSession().GetId(), tx.GetId()}; +} + +void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransaction* tx) +{ + if (!tx) { + return; + } + + const TTransactionId txId = MakeTransactionId(*tx); + TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId); + + with_lock(txInfo->Lock) { + if (txInfo->Subscribed) { + return; + } + + txInfo->IsActive = true; + txInfo->Subscribed = true; + txInfo->AllAcksReceived = NThreading::NewPromise(); + } + + auto callback = [txInfo]() { + with_lock(txInfo->Lock) { + txInfo->CommitCalled = true; + + if (txInfo->WriteCount == txInfo->AckCount) { + txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); + return txInfo->AllAcksReceived.GetFuture(); + } + + if (txInfo->IsActive) { + return txInfo->AllAcksReceived.GetFuture(); + } + + return NThreading::MakeFuture(MakeSessionExpiredError()); + } + }; + + tx->AddPrecommitCallback(std::move(callback)); +} + +void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) +{ + Y_ABORT_UNLESS(Lock.IsLocked()); + + auto p = WrittenInTx.find(seqNo); + if (p == WrittenInTx.end()) { + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, + LogPrefix() << "OnAck: seqno=" << seqNo << ", txId=?"); + return; + } + + const TTransactionId& txId = p->second; + TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId); + + with_lock(txInfo->Lock) { + ++txInfo->AckCount; + + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, + LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << txId.second<< ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + + if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) { + txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); + + Txs.erase(txId); + } + } +} + +auto TWriteSessionImpl::GetOrCreateTxInfo(const TTransactionId& txId) -> TTransactionInfoPtr +{ + auto p = Txs.find(txId); + if (p == Txs.end()) { + TTransactionInfoPtr& txInfo = Txs[txId]; + txInfo = std::make_shared(); + txInfo->Subscribed = false; + txInfo->CommitCalled = false; + p = Txs.find(txId); + } + return p->second; +} + void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) { TInstant createdAtValue = message.CreateTimestamp_.value_or(TInstant::Now()); bool readyToAccept = false; size_t bufferSize = message.Data.size(); { std::lock_guard guard(Lock); + TrySubscribeOnTransactionCommit(message.GetTxPtr()); + + ui64 seqNo = GetNextIdImpl(message.SeqNo_); + + if (message.GetTxPtr()) { + const auto& txId = MakeTransactionId(*message.GetTxPtr()); + TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId); + ++txInfo->WriteCount; + WrittenInTx[seqNo] = txId; + + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, + LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << txId.second << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + } + CurrentBatch.Add( - GetNextIdImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize, + seqNo, createdAtValue, message.Data, message.Codec, message.OriginalSize, message.MessageMeta_, message.GetTxPtr() ); @@ -999,10 +1117,12 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess for (const auto& ack : batchWriteResponse.acks()) { // TODO: Fill writer statistics - uint64_t sequenceNumber = ack.seq_no(); + uint64_t msgId = GetIdImpl(ack.seq_no()); Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped() || ack.has_written_in_tx()); + TrySignalAllAcksReceived(msgId); + TWriteSessionEvent::TWriteAck::EEventState msgWriteStatus; if (ack.has_written_in_tx()) { msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX; @@ -1018,7 +1138,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess uint64_t offset = ack.has_written() ? ack.written().offset() : 0; acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ - GetIdImpl(sequenceNumber), + msgId, msgWriteStatus, TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { offset, @@ -1027,7 +1147,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess writeStat, }); - if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) { + if (CleanupOnAcknowledged(msgId)) { result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } @@ -1085,6 +1205,9 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(uint64_t id) { (*Counters->BytesInflightTotal) = MemoryUsage; SentOriginalMessages.pop(); + + WrittenInTx.erase(id); + return result; } @@ -1572,10 +1695,23 @@ void TWriteSessionImpl::AbortImpl() { Cancel(ConnectDelayContext); if (Processor) Processor->Cancel(); - Cancel(ClientContext); ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. + + CancelTransactions(); + } +} + +void TWriteSessionImpl::CancelTransactions() +{ + for (auto& [_, txInfo] : Txs) { + txInfo->IsActive = false; + if (txInfo->WriteCount != txInfo->AckCount) { + txInfo->AllAcksReceived.SetValue(MakeSessionExpiredError()); + } } + + Txs.clear(); } void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { diff --git a/src/client/topic/impl/write_session_impl.h b/src/client/topic/impl/write_session_impl.h index 6a496da233..e0260ce95f 100644 --- a/src/client/topic/impl/write_session_impl.h +++ b/src/client/topic/impl/write_session_impl.h @@ -307,6 +307,19 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, i64 Generation; }; + struct TTransactionInfo { + TSpinLock Lock; + bool IsActive = false; + bool Subscribed = false; + NThreading::TPromise AllAcksReceived; + bool CommitCalled = false; + ui64 WriteCount = 0; + ui64 AckCount = 0; + }; + + using TTransactionId = std::pair; // SessionId, TxId + using TTransactionInfoPtr = std::shared_ptr; + THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action public: @@ -411,6 +424,13 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, bool TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRequest* writeRequest) const; + void TrySubscribeOnTransactionCommit(TTransaction* tx); + void CancelTransactions(); + TTransactionInfoPtr GetOrCreateTxInfo(const TTransactionId& txId); + void TrySignalAllAcksReceived(ui64 seqNo); + + void OnTransactionCommit(); + private: TWriteSessionSettings Settings; std::shared_ptr Client; @@ -472,6 +492,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, std::optional DirectWriteToPartitionId; protected: uint64_t MessagesAcquired = 0; + + std::unordered_map> Txs; + std::unordered_map WrittenInTx; // SeqNo -> TxId }; } // namespace NYdb::NTopic diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 55d4ffb6e0..8c1df743c4 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -109,7 +109,8 @@ class TFixture : public NUnitTest::TBaseFixture { const TString& messageGroupId, NYdb::EStatus status); void CloseTopicWriteSession(const TString& topicPath, - const TString& messageGroupId); + const TString& messageGroupId, + bool force = false); void CloseTopicReadSession(const TString& topicPath, const TString& consumerName); @@ -243,21 +244,38 @@ NTable::TSession TFixture::CreateTableSession() NTable::TTransaction TFixture::BeginTx(NTable::TSession& session) { - auto result = session.BeginTransaction().ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - return result.GetTransaction(); + while (true) { + auto result = session.BeginTransaction().ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + return result.GetTransaction(); + } + Sleep(TDuration::MilliSeconds(100)); + } } void TFixture::CommitTx(NTable::TTransaction& tx, EStatus status) { - auto result = tx.Commit().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + while (true) { + auto result = tx.Commit().ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + return; + } + Sleep(TDuration::MilliSeconds(100)); + } } void TFixture::RollbackTx(NTable::TTransaction& tx, EStatus status) { - auto result = tx.Rollback().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + while (true) { + auto result = tx.Rollback().ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + return; + } + Sleep(TDuration::MilliSeconds(100)); + } } auto TFixture::CreateReader() -> TTopicReadSessionPtr @@ -490,15 +508,15 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) CommitTx(tx1, EStatus::ABORTED); } -Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) -{ - WriteToTopicWithInvalidTxId(false); -} - -Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture) -{ - WriteToTopicWithInvalidTxId(true); -} +//Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) +//{ +// WriteToTopicWithInvalidTxId(false); +//} +// +//Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture) +//{ +// WriteToTopicWithInvalidTxId(true); +//} Y_UNIT_TEST_F(WriteToTopic_Two_WriteSession, TFixture) { @@ -694,7 +712,8 @@ void TFixture::TTopicWriteSessionContext::Write(const TString& message, NTable:: } void TFixture::CloseTopicWriteSession(const TString& topicPath, - const TString& messageGroupId) + const TString& messageGroupId, + bool force) { std::pair key(topicPath, messageGroupId); auto i = TopicWriteSessions.find(key); @@ -703,7 +722,7 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath, TTopicWriteSessionContext& context = i->second; - context.Session->Close(); + context.Session->Close(force ? TDuration::MilliSeconds(0) : TDuration::Max()); TopicWriteSessions.erase(key); } @@ -972,9 +991,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #8", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #9", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); @@ -1022,11 +1038,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_2, TFixture) WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #8", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #9", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID_2); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); @@ -1069,14 +1080,9 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3"); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); - } + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); CommitTx(tx, EStatus::ABORTED); @@ -1085,10 +1091,15 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); + + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); } Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture) @@ -1102,41 +1113,27 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - NTable::TTransaction tx_2 = BeginTx(tableSession); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - } + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - { - auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - } + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); CommitTx(tx_2, EStatus::SUCCESS); CommitTx(tx_1, EStatus::ABORTED); - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); - } + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); - { - auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #4"); - } + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #4"); } Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) @@ -1152,9 +1149,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_1, EStatus::SUCCESS); } @@ -1164,9 +1158,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_2, EStatus::SUCCESS); } @@ -1195,8 +1186,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); @@ -1230,9 +1219,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #5", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #6", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); @@ -1261,8 +1247,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2"); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); @@ -1275,8 +1259,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); { @@ -1295,14 +1277,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_9, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - NTable::TTransaction tx_2 = BeginTx(tableSession); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - { auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); @@ -1329,8 +1307,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_1, EStatus::SUCCESS); } @@ -1339,8 +1315,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx_2); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx_2, EStatus::SUCCESS); } @@ -1534,6 +1508,7 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD for (auto& topic : d.Topics) { WriteToTopic(topic, TEST_MESSAGE_GROUP_ID, "message", &tx); + // TODO: нужен callback для RollbakTx WaitForAcks(topic, TEST_MESSAGE_GROUP_ID); } @@ -1554,6 +1529,10 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD for (auto& topic : d.Topics) { CheckTabletKeys(topic); } + + for (auto& topic : d.Topics) { + CloseTopicWriteSession(topic, TEST_MESSAGE_GROUP_ID); + } } Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture) @@ -1610,27 +1589,30 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_14, TFixture) CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); CommitTx(tx, EStatus::ABORTED); } Y_UNIT_TEST_F(WriteToTopic_Demo_15, TFixture) { + // the session of writing to the topic can be closed before the commit CreateTopic("topic_A"); NTable::TSession tableSession = CreateTableSession(); NTable::TTransaction tx = BeginTx(tableSession); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #1", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_1); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID_2); CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); } Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture) @@ -1643,8 +1625,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - RestartPQTablet("topic_A", 0); CommitTx(tx, EStatus::SUCCESS); @@ -1672,8 +1652,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_17, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(20'000'000, 'x'), &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString( 7'000'000, 'x'), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); //RestartPQTablet("topic_A", 0); @@ -1703,19 +1681,16 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params) for (size_t i = 0; i < params.OldHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x')); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++oldHeadMsgCount; } for (size_t i = 0; i < params.BigBlobsCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++bigBlobMsgCount; } for (size_t i = 0; i < params.NewHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++newHeadMsgCount; } @@ -1879,8 +1854,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_24, TFixture) WriteToTable("table_A", records, &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); @@ -1914,8 +1887,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_25, TFixture) WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, m, &tx); } - WaitForAcks("topic_B", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); @@ -1947,8 +1918,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_26, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, m, &tx, PARTITION_1); } - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - CommitTx(tx, EStatus::SUCCESS); messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, PARTITION_1); @@ -1961,7 +1930,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) CreateTopic("topic_B", TEST_CONSUMER); CreateTopic("topic_C", TEST_CONSUMER); - for (size_t i = 0, writtenInTx = 0; i < 2; ++i) { + for (size_t i = 0; i < 2; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0); @@ -1971,14 +1940,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); - ++writtenInTx; - WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx); messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); - ++writtenInTx; - WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx); CommitTx(tx, EStatus::SUCCESS); @@ -2003,12 +1968,10 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture) TString message(16'000, 'a'); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, TString(16'000, 'a'), &tx, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); CommitTx(tx, EStatus::SUCCESS); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, TString(20'000, 'b'), nullptr, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); @@ -2023,12 +1986,10 @@ void TFixture::WriteMessagesInTx(size_t big, size_t small) for (size_t i = 0; i < big; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); } for (size_t i = 0; i < small; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); } CommitTx(tx, EStatus::SUCCESS); @@ -2108,8 +2069,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture) WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - AddConsumer("topic_A", {"consumer"}); CommitTx(tx, EStatus::SUCCESS); @@ -2124,35 +2083,71 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture) NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false}); // Users have created their own topic on it - CreateTopic(TEST_TOPIC); + CreateTopic(TString{TEST_TOPIC}); // And they wrote their messages into it - WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1"); - WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2"); - WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3"); + WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-1"); + WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-2"); + WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-3"); // And he had a consumer - AddConsumer(TEST_TOPIC, {"consumer-1"}); + AddConsumer(TString{TEST_TOPIC}, {"consumer-1"}); // We read messages from the topic and committed offsets - auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2)); + auto messages = ReadFromTopic(TString{TEST_TOPIC}, "consumer-1", TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); - CloseTopicReadSession(TEST_TOPIC, "consumer-1"); + CloseTopicReadSession(TString{TEST_TOPIC}, "consumer-1"); // And then the Logbroker team turned on the feature flag NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true}); // Users continued to write to the topic - WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4"); + WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-4"); // Users have added new consumers - AddConsumer(TEST_TOPIC, {"consumer-2"}); + AddConsumer(TString{TEST_TOPIC}, {"consumer-2"}); // And they wanted to continue reading their messages - messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2)); + messages = ReadFromTopic(TString{TEST_TOPIC}, "consumer-1", TDuration::Seconds(2)); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); } +Y_UNIT_TEST_F(WriteToTopic_Demo_40, TFixture) +{ + // The recording stream will run into a quota. Before the commit, the client will receive confirmations + // for some of the messages. The `CommitTx` call will wait for the rest. + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx); + } + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_41, TFixture) +{ + // If the recording session does not wait for confirmations, the commit will fail + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx); + } + + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID, true); // force close + + CommitTx(tx, EStatus::SESSION_EXPIRED); +} + } }