Skip to content

Integration tests #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ endif()

if (YDB_SDK_TESTS)
enable_testing()
add_subdirectory(tests)
add_subdirectory(tests/ut)
add_subdirectory(tests/integration)
endif()

if (YDB_SDK_INSTALL)
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ cmake --build --preset release

Specify a level of parallelism by passing the `-j<level>` option into the command below (e.g. `-j$(nproc)`)

Running all tests:

```bash
ctest -j$(nproc) --preset release
```

Running unit tests only:

```bash
ctest -j$(nproc) --preset release -R .*_ut
```

Running integration tests only:

```bash
ctest -j$(nproc) --preset release -R .*_it
```
25 changes: 12 additions & 13 deletions include/ydb-cpp-sdk/library/monlib/counters/counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace NMonitoring {
// https://wiki.yandex-team.ru/solomon/libs/monlib_cpp/
class TDeprecatedCounter {
public:
using TValue = TAtomic;
using TValue = std::atomic<int>;
using TValueBase = TAtomicBase;

TDeprecatedCounter()
Expand All @@ -52,28 +52,28 @@ namespace NMonitoring {
}

operator TValueBase() const {
return AtomicGet(Value);
return Value.load();
}
TValueBase Val() const {
return AtomicGet(Value);
return Value.load();
}

void Set(TValueBase val) {
AtomicSet(Value, val);
Value.store(val);
}

TValueBase Inc() {
return AtomicIncrement(Value);
return ++Value;
}
TValueBase Dec() {
return AtomicDecrement(Value);
return --Value;
}

TValueBase Add(const TValueBase val) {
return AtomicAdd(Value, val);
return Value.fetch_add(val);
}
TValueBase Sub(const TValueBase val) {
return AtomicAdd(Value, -val);
return Value.fetch_sub(val);
}

// operator overloads convinient
Expand All @@ -99,20 +99,19 @@ namespace NMonitoring {
}

TValueBase operator=(TValueBase rhs) {
AtomicSwap(&Value, rhs);
return rhs;
return Value.exchange(rhs);
}

bool operator!() const {
return AtomicGet(Value) == 0;
return Value.load() == 0;
}

TAtomic& GetAtomic() {
std::atomic<int>& GetAtomic() {
return Value;
}

private:
TAtomic Value;
std::atomic<int> Value;
bool Derivative;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ namespace NMonitoring {

/// XXX: hack for deferred removal of expired counters. Remove once Output* functions are not used for serialization
mutable TCounters Counters;
mutable TAtomic ExpiringCount = 0;
mutable std::atomic<int> ExpiringCount = 0;

public:
TDynamicCounters(TCountableBase::EVisibility visibility = TCountableBase::EVisibility::Public);
Expand Down
6 changes: 3 additions & 3 deletions src/client/federated_topic/ut/basic_usage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
auto totalReceived = 0u;

auto f = checkedPromise.GetFuture();
TAtomic check = 1;
std::atomic<int> check = 1;

// Create read session.
NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
Expand All @@ -472,7 +472,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {

readSettings.FederatedEventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) mutable {
std::cerr << ">>> event from dataHandler: " << DebugString(ev) << std::endl;
Y_VERIFY_S(AtomicGet(check) != 0, "check is false");
Y_VERIFY_S(check.load() != 0, "check is false");
auto& messages = ev.GetMessages();
for (size_t i = 0u; i < messages.size(); ++i) {
auto& message = messages[i];
Expand Down Expand Up @@ -517,7 +517,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {

f.GetValueSync();
ReadSession->Close();
AtomicSet(check, 0);
check.store(0);
}

Y_UNIT_TEST(BasicWriteSession) {
Expand Down
18 changes: 9 additions & 9 deletions src/client/persqueue_public/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
Y_ABORT_UNLESS(Lock.IsLocked());

THandleResult result;
if (AtomicGet(Aborting)) {
if (Aborting.load()) {
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
return result;
}
Expand Down Expand Up @@ -128,7 +128,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
bool cdsRequestIsUnnecessary;
{
std::lock_guard guard(Lock);
if (AtomicGet(Aborting)) {
if (Aborting.load()) {
return;
}
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: Do CDS request");
Expand Down Expand Up @@ -1231,7 +1231,7 @@ void TWriteSessionImpl::SendImpl() {

// Client method, no Lock
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
if (AtomicGet(Aborting))
if (Aborting.load())
return false;
LOG_LAZY(DbDriverState->Log,
TLOG_INFO,
Expand All @@ -1247,7 +1247,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) {
ready = true;
}
if (AtomicGet(Aborting))
if (Aborting.load())
break;
}
if (ready) {
Expand All @@ -1258,7 +1258,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
}
{
std::lock_guard guard(Lock);
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting);
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !Aborting.load();
}
{
std::lock_guard guard(Lock);
Expand All @@ -1284,7 +1284,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

FlushWriteIfRequiredImpl();
if (AtomicGet(Aborting)) {
if (Aborting.load()) {
return;
}
auto callback = [cbContext = SelfContext] (bool ok)
Expand Down Expand Up @@ -1352,9 +1352,9 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() {
void TWriteSessionImpl::AbortImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

if (!AtomicGet(Aborting)) {
if (!Aborting.load()) {
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
AtomicSet(Aborting, 1);
Aborting.store(1);
Cancel(ConnectContext);
Cancel(ConnectTimeoutContext);
Cancel(ConnectDelayContext);
Expand Down Expand Up @@ -1395,7 +1395,7 @@ TWriteSessionImpl::~TWriteSessionImpl() {
bool needClose = false;
{
std::lock_guard guard(Lock);
if (!AtomicGet(Aborting)) {
if (!Aborting.load()) {
CloseImpl(EStatus::SUCCESS, NYql::TIssues{});

needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true);
Expand Down
2 changes: 1 addition & 1 deletion src/client/persqueue_public/impl/write_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
bool Connected = false;
bool Started = false;
TAtomic Aborting = 0;
std::atomic<int> Aborting = 0;
bool SessionEstablished = false;
ui32 PartitionId = 0;
ui64 NextId = 0;
Expand Down
18 changes: 9 additions & 9 deletions src/client/topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
TRACE_KV("status", status.Status));

THandleResult result;
if (AtomicGet(Aborting)) {
if (Aborting.load()) {
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
return result;
}
Expand Down Expand Up @@ -226,7 +226,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del
Y_ABORT_UNLESS(Lock.IsLocked());
Y_ABORT_UNLESS(Settings.DirectWriteToPartition_ && (Settings.PartitionId_.has_value() || DirectWriteToPartitionId.has_value()));

if (AtomicGet(Aborting)) {
if (Aborting.load()) {
return;
}

Expand Down Expand Up @@ -1425,7 +1425,7 @@ void TWriteSessionImpl::SendImpl() {

// Client method, no Lock
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
if (AtomicGet(Aborting)) {
if (Aborting.load()) {
return false;
}
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout);
Expand All @@ -1439,7 +1439,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) {
ready = true;
}
if (AtomicGet(Aborting))
if (Aborting.load())
break;
}
if (ready) {
Expand All @@ -1450,7 +1450,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
}
{
std::lock_guard guard(Lock);
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting);
ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !Aborting.load();
}
{
std::lock_guard guard(Lock);
Expand All @@ -1472,7 +1472,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

FlushWriteIfRequiredImpl();
if (AtomicGet(Aborting)) {
if (Aborting.load()) {
return;
}
auto callback = [cbContext = SelfContext] (bool ok)
Expand Down Expand Up @@ -1538,9 +1538,9 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() {
void TWriteSessionImpl::AbortImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

if (!AtomicGet(Aborting)) {
if (!Aborting.load()) {
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
AtomicSet(Aborting, 1);
Aborting.store(1);
Cancel(DescribePartitionContext);
Cancel(ConnectContext);
Cancel(ConnectTimeoutContext);
Expand Down Expand Up @@ -1582,7 +1582,7 @@ TWriteSessionImpl::~TWriteSessionImpl() {
bool needClose = false;
{
std::lock_guard guard(Lock);
if (!AtomicGet(Aborting)) {
if (!Aborting.load()) {
CloseImpl(EStatus::SUCCESS, NYql::TIssues{});

needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true);
Expand Down
2 changes: 1 addition & 1 deletion src/client/topic/impl/write_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
bool Connected = false;
bool Started = false;
TAtomic Aborting = 0;
std::atomic<int> Aborting = 0;
bool SessionEstablished = false;
ui32 PartitionId = 0;
TPartitionLocation PreferredPartitionLocation = {};
Expand Down
12 changes: 6 additions & 6 deletions src/client/topic/ut/basic_usage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,20 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess


NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
TAtomic lastOffset = 0u;
std::atomic<int> lastOffset = 0u;

auto f = checkedPromise.GetFuture();
readSettings.EventHandlers_.SimpleDataHandlers(
[&]
(TReadSessionEvent::TDataReceivedEvent& ev) mutable {
AtomicSet(lastOffset, ev.GetMessages().back().GetOffset());
lastOffset.store(ev.GetMessages().back().GetOffset());
std::cerr << ">>> TEST: last offset = " << lastOffset << std::endl;
});

ReadSession = topicClient.CreateReadSession(readSettings);

ui32 i = 0;
while (AtomicGet(lastOffset) + 1 < count) {
while (lastOffset.load() + 1 < count) {
RunTasks(decompressor, {i++});
}

Expand Down Expand Up @@ -302,12 +302,12 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
auto totalReceived = 0u;

auto f = checkedPromise.GetFuture();
TAtomic check = 1;
std::atomic<int> check = 1;
readSettings.EventHandlers_.SimpleDataHandlers(
// [checkedPromise = std::move(checkedPromise), &check, &sentMessages, &totalReceived]
[&]
(TReadSessionEvent::TDataReceivedEvent& ev) mutable {
Y_VERIFY_S(AtomicGet(check) != 0, "check is false");
Y_VERIFY_S(check.load() != 0, "check is false");
auto& messages = ev.GetMessages();
for (size_t i = 0u; i < messages.size(); ++i) {
auto& message = messages[i];
Expand All @@ -322,7 +322,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {

f.GetValueSync();
ReadSession->Close(TDuration::MilliSeconds(10));
AtomicSet(check, 0);
check.store(0);

auto status = topicClient.CommitOffset(setup->GetTestTopic(), 0, setup->GetTestConsumer(), 50);
UNIT_ASSERT(status.GetValueSync().IsSuccess());
Expand Down
8 changes: 4 additions & 4 deletions src/library/http/server/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,15 +469,15 @@ class THttpServer::TImpl {
}

inline void DecreaseConnections() noexcept {
AtomicDecrement(ConnectionCount);
ConnectionCount--;
}

inline void IncreaseConnections() noexcept {
AtomicIncrement(ConnectionCount);
ConnectionCount++;
}

inline i64 GetClientCount() const {
return AtomicGet(ConnectionCount);
return ConnectionCount.load();
}

inline bool MaxRequestsReached() const {
Expand All @@ -491,7 +491,7 @@ class THttpServer::TImpl {
TPipeHandle ListenWakeupWriteFd;
TMtpQueueRef Requests;
TMtpQueueRef FailRequests;
TAtomic ConnectionCount = 0;
std::atomic<int> ConnectionCount = 0;
THolder<TSocketPoller> Poller;
THolder<TConnections> Connections;
int ErrorCode = 0;
Expand Down
Loading
Loading