Skip to content

Commit d455f01

Browse files
Issue #62 - Replace TAtomic to std::atomic (#266)
1 parent 62caaab commit d455f01

File tree

6 files changed

+29
-29
lines changed

6 files changed

+29
-29
lines changed

src/client/federated_topic/ut/basic_usage_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
461461
auto totalReceived = 0u;
462462

463463
auto f = checkedPromise.GetFuture();
464-
TAtomic check = 1;
464+
std::atomic<int> check = 1;
465465

466466
// Create read session.
467467
NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
@@ -472,7 +472,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
472472

473473
readSettings.FederatedEventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) mutable {
474474
std::cerr << ">>> event from dataHandler: " << DebugString(ev) << std::endl;
475-
Y_VERIFY_S(AtomicGet(check) != 0, "check is false");
475+
Y_VERIFY_S(check.load() != 0, "check is false");
476476
auto& messages = ev.GetMessages();
477477
for (size_t i = 0u; i < messages.size(); ++i) {
478478
auto& message = messages[i];
@@ -517,7 +517,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
517517

518518
f.GetValueSync();
519519
ReadSession->Close();
520-
AtomicSet(check, 0);
520+
check.store(0);
521521
}
522522

523523
Y_UNIT_TEST(BasicWriteSession) {

src/client/persqueue_public/impl/write_session_impl.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
8787
Y_ABORT_UNLESS(Lock.IsLocked());
8888

8989
THandleResult result;
90-
if (AtomicGet(Aborting)) {
90+
if (Aborting.load()) {
9191
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
9292
return result;
9393
}
@@ -128,7 +128,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
128128
bool cdsRequestIsUnnecessary;
129129
{
130130
std::lock_guard guard(Lock);
131-
if (AtomicGet(Aborting)) {
131+
if (Aborting.load()) {
132132
return;
133133
}
134134
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: Do CDS request");
@@ -1231,7 +1231,7 @@ void TWriteSessionImpl::SendImpl() {
12311231

12321232
// Client method, no Lock
12331233
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
1234-
if (AtomicGet(Aborting))
1234+
if (Aborting.load())
12351235
return false;
12361236
LOG_LAZY(DbDriverState->Log,
12371237
TLOG_INFO,
@@ -1247,7 +1247,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
12471247
if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) {
12481248
ready = true;
12491249
}
1250-
if (AtomicGet(Aborting))
1250+
if (Aborting.load())
12511251
break;
12521252
}
12531253
if (ready) {
@@ -1258,7 +1258,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
12581258
}
12591259
{
12601260
std::lock_guard guard(Lock);
1261-
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting);
1261+
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !Aborting.load();
12621262
}
12631263
{
12641264
std::lock_guard guard(Lock);
@@ -1284,7 +1284,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
12841284
Y_ABORT_UNLESS(Lock.IsLocked());
12851285

12861286
FlushWriteIfRequiredImpl();
1287-
if (AtomicGet(Aborting)) {
1287+
if (Aborting.load()) {
12881288
return;
12891289
}
12901290
auto callback = [cbContext = SelfContext] (bool ok)
@@ -1352,9 +1352,9 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() {
13521352
void TWriteSessionImpl::AbortImpl() {
13531353
Y_ABORT_UNLESS(Lock.IsLocked());
13541354

1355-
if (!AtomicGet(Aborting)) {
1355+
if (!Aborting.load()) {
13561356
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
1357-
AtomicSet(Aborting, 1);
1357+
Aborting.store(1);
13581358
Cancel(ConnectContext);
13591359
Cancel(ConnectTimeoutContext);
13601360
Cancel(ConnectDelayContext);
@@ -1395,7 +1395,7 @@ TWriteSessionImpl::~TWriteSessionImpl() {
13951395
bool needClose = false;
13961396
{
13971397
std::lock_guard guard(Lock);
1398-
if (!AtomicGet(Aborting)) {
1398+
if (!Aborting.load()) {
13991399
CloseImpl(EStatus::SUCCESS, NYql::TIssues{});
14001400

14011401
needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true);

src/client/persqueue_public/impl/write_session_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
419419
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
420420
bool Connected = false;
421421
bool Started = false;
422-
TAtomic Aborting = 0;
422+
std::atomic<int> Aborting = 0;
423423
bool SessionEstablished = false;
424424
ui32 PartitionId = 0;
425425
ui64 NextId = 0;

src/client/topic/impl/write_session_impl.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
167167
TRACE_KV("status", status.Status));
168168

169169
THandleResult result;
170-
if (AtomicGet(Aborting)) {
170+
if (Aborting.load()) {
171171
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
172172
return result;
173173
}
@@ -226,7 +226,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del
226226
Y_ABORT_UNLESS(Lock.IsLocked());
227227
Y_ABORT_UNLESS(Settings.DirectWriteToPartition_ && (Settings.PartitionId_.has_value() || DirectWriteToPartitionId.has_value()));
228228

229-
if (AtomicGet(Aborting)) {
229+
if (Aborting.load()) {
230230
return;
231231
}
232232

@@ -1425,7 +1425,7 @@ void TWriteSessionImpl::SendImpl() {
14251425

14261426
// Client method, no Lock
14271427
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
1428-
if (AtomicGet(Aborting)) {
1428+
if (Aborting.load()) {
14291429
return false;
14301430
}
14311431
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout);
@@ -1439,7 +1439,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
14391439
if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) {
14401440
ready = true;
14411441
}
1442-
if (AtomicGet(Aborting))
1442+
if (Aborting.load())
14431443
break;
14441444
}
14451445
if (ready) {
@@ -1450,7 +1450,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
14501450
}
14511451
{
14521452
std::lock_guard guard(Lock);
1453-
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting);
1453+
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !Aborting.load();
14541454
}
14551455
{
14561456
std::lock_guard guard(Lock);
@@ -1472,7 +1472,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
14721472
Y_ABORT_UNLESS(Lock.IsLocked());
14731473

14741474
FlushWriteIfRequiredImpl();
1475-
if (AtomicGet(Aborting)) {
1475+
if (Aborting.load()) {
14761476
return;
14771477
}
14781478
auto callback = [cbContext = SelfContext] (bool ok)
@@ -1538,9 +1538,9 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() {
15381538
void TWriteSessionImpl::AbortImpl() {
15391539
Y_ABORT_UNLESS(Lock.IsLocked());
15401540

1541-
if (!AtomicGet(Aborting)) {
1541+
if (!Aborting.load()) {
15421542
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
1543-
AtomicSet(Aborting, 1);
1543+
Aborting.store(1);
15441544
Cancel(DescribePartitionContext);
15451545
Cancel(ConnectContext);
15461546
Cancel(ConnectTimeoutContext);
@@ -1582,7 +1582,7 @@ TWriteSessionImpl::~TWriteSessionImpl() {
15821582
bool needClose = false;
15831583
{
15841584
std::lock_guard guard(Lock);
1585-
if (!AtomicGet(Aborting)) {
1585+
if (!Aborting.load()) {
15861586
CloseImpl(EStatus::SUCCESS, NYql::TIssues{});
15871587

15881588
needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true);

src/client/topic/impl/write_session_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
452452
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
453453
bool Connected = false;
454454
bool Started = false;
455-
TAtomic Aborting = 0;
455+
std::atomic<int> Aborting = 0;
456456
bool SessionEstablished = false;
457457
ui32 PartitionId = 0;
458458
TPartitionLocation PreferredPartitionLocation = {};

src/client/topic/ut/basic_usage_ut.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,20 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
7171

7272

7373
NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
74-
TAtomic lastOffset = 0u;
74+
std::atomic<int> lastOffset = 0u;
7575

7676
auto f = checkedPromise.GetFuture();
7777
readSettings.EventHandlers_.SimpleDataHandlers(
7878
[&]
7979
(TReadSessionEvent::TDataReceivedEvent& ev) mutable {
80-
AtomicSet(lastOffset, ev.GetMessages().back().GetOffset());
80+
lastOffset.store(ev.GetMessages().back().GetOffset());
8181
std::cerr << ">>> TEST: last offset = " << lastOffset << std::endl;
8282
});
8383

8484
ReadSession = topicClient.CreateReadSession(readSettings);
8585

8686
ui32 i = 0;
87-
while (AtomicGet(lastOffset) + 1 < count) {
87+
while (lastOffset.load() + 1 < count) {
8888
RunTasks(decompressor, {i++});
8989
}
9090

@@ -302,12 +302,12 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
302302
auto totalReceived = 0u;
303303

304304
auto f = checkedPromise.GetFuture();
305-
TAtomic check = 1;
305+
std::atomic<int> check = 1;
306306
readSettings.EventHandlers_.SimpleDataHandlers(
307307
// [checkedPromise = std::move(checkedPromise), &check, &sentMessages, &totalReceived]
308308
[&]
309309
(TReadSessionEvent::TDataReceivedEvent& ev) mutable {
310-
Y_VERIFY_S(AtomicGet(check) != 0, "check is false");
310+
Y_VERIFY_S(check.load() != 0, "check is false");
311311
auto& messages = ev.GetMessages();
312312
for (size_t i = 0u; i < messages.size(); ++i) {
313313
auto& message = messages[i];
@@ -322,7 +322,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
322322

323323
f.GetValueSync();
324324
ReadSession->Close(TDuration::MilliSeconds(10));
325-
AtomicSet(check, 0);
325+
check.store(0);
326326

327327
auto status = topicClient.CommitOffset(setup->GetTestTopic(), 0, setup->GetTestConsumer(), 50);
328328
UNIT_ASSERT(status.GetValueSync().IsSuccess());

0 commit comments

Comments
 (0)