Skip to content

Commit 07b1c17

Browse files
committed
[C++ SDK] Fixed flaky topic tests (#20912)
1 parent 10a6d9a commit 07b1c17

File tree

5 files changed

+23
-29
lines changed

5 files changed

+23
-29
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
9949fe4a8f217461246f81be40113f8af4d91847
1+
cdad0889aa2f47f9cde997deae6f682eced20873

src/client/query/client.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -858,8 +858,8 @@ class TTransaction::TImpl : public std::enable_shared_from_this<TImpl> {
858858

859859
private:
860860
bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
861-
std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
862-
std::vector<TOnFailureTransactionCallback> OnFailureCallbacks;
861+
mutable std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
862+
mutable std::vector<TOnFailureTransactionCallback> OnFailureCallbacks;
863863

864864
std::mutex PrecommitCallbacksMutex;
865865
std::mutex OnFailureCallbacksMutex;

src/client/topic/impl/read_session_impl.ipp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2183,6 +2183,9 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TrySubscribeOnTransact
21832183
return;
21842184
}
21852185

2186+
txInfo->IsActive = true;
2187+
txInfo->Subscribed = true;
2188+
21862189
auto callback = [cbContext = this->SelfContext, txId, txInfo, consumer = Settings.ConsumerName_, client]() {
21872190
std::vector<TTopicOffsets> offsets;
21882191

@@ -2205,9 +2208,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TrySubscribeOnTransact
22052208
};
22062209

22072210
tx.AddPrecommitCallback(std::move(callback));
2208-
2209-
txInfo->IsActive = true;
2210-
txInfo->Subscribed = true;
22112211
}
22122212
}
22132213

src/client/topic/impl/write_session_impl.cpp

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -554,31 +554,31 @@ void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransactionBase* tx)
554554
txInfo->IsActive = true;
555555
txInfo->Subscribed = true;
556556
txInfo->AllAcksReceived = NThreading::NewPromise<TStatus>();
557-
}
558557

559-
auto callback = [cbContext = this->SelfContext, txId, txInfo]() {
560-
with_lock(txInfo->Lock) {
561-
Y_ABORT_UNLESS(!txInfo->CommitCalled);
558+
auto callback = [cbContext = this->SelfContext, txId, txInfo]() {
559+
with_lock(txInfo->Lock) {
560+
Y_ABORT_UNLESS(!txInfo->CommitCalled);
562561

563-
txInfo->CommitCalled = true;
562+
txInfo->CommitCalled = true;
564563

565-
if (txInfo->WriteCount == txInfo->AckCount) {
566-
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
567-
if (auto self = cbContext->LockShared()) {
568-
self->DeleteTx(txId);
564+
if (txInfo->WriteCount == txInfo->AckCount) {
565+
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
566+
if (auto self = cbContext->LockShared()) {
567+
self->DeleteTx(txId);
568+
}
569+
return txInfo->AllAcksReceived.GetFuture();
569570
}
570-
return txInfo->AllAcksReceived.GetFuture();
571-
}
572571

573-
if (txInfo->IsActive) {
574-
return txInfo->AllAcksReceived.GetFuture();
572+
if (txInfo->IsActive) {
573+
return txInfo->AllAcksReceived.GetFuture();
574+
}
575575
}
576-
}
577576

578-
return NThreading::MakeFuture(MakeSessionExpiredError());
579-
};
577+
return NThreading::MakeFuture(MakeSessionExpiredError());
578+
};
580579

581-
tx->AddPrecommitCallback(std::move(callback));
580+
tx->AddPrecommitCallback(std::move(callback));
581+
}
582582
}
583583

584584
void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)

tests/integration/topic/topic_to_table.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,9 +1335,6 @@ void TxUsage::TestWriteToTopic10()
13351335

13361336
void TxUsage::TestWriteToTopic26()
13371337
{
1338-
// TODO(brgayazov): fix test
1339-
GTEST_SKIP() << "Test is flaky";
1340-
13411338
//
13421339
// the test verifies a transaction in which data is read from a partition of one topic and written to
13431340
// another partition of this topic
@@ -1723,9 +1720,6 @@ TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_17))
17231720

17241721
void TxUsage::TestWriteToTopic25()
17251722
{
1726-
// TODO(brgayazov): fix test
1727-
GTEST_SKIP() << "Test is flaky";
1728-
17291723
//
17301724
// the test verifies a transaction in which data is read from one topic and written to another
17311725
//

0 commit comments

Comments
 (0)