diff --git a/CMakeLists.txt b/CMakeLists.txt index 9390cc97c5d..41f4783ca2d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,11 +46,10 @@ include(cmake/install.cmake) include(cmake/common.cmake) include(cmake/ccache.cmake) include(cmake/protobuf.cmake) -include(cmake/testing.cmake) include(cmake/external_libs.cmake) if (YDB_SDK_TESTS) - enable_testing() + include(cmake/testing.cmake) endif() add_subdirectory(tools) diff --git a/CMakePresets.json b/CMakePresets.json index bd0c75cfb1c..4638d0b0a96 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -102,7 +102,11 @@ "name": "all", "inherits": "common", "displayName": "Run All Tests", - "configurePreset": "base" + "configurePreset": "base", + "environment": { + "YDB_ENDPOINT": "localhost:2136", + "YDB_DATABASE": "/local" + } }, { "name": "unit", diff --git a/cmake/external_libs.cmake b/cmake/external_libs.cmake index c258dea642a..22d0603e77c 100644 --- a/cmake/external_libs.cmake +++ b/cmake/external_libs.cmake @@ -12,7 +12,6 @@ find_package(Snappy 1.1.8 REQUIRED) find_package(base64 REQUIRED) find_package(Brotli 1.1.0 REQUIRED) find_package(jwt-cpp REQUIRED) -find_package(GTest REQUIRED) find_package(double-conversion REQUIRED) # RapidJSON diff --git a/cmake/testing.cmake b/cmake/testing.cmake index ab20c3169ac..d2a2050e236 100644 --- a/cmake/testing.cmake +++ b/cmake/testing.cmake @@ -1,71 +1,7 @@ -function(add_yunittest) - set(opts "") - set(oneval_args NAME TEST_TARGET WORKING_DIRECTORY) - set(multival_args TEST_ARG) - cmake_parse_arguments(YUNITTEST_ARGS - "${opts}" - "${oneval_args}" - "${multival_args}" - ${ARGN} - ) - - get_property(SPLIT_FACTOR TARGET ${YUNITTEST_ARGS_TEST_TARGET} PROPERTY SPLIT_FACTOR) - get_property(SPLIT_TYPE TARGET ${YUNITTEST_ARGS_TEST_TARGET} PROPERTY SPLIT_TYPE) - - if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/run_testpack") - add_test(NAME ${YUNITTEST_ARGS_NAME} - COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/run_testpack" ${YUNITTEST_ARGS_TEST_ARG} - WORKING_DIRECTORY ${YUNITTEST_ARGS_WORKING_DIRECTORY} - ) - set_property(TEST ${YUNITTEST_ARGS_NAME} PROPERTY ENVIRONMENT "source_root=${YDB_SDK_SOURCE_DIR};build_root=${YDB_SDK_BINARY_DIR};test_split_factor=${SPLIT_FACTOR};test_split_type=${SPLIT_TYPE}") - return() - endif() - - if (${SPLIT_FACTOR} EQUAL 1) - add_test(NAME ${YUNITTEST_ARGS_NAME} - COMMAND ${YUNITTEST_ARGS_TEST_TARGET} ${YUNITTEST_ARGS_TEST_ARG} - WORKING_DIRECTORY ${YUNITTEST_ARGS_WORKING_DIRECTORY} - ) - return() - endif() - - if ("${SPLIT_TYPE}") - set(FORK_MODE_ARG --fork-mode ${SPLIT_TYPE}) - endif() - math(EXPR LastIdx "${SPLIT_FACTOR} - 1") - foreach(Idx RANGE ${LastIdx}) - add_test(NAME ${YUNITTEST_ARGS_NAME}_${Idx} - COMMAND Python3::Interpreter ${YDB_SDK_SOURCE_DIR}/scripts/split_unittest.py - --split-factor ${SPLIT_FACTOR} ${FORK_MODE_ARG} - --shard ${Idx} - $ ${YUNITTEST_ARGS_TEST_ARG} - WORKING_DIRECTORY ${YUNITTEST_ARGS_WORKING_DIRECTORY} - ) - endforeach() -endfunction() - -function(set_yunittest_property) - set(opts "") - set(oneval_args TEST PROPERTY) - set(multival_args ) - cmake_parse_arguments(YUNITTEST_ARGS - "${opts}" - "${oneval_args}" - "${multival_args}" - ${ARGN} - ) - get_property(SPLIT_FACTOR TARGET ${YUNITTEST_ARGS_TEST} PROPERTY SPLIT_FACTOR) +enable_testing() - if ((${SPLIT_FACTOR} EQUAL 1) OR (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/run_testpack")) - set_property(TEST ${YUNITTEST_ARGS_TEST} PROPERTY ${YUNITTEST_ARGS_PROPERTY} ${YUNITTEST_ARGS_UNPARSED_ARGUMENTS}) - return() - endif() - - math(EXPR LastIdx "${SPLIT_FACTOR} - 1") - foreach(Idx RANGE ${LastIdx}) - set_property(TEST ${YUNITTEST_ARGS_TEST}_${Idx} PROPERTY ${YUNITTEST_ARGS_PROPERTY} ${YUNITTEST_ARGS_UNPARSED_ARGUMENTS}) - endforeach() -endfunction() +find_package(GTest REQUIRED) +include(GoogleTest) function(add_ydb_test) set(opts GTEST) @@ -112,72 +48,38 @@ function(add_ydb_test) ) endif() - set_property( - TARGET - ${YDB_TEST_NAME} - PROPERTY - SPLIT_FACTOR - 1 - ) if (YDB_TEST_GTEST) - add_yunittest( - NAME - ${YDB_TEST_NAME} - TEST_TARGET - ${YDB_TEST_NAME} - TEST_ARG - ${YDB_TEST_TEST_ARG} - WORKING_DIRECTORY - ${YDB_TEST_WORKING_DIRECTORY} + gtest_discover_tests(${YDB_TEST_NAME} + EXTRA_ARGS ${YDB_TEST_TEST_ARG} + WORKING_DIRECTORY ${YDB_TEST_WORKING_DIRECTORY} + PROPERTIES LABELS ${YDB_TEST_LABELS} + PROPERTIES PROCESSORS 1 + PROPERTIES TIMEOUT 600 ) + target_link_libraries(${YDB_TEST_NAME} PRIVATE GTest::gtest_main ) else() - add_yunittest( - NAME - ${YDB_TEST_NAME} - TEST_TARGET - ${YDB_TEST_NAME} - TEST_ARG + add_test(NAME ${YDB_TEST_NAME} + WORKING_DIRECTORY ${YDB_TEST_WORKING_DIRECTORY} + COMMAND ${YDB_TEST_NAME} --print-before-suite --print-before-test --fork-tests --print-times --show-fails ${YDB_TEST_TEST_ARG} - WORKING_DIRECTORY - ${YDB_TEST_WORKING_DIRECTORY} ) + target_link_libraries(${YDB_TEST_NAME} PRIVATE cpp-testing-unittest_main ) - endif() - - set_yunittest_property( - TEST - ${YDB_TEST_NAME} - PROPERTY - LABELS - MEDIUM - ${YDB_TEST_LABELS} - ) - set_yunittest_property( - TEST - ${YDB_TEST_NAME} - PROPERTY - PROCESSORS - 1 - ) - - set_yunittest_property( - TEST - ${YDB_TEST_NAME} - PROPERTY - TIMEOUT - 600 - ) + set_tests_properties(${YDB_TEST_NAME} PROPERTIES LABELS ${YDB_TEST_LABELS}) + set_tests_properties(${YDB_TEST_NAME} PROPERTIES PROCESSORS 1) + set_tests_properties(${YDB_TEST_NAME} PROPERTIES TIMEOUT 600) + endif() vcs_info(${YDB_TEST_NAME}) endfunction() diff --git a/scripts/split_unittest.py b/scripts/split_unittest.py deleted file mode 100644 index 8874b8b915a..00000000000 --- a/scripts/split_unittest.py +++ /dev/null @@ -1,80 +0,0 @@ -import argparse -import tempfile -import shlex -import subprocess - - -def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument("--split-factor", type=int, default=0) - parser.add_argument("--shard", type=int, default=0) - parser.add_argument("--fork-mode", type=str, default="SEQUENTIAL") - parser.add_argument("command", nargs=argparse.REMAINDER) - return parser.parse_args() - - -def get_sequential_chunk(tests, modulo, modulo_index): - chunk_size = len(tests) // modulo - not_used = len(tests) % modulo - shift = chunk_size + (modulo_index < not_used) - start = chunk_size * modulo_index + min(modulo_index, not_used) - end = start + shift - return [] if end > len(tests) else tests[start:end] - - -def get_shuffled_chunk(tests, modulo, modulo_index): - result_tests = [] - for i, test in enumerate(tests): - if i % modulo == modulo_index: - result_tests.append(test) - return result_tests - - -def list_tests(binary): - with tempfile.NamedTemporaryFile() as tmpfile: - cmd = [binary, "--list-verbose", "--list-path", tmpfile.name] - subprocess.check_call(cmd) - - with open(tmpfile.name) as afile: - lines = afile.read().strip().split("\n") - lines = [x.strip() for x in lines] - return [x for x in lines if x] - - -def get_shard_tests(args): - test_names = list_tests(args.command[0]) - test_names = sorted(test_names) - - if args.fork_mode == "MODULO": - return get_shuffled_chunk(test_names, args.split_factor, args.shard) - elif args.fork_mode == "SEQUENTIAL": - return get_sequential_chunk(test_names, args.split_factor, args.shard) - else: - raise ValueError("detected unknown partition mode: {}".format(args.fork_mode)) - - -def get_shard_cmd_args(args): - return ["+{}".format(x) for x in get_shard_tests(args)] - - -def main(): - args = parse_args() - - if args.split_factor: - shard_cmd = get_shard_cmd_args(args) - if shard_cmd: - cmd = args.command + shard_cmd - else: - print("No tests for {} shard".format(args.shard)) - return 0 - else: - cmd = args.command - - rc = subprocess.call(cmd) - if rc: - print("Some tests failed. To reproduce run: {}".format(shlex.join(cmd))) - return rc - - -if __name__ == "__main__": - exit(main()) diff --git a/src/client/draft/CMakeLists.txt b/src/client/draft/CMakeLists.txt deleted file mode 100644 index 988b7706ec9..00000000000 --- a/src/client/draft/CMakeLists.txt +++ /dev/null @@ -1,25 +0,0 @@ -_ydb_sdk_add_library(client-draft) - -target_link_libraries(client-draft PUBLIC - yutil - yql-public-issue - api-grpc-draft - client-ydb_table - client-ydb_types-operation - client-ydb_value -) - -target_sources(client-draft PRIVATE - ydb_dynamic_config.cpp - ydb_replication.cpp - ydb_scripting.cpp - ydb_view.cpp -) - -generate_enum_serilization(client-draft - ${YDB_SDK_SOURCE_DIR}/include/ydb-cpp-sdk/client/draft/ydb_replication.h - INCLUDE_HEADERS - include/ydb-cpp-sdk/client/draft/ydb_replication.h -) - -_ydb_sdk_make_client_component(Draft client-draft) diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index 9d65935a2d5..78d656ade65 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -1,3 +1,5 @@ add_subdirectory(basic_example) add_subdirectory(bulk_upsert) add_subdirectory(server_restart) +add_subdirectory(sessions) +add_subdirectory(sessions_pool) diff --git a/tests/integration/basic_example/CMakeLists.txt b/tests/integration/basic_example/CMakeLists.txt index 6178b80c719..55bdd05341b 100644 --- a/tests/integration/basic_example/CMakeLists.txt +++ b/tests/integration/basic_example/CMakeLists.txt @@ -1,4 +1,4 @@ -add_ydb_test(NAME basic-example GTEST +add_ydb_test(NAME basic_example_it GTEST SOURCES main.cpp basic_example_data.cpp diff --git a/tests/integration/bulk_upsert/CMakeLists.txt b/tests/integration/bulk_upsert/CMakeLists.txt index 05a141963f8..46848877c69 100644 --- a/tests/integration/bulk_upsert/CMakeLists.txt +++ b/tests/integration/bulk_upsert/CMakeLists.txt @@ -1,4 +1,4 @@ -add_ydb_test(NAME bulk_upsert GTEST +add_ydb_test(NAME bulk_upsert_it GTEST SOURCES main.cpp bulk_upsert.cpp diff --git a/tests/integration/server_restart/CMakeLists.txt b/tests/integration/server_restart/CMakeLists.txt index 2420bae6087..2d485de4e4a 100644 --- a/tests/integration/server_restart/CMakeLists.txt +++ b/tests/integration/server_restart/CMakeLists.txt @@ -1,4 +1,4 @@ -add_ydb_test(NAME server_restart GTEST +add_ydb_test(NAME server_restart_it GTEST SOURCES main.cpp LINK_LIBRARIES diff --git a/tests/integration/sessions/CMakeLists.txt b/tests/integration/sessions/CMakeLists.txt new file mode 100644 index 00000000000..100c8ace2bc --- /dev/null +++ b/tests/integration/sessions/CMakeLists.txt @@ -0,0 +1,12 @@ +add_ydb_test(NAME sessions_it GTEST + SOURCES + main.cpp + LINK_LIBRARIES + yutil + YDB-CPP-SDK::Table + YDB-CPP-SDK::Query + api-grpc + grpc-client + LABELS + integration +) diff --git a/tests/integration/sessions/main.cpp b/tests/integration/sessions/main.cpp new file mode 100644 index 00000000000..26a126a0472 --- /dev/null +++ b/tests/integration/sessions/main.cpp @@ -0,0 +1,798 @@ +#include +#include + +#include +#include + +#include + +#include + +#include +#include + +#include + +using namespace NYdb; +using namespace NYdb::NTable; + +namespace { + +void CreateTestTable(NYdb::TDriver& driver) { + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()); + auto session = sessionResult.GetSession(); + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `/local/t` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ); + )___").ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); +} + +void WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client, std::string& sessionId) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + auto result = session.ExecuteQuery(R"___( + SELECT 42; + )___", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + sessionId = session.GetId(); +} + +void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, std::int64_t expected) { + while (client.GetActiveSessionCount() != expected) { + Sleep(TDuration::MilliSeconds(100)); + } +} + +} + +void CheckDelete(const NYdbGrpc::TGRpcClientConfig& clientConfig, const std::string& id, int expected, bool& allDoneOk) { + NYdbGrpc::TGRpcClientLow clientLow; + auto connection = clientLow.CreateGRpcServiceConnection(clientConfig); + + Ydb::Query::DeleteSessionRequest request; + request.set_session_id(id); + + NYdbGrpc::TResponseCallback responseCb = + [&allDoneOk, expected](NYdbGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::DeleteSessionResponse&& response) -> void { + ASSERT_FALSE(grpcStatus.InternalError); + ASSERT_EQ(grpcStatus.GRpcStatusCode, 0) << grpcStatus.Msg + " " + grpcStatus.Details; + allDoneOk &= (response.status() == expected); + if (!allDoneOk) { + std::cerr << "Expected status: " << expected << ", got response: " << response.DebugString() << std::endl; + } + }; + + connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncDeleteSession); +} + +TEST(YdbSdkSessions, TestSessionPool) { + const std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + std::unordered_set sids; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + sids.insert(session.GetId()); + auto result = session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(result.GetEndpoint(), location); + } + // All requests used one session + ASSERT_EQ(sids.size(), 1); + // No more session captured by client + ASSERT_EQ(client.GetActiveSessionCount(), 0); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestMultipleSessions) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + std::vector sids; + std::vector results; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + sids.push_back(session); + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } + + NThreading::WaitExceptionOrAll(results).Wait(); + ASSERT_EQ(client.GetActiveSessionCount(), 10); + + for (auto& result : results) { + ASSERT_EQ(result.GetValue().GetStatus(), EStatus::SUCCESS); + } + sids.clear(); + results.clear(); + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestActiveSessionCountAfterBadSession) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + std::vector sids; + std::vector results; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + sids.push_back(session); + if (count == 0) { + // Force BAD session server response for ExecuteDataQuery + ASSERT_EQ(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS); + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } else { + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } + } + + NThreading::WaitExceptionOrAll(results).Wait(); + ASSERT_EQ(client.GetActiveSessionCount(), 10); + + for (size_t i = 0; i < results.size(); i++) { + if (i == 9) { + ASSERT_EQ(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION); + } else { + ASSERT_EQ(results[i].GetValue().GetStatus(), EStatus::SUCCESS); + } + } + sids.clear(); + results.clear(); + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryService) { + GTEST_SKIP() << "Test is failing right now"; + std::string location = std::getenv("YDB_ENDPOINT"); + auto clientConfig = NYdbGrpc::TGRpcClientConfig(location); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + CreateTestTable(driver); + + NYdb::NQuery::TQueryClient client(driver); + std::string sessionId; + WarmPoolCreateSession(client, sessionId); + WaitForSessionsInPool(client, 1); + + bool allDoneOk = true; + CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); + ASSERT_TRUE(allDoneOk); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_EQ(session.GetId(), sessionId); + + auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(res.GetStatus(), EStatus::BAD_SESSION) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 0); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_NE(session.GetId(), sessionId); + auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 1); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) { + GTEST_SKIP() << "Test is failing right now"; + std::string location = std::getenv("YDB_ENDPOINT"); + auto clientConfig = NYdbGrpc::TGRpcClientConfig(location); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + CreateTestTable(driver); + + NYdb::NQuery::TQueryClient client(driver); + std::string sessionId; + WarmPoolCreateSession(client, sessionId); + WaitForSessionsInPool(client, 1); + + bool allDoneOk = true; + CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); + ASSERT_TRUE(allDoneOk); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_EQ(session.GetId(), sessionId); + + auto it = session.StreamExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(it.GetStatus(), EStatus::SUCCESS) << it.GetIssues().ToString(); + + auto res = it.ReadNext().GetValueSync(); + ASSERT_EQ(res.GetStatus(), EStatus::BAD_SESSION) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 0); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + ASSERT_NE(session.GetId(), sessionId); + + auto res = session.ExecuteQuery("SELECT * FROM `/local/t`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + ASSERT_EQ(res.GetStatus(), EStatus::SUCCESS) << res.GetIssues().ToString(); + } + + WaitForSessionsInPool(client, 1); + + driver.Stop(true); +} + +TEST(YdbSdkSessions, TestActiveSessionCountAfterTransportError) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 100; + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `/local/t` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ); + )___").ExtractValueSync(); + ASSERT_TRUE(result.IsSuccess()); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + } + + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(sessionResponse.IsTransportError()); + auto session = sessionResponse.GetSession(); + + // Assume 10us is too small to execute query and get response + auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `/local/t`;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + } + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + auto session = client.GetSession().ExtractValueSync().GetSession(); + session.ExecuteSchemeQuery(R"___( + DROP TABLE `/local/t`; + )___").ExtractValueSync(); + driver.Stop(true); +} + +TEST(YdbSdkSessions, MultiThreadSync) { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + const int nThreads = 10; + const int nRequests = 1000; + auto job = [client]() mutable { + for (int i = 0; i < nRequests; i++) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResponse.GetStatus(), EStatus::SUCCESS); + } + }; + std::vector threads; + for (int i = 0; i < nThreads; i++) { + threads.emplace_back(job); + } + for (auto& thread : threads) { + thread.join(); + } + ASSERT_EQ(client.GetActiveSessionCount(), 0); + driver.Stop(true); +} + +void EnsureCanExecQuery(NYdb::NTable::TSession session) { + auto execStatus = session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus(); + ASSERT_EQ(execStatus, EStatus::SUCCESS); +} + +void EnsureCanExecQuery(NYdb::NQuery::TSession session) { + auto execStatus = session.ExecuteQuery("SELECT 42;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync().GetStatus(); + ASSERT_EQ(execStatus, EStatus::SUCCESS); +} + +template +void DoMultiThreadSessionPoolLimitSync() { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + const int maxActiveSessions = 45; + TClient client(driver, + typename TClient::TSettings() + .SessionPoolSettings( + typename TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + + constexpr int nThreads = 100; + NYdb::EStatus statuses[nThreads]; + std::vector> sessions; + sessions.resize(nThreads); + std::atomic t = 0; + auto job = [client, &t, &statuses, &sessions]() mutable { + auto sessionResponse = client.GetSession().ExtractValueSync(); + int i = ++t; + statuses[--i] = sessionResponse.GetStatus(); + if (statuses[i] == EStatus::SUCCESS) { + EnsureCanExecQuery(sessionResponse.GetSession()); + sessions[i] = sessionResponse.GetSession(); + } + }; + + std::vector threads; + threads.resize(nThreads); + for (int i = 0; i < nThreads; i++) { + threads[i] = std::thread(job); + } + for (int i = 0; i < nThreads; i++) { + threads[i].join(); + } + + sessions.clear(); + + int successCount = 0; + int exhaustedCount = 0; + for (int i = 0; i < nThreads; i++) { + switch (statuses[i]) { + case EStatus::SUCCESS: + successCount++; + break; + case EStatus::CLIENT_RESOURCE_EXHAUSTED: + exhaustedCount++; + break; + default: + FAIL() << "Unexpected status code: " << static_cast(statuses[i]); + } + } + + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(successCount, maxActiveSessions); + ASSERT_EQ(exhaustedCount, nThreads - maxActiveSessions); + driver.Stop(true); +} + +TEST(YdbSdkSessions, MultiThreadSessionPoolLimitSyncTableClient) { + DoMultiThreadSessionPoolLimitSync(); +} + +TEST(YdbSdkSessions, MultiThreadSessionPoolLimitSyncQueryClient) { + DoMultiThreadSessionPoolLimitSync(); +} + +NYdb::NTable::TAsyncDataQueryResult ExecQueryAsync(NYdb::NTable::TSession session, const std::string q) { + return session.ExecuteDataQuery(q, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()); +} + +NYdb::NQuery::TAsyncExecuteQueryResult ExecQueryAsync(NYdb::NQuery::TSession session, const std::string q) { + return session.ExecuteQuery(q, + NYdb::NQuery::TTxControl::BeginTx().CommitTx()); +} + +template +void DoMultiThreadMultipleRequestsOnSharedSessions() { + std::string location = std::getenv("YDB_ENDPOINT"); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + const int maxActiveSessions = 10; + typename T::TClient client(driver, + typename T::TClient::TSettings() + .SessionPoolSettings( + typename T::TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + + constexpr int nThreads = 20; + constexpr int nRequests = 50; + std::array, nThreads> results; + std::atomic t = 0; + std::atomic validSessions = 0; + auto job = [client, &t, &results, &validSessions]() mutable { + auto sessionResponse = client.GetSession().ExtractValueSync(); + + int i = ++t; + std::vector& r = results[--i]; + + if (sessionResponse.GetStatus() != EStatus::SUCCESS) { + return; + } + validSessions.fetch_add(1); + + for (int i = 0; i < nRequests; i++) { + r.push_back(ExecQueryAsync(sessionResponse.GetSession(), "SELECT 42;")); + } + }; + + std::vector threads; + for (int i = 0; i < nThreads; i++) { + threads.emplace_back(job); + } + for (auto& thread : threads) { + thread.join(); + } + + for (auto& r : results) { + NThreading::WaitExceptionOrAll(r).Wait(); + } + for (auto& r : results) { + if (!r.empty()) { + for (auto& asyncStatus : r) { + auto res = asyncStatus.GetValue(); + if (!res.IsSuccess()) { + ASSERT_EQ(res.GetStatus(), EStatus::SESSION_BUSY); + } + } + } + } + ASSERT_EQ(client.GetActiveSessionCount(), maxActiveSessions); + auto curExpectedActive = maxActiveSessions; + auto empty = 0; + for (auto& r : results) { + if (!r.empty()) { + r.clear(); + ASSERT_EQ(client.GetActiveSessionCount(), --curExpectedActive); + } else { + empty++; + } + } + ASSERT_EQ(empty, nThreads - maxActiveSessions); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + driver.Stop(true); +} + +TEST(YdbSdkSessions, MultiThreadMultipleRequestsOnSharedSessionsTableClient) { + struct TTypeHelper { + using TClient = NYdb::NTable::TTableClient; + using TResult = NYdb::NTable::TAsyncDataQueryResult; + }; + DoMultiThreadMultipleRequestsOnSharedSessions(); +} + +TEST(YdbSdkSessions, MultiThreadMultipleRequestsOnSharedSessionsQueryClient) { + GTEST_SKIP() << "Enable after interactive tx support"; + struct TTypeHelper { + using TClient = NYdb::NQuery::TQueryClient; + using TResult = NYdb::NQuery::TAsyncExecuteQueryResult; + }; + DoMultiThreadMultipleRequestsOnSharedSessions(); +} + +TEST(YdbSdkSessions, SessionsServerLimit) { + GTEST_SKIP() << "Enable after accepting a pull request with merging configs"; + + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + + sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session2 = sessionResult.GetSession(); + + sessionResult = client.CreateSession().ExtractValueSync(); + sessionResult.GetIssues().PrintTo(Cerr); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::OVERLOADED); + + auto status = session1.Close().ExtractValueSync(); + ASSERT_EQ(status.IsTransportError(), false); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(status.GetStatus(), EStatus::SUCCESS); + + auto result = session2.ExecuteDataQuery(R"___( + SELECT 1; + )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS); + + sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + + sessionResult = client.CreateSession().ExtractValueSync(); + sessionResult.GetIssues().PrintTo(Cerr); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 0); +} + +TEST(YdbSdkSessions, SessionsServerLimitWithSessionPool) { + GTEST_SKIP() << "Enable after accepting a pull request with merging configs"; + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + auto sessionResult1 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult1.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + auto session1 = sessionResult1.GetSession(); + + auto sessionResult2 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult2.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + auto session2 = sessionResult2.GetSession(); + + { + auto sessionResult3 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult3.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 3); + } + ASSERT_EQ(client.GetActiveSessionCount(), 2); + + auto status = session1.Close().ExtractValueSync(); + ASSERT_EQ(status.IsTransportError(), false); + ASSERT_EQ(status.GetStatus(), EStatus::SUCCESS); + + // Close doesnt free session from user perspective, + // the value of ActiveSessionsCounter will be same after Close() call. + // Probably we want to chenge this contract + ASSERT_EQ(client.GetActiveSessionCount(), 2); + + auto result = session2.ExecuteDataQuery(R"___( + SELECT 1; + )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS); + + sessionResult1 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult1.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(sessionResult1.GetSession().GetId().empty(), false); + ASSERT_EQ(client.GetActiveSessionCount(), 3); + + auto sessionResult3 = client.GetSession().ExtractValueSync(); + ASSERT_EQ(sessionResult3.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 4); + + auto tmp = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 5); + sessionResult1 = tmp; // here we reset previous created session object, + // so perform close rpc call implicitly and delete it + ASSERT_EQ(sessionResult1.GetStatus(), EStatus::OVERLOADED); + ASSERT_EQ(client.GetActiveSessionCount(), 4); +} + +TEST(YdbSdkSessions, CloseSessionAfterDriverDtorWithoutSessionPool) { + std::vector sessionIds; + int iterations = 50; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.CreateSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + } + + std::shared_ptr channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} + +TEST(YdbSdkSessions, CloseSessionWithSessionPoolExplicit) { + std::vector sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 9); + if (dis(gen) == 5) { + auto future = client.Stop(); + future.Apply([](NThreading::TFuture f) { + return f; + }).Wait(); + } else { + client.Stop().Wait(); + } + + if (iterations & 4) { + driver.Stop(true); + } + } + + std::shared_ptr channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} + +TEST(YdbSdkSessions, CloseSessionWithSessionPoolExplicitDriverStopOnly) { + std::vector sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + driver.Stop(true); + } + + std::shared_ptr channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} + +TEST(YdbSdkSessions, CloseSessionWithSessionPoolFromDtors) { + std::vector sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_EQ(client.GetActiveSessionCount(), 2); + ASSERT_EQ(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + ASSERT_EQ(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + } + + std::shared_ptr channel; + channel = grpc::CreateChannel(std::getenv("YDB_ENDPOINT"), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + ASSERT_TRUE(status.ok()); + auto deferred = response.operation(); + ASSERT_TRUE(deferred.ready() == true); + ASSERT_EQ(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } +} diff --git a/tests/integration/sessions_pool/CMakeLists.txt b/tests/integration/sessions_pool/CMakeLists.txt new file mode 100644 index 00000000000..6e7a6a70ab7 --- /dev/null +++ b/tests/integration/sessions_pool/CMakeLists.txt @@ -0,0 +1,10 @@ +add_ydb_test(NAME sessions_pool_it GTEST + SOURCES + main.cpp + LINK_LIBRARIES + yutil + YDB-CPP-SDK::Table + api-grpc + LABELS + integration +) diff --git a/tests/integration/sessions_pool/main.cpp b/tests/integration/sessions_pool/main.cpp new file mode 100644 index 00000000000..0779530519f --- /dev/null +++ b/tests/integration/sessions_pool/main.cpp @@ -0,0 +1,369 @@ +#include + +#include + +#include + +#include +#include + +using namespace NYdb; +using namespace NYdb::NTable; + +class YdbSdkSessionsPool : public ::testing::TestWithParam { +protected: + void SetUp() override { + ui32 maxActiveSessions = GetParam(); + Driver = std::make_unique(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT"))); + + auto clientSettings = TClientSettings().SessionPoolSettings( + TSessionPoolSettings() + .MaxActiveSessions(maxActiveSessions) + .KeepAliveIdleThreshold(TDuration::MilliSeconds(10)) + .CloseIdleThreshold(TDuration::MilliSeconds(10))); + Client = std::make_unique(*Driver, clientSettings); + } + + void TearDown() override { + Driver->Stop(true); + } + +protected: + std::unique_ptr Driver; + std::unique_ptr Client; +}; + +class YdbSdkSessionsPool1Session : public YdbSdkSessionsPool {}; + +enum class EAction: ui8 { + CreateFuture, + ExtractValue, + Return +}; +using TPlan = std::vector>; + + +void CheckPlan(TPlan plan) { + std::unordered_map sessions; + for (const auto& [action, sessionId]: plan) { + if (action == EAction::CreateFuture) { + ASSERT_FALSE(sessions.contains(sessionId)); + } else { + ASSERT_TRUE(sessions.contains(sessionId)); + switch (sessions.at(sessionId)) { + case EAction::CreateFuture: { + ASSERT_EQ(action, EAction::ExtractValue); + break; + } + case EAction::ExtractValue: { + ASSERT_EQ(action, EAction::Return); + break; + } + default: { + ASSERT_TRUE(false); + } + } + } + sessions[sessionId] = action; + } +} + +void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) { + std::unordered_map> sessionFutures; + std::unordered_map sessions; + + ui32 requestedSessions = 0; + + for (const auto& [action, sessionId]: plan) { + switch (action) { + case EAction::CreateFuture: { + sessionFutures.emplace(sessionId, client.GetSession()); + ++requestedSessions; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (requestedSessions > client.GetActiveSessionsLimit()) { + ASSERT_EQ(client.GetActiveSessionCount(), client.GetActiveSessionsLimit()); + } + ASSERT_FALSE(sessionFutures.at(sessionId).HasValue()); + break; + } + case EAction::ExtractValue: { + auto it = sessionFutures.find(sessionId); + auto session = it->second.ExtractValueSync(); + sessionFutures.erase(it); + sessions.emplace(sessionId, std::move(session)); + break; + } + case EAction::Return: { + sessions.erase(sessionId); + --requestedSessions; + break; + } + } + ASSERT_LE(client.GetActiveSessionCount(), client.GetActiveSessionsLimit()); + ASSERT_GE(client.GetActiveSessionCount(), static_cast(sessions.size())); + ASSERT_LE(client.GetActiveSessionCount(), static_cast(sessions.size() + sessionFutures.size())); + } +} + +int GetRand(std::mt19937& rng, int min, int max) { + std::uniform_int_distribution dist(min, max); + return dist(rng); +} + + +TPlan GenerateRandomPlan(ui32 numSessions) { + TPlan plan; + std::random_device dev; + std::mt19937 rng(dev()); + + for (ui32 i = 0; i < numSessions; ++i) { + std::uniform_int_distribution dist(0, plan.size()); + ui32 prevPos = 0; + for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) { + int pos = GetRand(rng, prevPos, plan.size()); + plan.emplace(plan.begin() + pos, std::make_pair(action, i)); + prevPos = pos + 1; + } + } + return plan; +} + + +TEST_P(YdbSdkSessionsPool1Session, GetSession) { + ASSERT_EQ(Client->GetActiveSessionsLimit(), 1); + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), 0); + + { + //TCreateSessionResult + auto session = Client->GetSession().ExtractValueSync(); + + ASSERT_EQ(session.GetStatus(), EStatus::SUCCESS); + ASSERT_EQ(Client->GetActiveSessionCount(), 1); + ASSERT_EQ(Client->GetCurrentPoolSize(), 0); + } + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), 1); +} + +void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) { + std::vector> sessionFutures; + std::vector sessions; + + // exhaust the pool + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessions.emplace_back(client.GetSession().ExtractValueSync()); + } + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); + + // next should be in the wait queue + for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); + + // next should be a fake session + { + auto brokenSession = client.GetSession().ExtractValueSync(); + ASSERT_FALSE(brokenSession.IsSuccess()); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for (auto& sessionFuture: sessionFutures) { + ASSERT_FALSE(sessionFuture.HasValue()); + } + + for (auto& sessionFuture: sessionFutures) { + sessions.erase(sessions.begin()); + sessions.emplace_back(sessionFuture.ExtractValueSync()); + } + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); +} + +TEST_P(YdbSdkSessionsPool, WaitQueue) { + TestWaitQueue(*Client, GetParam()); +} + +TEST_P(YdbSdkSessionsPool1Session, RunSmallPlan) { + TPlan plan{ + {EAction::CreateFuture, 1}, + {EAction::ExtractValue, 1}, + {EAction::CreateFuture, 2}, + {EAction::Return, 1}, + {EAction::ExtractValue, 2}, + {EAction::Return, 2} + }; + CheckPlan(plan); + RunPlan(plan, *Client); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), 1); +} + +TEST_P(YdbSdkSessionsPool1Session, CustomPlan) { + TPlan plan{ + {EAction::CreateFuture, 1} + }; + CheckPlan(plan); + RunPlan(plan, *Client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); +} + +ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { + std::vector> sessionFutures; + std::vector sessions; + std::mt19937 rng(0); + ui32 successCount = 0; + + for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + + for (ui32 i = 0; i < n; ++i) { + switch (static_cast(GetRand(rng, 0, 2))) { + case EAction::CreateFuture: { + sessionFutures.emplace_back(client.GetSession()); + break; + } + case EAction::ExtractValue: { + if (sessionFutures.empty()) { + break; + } + auto ind = GetRand(rng, 0, sessionFutures.size() - 1); + auto sessionFuture = sessionFutures[ind]; + if (sessionFuture.HasValue()) { + auto session = sessionFuture.ExtractValueSync(); + if (session.IsSuccess()) { + ++successCount; + } + sessions.emplace_back(std::move(session)); + sessionFutures.erase(sessionFutures.begin() + ind); + break; + } + break; + } + case EAction::Return: { + if (sessions.empty()) { + break; + } + auto ind = GetRand(rng, 0, sessions.size() - 1); + sessions.erase(sessions.begin() + ind); + break; + } + } + } + return successCount; +} + +TEST_P(YdbSdkSessionsPool, StressTestSync) { + ui32 activeSessionsLimit = GetParam(); + + RunStressTestSync(1000, activeSessionsLimit, *Client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); +} + +ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) { + std::atomic successCount(0); + std::atomic jobIndex(0); + + auto job = [&client, &successCount, &jobIndex, n]() mutable { + std::mt19937 rng(++jobIndex); + for (ui32 i = 0; i < n; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 10))); + auto sessionFuture = client.GetSession(); + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 10))); + auto session = sessionFuture.ExtractValueSync(); + std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 10))); + successCount += session.IsSuccess(); + } + }; + + std::vector threads; + threads.resize(nThreads); + for (ui32 i = 0; i < nThreads; i++) { + threads[i] = std::thread(job); + } + for (ui32 i = 0; i < nThreads; i++) { + threads[i].join(); + } + + return successCount; +} + +TEST_P(YdbSdkSessionsPool, StressTestAsync) { + ui32 activeSessionsLimit = GetParam(); + ui32 iterations = (activeSessionsLimit == 1) ? 100 : 1000; + + RunStressTestAsync(iterations, 10, *Client); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + ASSERT_EQ(Client->GetActiveSessionCount(), 0); + ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); +} + +void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) { + std::vector> sessionFutures; + std::vector sessions; + + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessions.emplace_back(client.GetSession().ExtractValueSync()); + ASSERT_TRUE(sessions.back().IsSuccess()); + } + + for (ui32 i = 0; i < activeSessionsLimit; ++i) { + sessionFutures.emplace_back(client.GetSession()); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + for (auto& sessionFuture : sessionFutures) { + ASSERT_FALSE(sessionFuture.HasValue()); + } + + // Wait for wait session timeout + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + for (auto& sessionFuture : sessionFutures) { + ASSERT_TRUE(sessionFuture.HasValue()); + ASSERT_FALSE(sessionFuture.ExtractValueSync().IsSuccess()); + } + + ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit); + + sessionFutures.clear(); + sessions.clear(); + + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + ASSERT_EQ(client.GetActiveSessionCount(), 0); + ASSERT_EQ(client.GetCurrentPoolSize(), activeSessionsLimit); +} + +TEST_P(YdbSdkSessionsPool, PeriodicTask) { + TestPeriodicTask(GetParam(), *Client); +} + +TEST_P(YdbSdkSessionsPool1Session, FailTest) { + // This test reproduces bug from KIKIMR-18063 + auto sessionFromPool = Client->GetSession().ExtractValueSync(); + auto futureInWaitPool = Client->GetSession(); + + { + auto standaloneSessionThatWillBeBroken = Client->CreateSession().ExtractValueSync(); + auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); + } +} + +INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool, ::testing::Values(1, 10)); + +INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool1Session, ::testing::Values(1));