diff --git a/.github/import_generation.txt b/.github/import_generation.txt index 60d3b2f4a4..b6a7d89c68 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -15 +16 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index b9654f8978..6d49802087 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -9a3ba4fbaa4d0b2d6dcff910256db11b3c909166 +b6b887dc9b0107368d53dff40e8ddcbc04001b57 diff --git a/include/ydb-cpp-sdk/client/import/import.h b/include/ydb-cpp-sdk/client/import/import.h index 809cf520c1..07c6a48e81 100644 --- a/include/ydb-cpp-sdk/client/import/import.h +++ b/include/ydb-cpp-sdk/client/import/import.h @@ -10,6 +10,9 @@ class ListObjectsInS3ExportResult; } namespace NYdb::inline V3 { + +class TProtoAccessor; + namespace NImport { /// Common @@ -99,6 +102,8 @@ struct TListObjectsInS3ExportSettings : public TOperationRequestSettings& GetItems() const; const std::string& NextPageToken() const { return NextPageToken_; } void Out(IOutputStream& out) const; +private: + const Ydb::Import::ListObjectsInS3ExportResult& GetProto() const; + private: std::vector Items_; std::string NextPageToken_; + std::unique_ptr Proto_; }; using TAsyncListObjectsInS3ExportResult = NThreading::TFuture; diff --git a/include/ydb-cpp-sdk/client/proto/accessor.h b/include/ydb-cpp-sdk/client/proto/accessor.h index e060fe8822..313321176b 100644 --- a/include/ydb-cpp-sdk/client/proto/accessor.h +++ b/include/ydb-cpp-sdk/client/proto/accessor.h @@ -29,6 +29,10 @@ class TTableDescription; class TIndexDescription; } +namespace NImport { +class TListObjectsInS3ExportResult; +} + //! Provides access to raw protobuf values of YDB API entities. It is not recommended to use this //! class in client applications as it add dependency on API protobuf format which is subject to //! change. Use functionality provided by YDB SDK classes. @@ -46,7 +50,8 @@ class TProtoAccessor { static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription); static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription); static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult); - static const Ydb::Coordination::DescribeNodeResult& GetProto(const NYdb::NCoordination::TNodeDescription &describeNodeResult); + static const Ydb::Coordination::DescribeNodeResult& GetProto(const NYdb::NCoordination::TNodeDescription& describeNodeResult); + static const Ydb::Import::ListObjectsInS3ExportResult& GetProto(const NYdb::NImport::TListObjectsInS3ExportResult& result); #ifdef YDB_SDK_INTERNAL_CLIENTS static const Ydb::Replication::DescribeReplicationResult& GetProto(const NYdb::NReplication::TDescribeReplicationResult& desc); static const Ydb::View::DescribeViewResult& GetProto(const NYdb::NView::TDescribeViewResult& desc); diff --git a/src/client/import/CMakeLists.txt b/src/client/import/CMakeLists.txt index 237fe5f7f0..1a030da4a7 100644 --- a/src/client/import/CMakeLists.txt +++ b/src/client/import/CMakeLists.txt @@ -15,6 +15,7 @@ target_sources(client-ydb_import PRIVATE import.cpp out.cpp + proto_accessor.cpp ) generate_enum_serilization(client-ydb_import diff --git a/src/client/import/import.cpp b/src/client/import/import.cpp index f6c62f43f6..466fa66e74 100644 --- a/src/client/import/import.cpp +++ b/src/client/import/import.cpp @@ -87,8 +87,9 @@ const TImportFromS3Response::TMetadata& TImportFromS3Response::Metadata() const return Metadata_; } -TListObjectsInS3ExportResult::TListObjectsInS3ExportResult(TStatus&& status, const ::Ydb::Import::ListObjectsInS3ExportResult& proto) +TListObjectsInS3ExportResult::TListObjectsInS3ExportResult(TStatus&& status, const Ydb::Import::ListObjectsInS3ExportResult& proto) : TStatus(std::move(status)) + , Proto_(std::make_unique(proto)) { Items_.reserve(proto.items_size()); for (const auto& item : proto.items()) { @@ -100,10 +101,36 @@ TListObjectsInS3ExportResult::TListObjectsInS3ExportResult(TStatus&& status, con NextPageToken_ = proto.next_page_token(); } +TListObjectsInS3ExportResult::TListObjectsInS3ExportResult(const TListObjectsInS3ExportResult& result) + : TStatus(result) + , Items_(result.Items_) + , NextPageToken_(result.NextPageToken_) + , Proto_(std::make_unique(*result.Proto_)) +{ +} + +TListObjectsInS3ExportResult::TListObjectsInS3ExportResult(TListObjectsInS3ExportResult&&) = default; + +TListObjectsInS3ExportResult::~TListObjectsInS3ExportResult() = default; + +TListObjectsInS3ExportResult& TListObjectsInS3ExportResult::operator=(TListObjectsInS3ExportResult&&) = default; + +TListObjectsInS3ExportResult& TListObjectsInS3ExportResult::operator=(const TListObjectsInS3ExportResult& result) { + TStatus::operator=(result); + Items_ = result.Items_; + NextPageToken_ = result.NextPageToken_; + Proto_ = std::make_unique(*result.Proto_); + return *this; +} + const std::vector& TListObjectsInS3ExportResult::GetItems() const { return Items_; } +const Ydb::Import::ListObjectsInS3ExportResult& TListObjectsInS3ExportResult::GetProto() const { + return *Proto_; +} + void TListObjectsInS3ExportResult::Out(IOutputStream& out) const { if (IsSuccess()) { out << "{ items: [" << JoinSeq(", ", Items_) << "], next_page_token: \"" << NextPageToken_ << "\" }"; diff --git a/src/client/import/proto_accessor.cpp b/src/client/import/proto_accessor.cpp new file mode 100644 index 0000000000..9bf397c25c --- /dev/null +++ b/src/client/import/proto_accessor.cpp @@ -0,0 +1,11 @@ +#include + +#include + +namespace NYdb::inline V3 { + +const Ydb::Import::ListObjectsInS3ExportResult& TProtoAccessor::GetProto(const NYdb::NImport::TListObjectsInS3ExportResult& result) { + return result.GetProto(); +} + +} diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index be0ce48732..4274a464fd 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -83,6 +83,8 @@ class TFixture : public NUnitTest::TBaseFixture { virtual void Close() = 0; + virtual TAsyncStatus AsyncCommitTx(TTransactionBase& tx) = 0; + virtual ~ISession() = default; }; @@ -107,6 +109,8 @@ class TFixture : public NUnitTest::TBaseFixture { void AddConsumer(const std::string& topicPath, const std::vector& consumers); + void SetPartitionWriteSpeed(const std::string& topicName, std::size_t bytesPerSeconds); + TTopicWriteSessionPtr CreateTopicWriteSession(const std::string& topicPath, const std::string& messageGroupId, std::optional partitionId); @@ -205,6 +209,8 @@ class TFixture : public NUnitTest::TBaseFixture { void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params); + void WriteMessagesInTx(std::size_t big, size_t small); + const TDriver& GetDriver() const; NTable::TTableClient& GetTableClient(); @@ -215,6 +221,12 @@ class TFixture : public NUnitTest::TBaseFixture { const std::string& consumerName, size_t count); + void TestWriteRandomSizedMessagesInWideTransactions(); + + void TestWriteOnlyBigMessagesInWideTransactions(); + + void TestTransactionsConflictOnSeqNo(); + void TestWriteToTopic1(); void TestWriteToTopic4(); @@ -241,6 +253,18 @@ class TFixture : public NUnitTest::TBaseFixture { void TestWriteToTopic27(); + void TestWriteToTopic38(); + + void TestWriteToTopic40(); + + void TestWriteToTopic41(); + + void TestWriteToTopic42(); + + void TestWriteToTopic43(); + + void TestWriteToTopic44(); + void TestWriteToTopic45(); void TestWriteToTopic46(); @@ -301,6 +325,8 @@ class TFixture : public NUnitTest::TBaseFixture { void CommitTx(TTransactionBase& tx, EStatus status = EStatus::SUCCESS) override; void RollbackTx(TTransactionBase& tx, EStatus status = EStatus::SUCCESS) override; + TAsyncStatus AsyncCommitTx(TTransactionBase& tx) override; + void Close() override; private: @@ -328,6 +354,8 @@ class TFixture : public NUnitTest::TBaseFixture { void CommitTx(TTransactionBase& tx, EStatus status = EStatus::SUCCESS) override; void RollbackTx(TTransactionBase& tx, EStatus status = EStatus::SUCCESS) override; + TAsyncStatus AsyncCommitTx(TTransactionBase& tx) override; + void Close() override; private: @@ -520,6 +548,14 @@ void TFixture::TTableSession::Close() Session_.Close(); } +TAsyncStatus TFixture::TTableSession::AsyncCommitTx(TTransactionBase& tx) +{ + auto txTable = dynamic_cast(tx); + return txTable.Commit().Apply([](auto result) { + return TStatus(result.GetValue()); + }); +} + TFixture::TQuerySession::TQuerySession(NQuery::TQueryClient& client, const std::string& endpoint, const std::string& database) @@ -622,6 +658,14 @@ void TFixture::TQuerySession::Close() UNIT_ASSERT_VALUES_EQUAL_C(response.status(), Ydb::StatusIds::SUCCESS, issues.ToString()); } +TAsyncStatus TFixture::TQuerySession::AsyncCommitTx(TTransactionBase& tx) +{ + auto txQuery = dynamic_cast(tx); + return txQuery.Commit().Apply([](auto result) { + return TStatus(result.GetValue()); + }); +} + std::unique_ptr TFixture::CreateSession() { switch (GetClientType()) { @@ -696,6 +740,17 @@ void TFixture::AddConsumer(const std::string& topicPath, UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } +void TFixture::SetPartitionWriteSpeed(const std::string& topicName, std::size_t bytesPerSeconds) +{ + NTopic::TTopicClient client(GetDriver()); + NTopic::TAlterTopicSettings settings; + + settings.SetPartitionWriteSpeedBytesPerSecond(bytesPerSeconds); + + auto result = client.AlterTopic(Setup->GetTopicPath(topicName), settings).GetValueSync(); + Y_ENSURE_BT(result.IsSuccess(), ToString(static_cast(result))); +} + const TDriver& TFixture::GetDriver() const { return *Driver; @@ -2038,6 +2093,186 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27_Query, TFixtureQuery) TestWriteToTopic27(); } +void TFixture::WriteMessagesInTx(std::size_t big, std::size_t small) +{ + CreateTopic("topic_A"); + + auto session = CreateSession(); + auto tx = session->BeginTx(); + + for (std::size_t i = 0; i < big; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(7'000'000, 'x'), tx.get(), 0); + } + + for (std::size_t i = 0; i < small; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(16'384, 'x'), tx.get(), 0); + } + + session->CommitTx(*tx, EStatus::SUCCESS); +} + +void TFixture::TestWriteToTopic38() +{ + WriteMessagesInTx(2, 202); + WriteMessagesInTx(2, 200); + WriteMessagesInTx(0, 1); + WriteMessagesInTx(4, 0); + WriteMessagesInTx(0, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_38_Table, TFixtureTable) +{ + TestWriteToTopic38(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_38_Query, TFixtureQuery) +{ + TestWriteToTopic38(); +} + +void TFixture::TestWriteToTopic40() +{ + // The recording stream will run into a quota. Before the commit, the client will receive confirmations + // for some of the messages. The `CommitTx` call will wait for the rest. + CreateTopic("topic_A"); + + auto session = CreateSession(); + auto tx = session->BeginTx(); + + for (std::size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); + } + + session->CommitTx(*tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_40_Table, TFixtureTable) +{ + TestWriteToTopic40(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_40_Query, TFixtureQuery) +{ + TestWriteToTopic40(); +} + +void TFixture::TestWriteToTopic41() +{ + // If the recording session does not wait for confirmations, the commit will fail + CreateTopic("topic_A"); + + auto session = CreateSession(); + auto tx = session->BeginTx(); + + for (std::size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); + } + + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID, true); // force close + + session->CommitTx(*tx, EStatus::SESSION_EXPIRED); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_41_Table, TFixtureTable) +{ + TestWriteToTopic41(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_41_Query, TFixtureQuery) +{ + TestWriteToTopic41(); +} + +void TFixture::TestWriteToTopic42() +{ + CreateTopic("topic_A"); + + auto session = CreateSession(); + auto tx = session->BeginTx(); + + for (std::size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); + } + + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); // gracefully close + + session->CommitTx(*tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_42_Table, TFixtureTable) +{ + TestWriteToTopic42(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_42_Query, TFixtureQuery) +{ + TestWriteToTopic42(); +} + +void TFixture::TestWriteToTopic43() +{ + // The recording stream will run into a quota. Before the commit, the client will receive confirmations + // for some of the messages. The `ExecuteDataQuery` call will wait for the rest. + CreateTopic("topic_A"); + + auto session = CreateSession(); + auto tx = session->BeginTx(); + + for (std::size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); + } + + session->Execute("SELECT 1", tx.get()); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_43_Table, TFixtureTable) +{ + TestWriteToTopic43(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_43_Query, TFixtureQuery) +{ + TestWriteToTopic43(); +} + +void TFixture::TestWriteToTopic44() +{ + CreateTopic("topic_A"); + + auto session = CreateSession(); + + auto [_, tx] = session->ExecuteInTx("SELECT 1", false); + + for (std::size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); + } + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60)); + UNIT_ASSERT_EQUAL(messages.size(), 0u); + + session->Execute("SELECT 2", tx.get()); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_44_Table, TFixtureTable) +{ + TestWriteToTopic44(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_44_Query, TFixtureQuery) +{ + TestWriteToTopic44(); +} + Y_UNIT_TEST_F(ReadRuleGeneration, TFixtureNoClient) { // There was a server @@ -2277,6 +2512,226 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_50_Query, TFixtureQuery) TestWriteToTopic50(); } +void TFixture::TestWriteRandomSizedMessagesInWideTransactions() +{ + // The test verifies the simultaneous execution of several transactions. There is a topic + // with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions. + // The size of the messages is random. Such that both large blobs in the body and small ones in + // the head of the partition are obtained. Message sizes are multiples of 500 KB. This way we + // will make sure that when committing transactions, the division into blocks is taken into account. + + const std::size_t PARTITIONS_COUNT = 20; + const std::size_t TXS_COUNT = 10; + + CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + std::vector> sessions; + std::vector> transactions; + + // We open TXS_COUNT transactions and write messages to the topic. + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + sessions.push_back(CreateSession()); + auto& session = sessions.back(); + + transactions.push_back(session->BeginTx()); + auto& tx = transactions.back(); + + for (std::size_t j = 0; j < PARTITIONS_COUNT; ++j) { + std::string sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(i); + sourceId += "_"; + sourceId += ToString(j); + + std::size_t count = RandomNumber(20) + 3; + WriteToTopic("topic_A", sourceId, std::string(512 * 1000 * count, 'x'), tx.get(), j); + + WaitForAcks("topic_A", sourceId); + } + } + + // We are doing an asynchronous commit of transactions. They will be executed simultaneously. + std::vector futures; + + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i])); + } + + // All transactions must be completed successfully. + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures[i].Wait(); + const auto& result = futures[i].GetValueSync(); + UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } +} + +Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions_Table, TFixtureTable) +{ + TestWriteRandomSizedMessagesInWideTransactions(); +} + +Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions_Query, TFixtureQuery) +{ + TestWriteRandomSizedMessagesInWideTransactions(); +} + +void TFixture::TestWriteOnlyBigMessagesInWideTransactions() +{ + // The test verifies the simultaneous execution of several transactions. There is a topic `topic_A` and + // it contains a `PARTITIONS_COUNT' of partitions. In each transaction, the test writes to all partitions. + // The size of the messages is chosen so that only large blobs are recorded in the transaction and there + // are no records in the head. Thus, we verify that transaction bundling is working correctly. + + const std::size_t PARTITIONS_COUNT = 20; + const std::size_t TXS_COUNT = 100; + + CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + std::vector> sessions; + std::vector> transactions; + + // We open TXS_COUNT transactions and write messages to the topic. + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + sessions.push_back(CreateSession()); + auto& session = sessions.back(); + + transactions.push_back(session->BeginTx()); + auto& tx = transactions.back(); + + for (std::size_t j = 0; j < PARTITIONS_COUNT; ++j) { + std::string sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(i); + sourceId += "_"; + sourceId += ToString(j); + + WriteToTopic("topic_A", sourceId, std::string(6'500'000, 'x'), tx.get(), j); + + WaitForAcks("topic_A", sourceId); + } + } + + // We are doing an asynchronous commit of transactions. They will be executed simultaneously. + std::vector futures; + + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i])); + } + + // All transactions must be completed successfully. + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures[i].Wait(); + const auto& result = futures[i].GetValueSync(); + UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } +} + +Y_UNIT_TEST_F(Write_Only_Big_Messages_In_Wide_Transactions_Table, TFixtureTable) +{ + TestWriteOnlyBigMessagesInWideTransactions(); +} + +Y_UNIT_TEST_F(Write_Only_Big_Messages_In_Wide_Transactions_Query, TFixtureQuery) +{ + TestWriteOnlyBigMessagesInWideTransactions(); +} + +void TFixture::TestTransactionsConflictOnSeqNo() +{ + const std::uint32_t PARTITIONS_COUNT = 20; + const std::size_t TXS_COUNT = 100; + + CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + auto session = CreateSession(); + std::vector> topicWriteSessions; + + for (std::uint32_t i = 0; i < PARTITIONS_COUNT; ++i) { + std::string sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(i); + + NTopic::TTopicClient client(GetDriver()); + NTopic::TWriteSessionSettings options; + options.Path(Setup->GetTopicPath("topic_A")); + options.ProducerId(sourceId); + options.MessageGroupId(sourceId); + options.PartitionId(i); + options.Codec(ECodec::RAW); + + auto session = client.CreateSimpleBlockingWriteSession(options); + + topicWriteSessions.push_back(std::move(session)); + } + + std::vector> sessions; + std::vector> transactions; + + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + sessions.push_back(CreateSession()); + auto& session = sessions.back(); + + transactions.push_back(session->BeginTx()); + auto& tx = transactions.back(); + + for (std::size_t j = 0; j < PARTITIONS_COUNT; ++j) { + std::string sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(j); + + for (std::size_t k = 0, count = RandomNumber(20) + 1; k < count; ++k) { + const std::string data(RandomNumber(1'000) + 100, 'x'); + NTopic::TWriteMessage params(data); + params.Tx(*tx); + + topicWriteSessions[j]->Write(std::move(params)); + } + } + } + + std::vector futures; + + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i])); + } + + // Some transactions should end with the error `ABORTED` + std::size_t successCount = 0; + + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures[i].Wait(); + const auto& result = futures[i].GetValueSync(); + switch (result.GetStatus()) { + case EStatus::SUCCESS: + ++successCount; + break; + case EStatus::ABORTED: + break; + default: + UNIT_FAIL("unexpected status: " + ToString(static_cast(result))); + break; + } + } + + UNIT_ASSERT_UNEQUAL(successCount, TXS_COUNT); +} + +Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo_Table, TFixtureTable) +{ + TestTransactionsConflictOnSeqNo(); +} + +Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo_Query, TFixtureQuery) +{ + TestTransactionsConflictOnSeqNo(); +} + class TFixtureSinks : public TFixture { protected: void CreateRowTable(const std::string& path); diff --git a/tests/integration/topic/setup/fixture.cpp b/tests/integration/topic/setup/fixture.cpp index e00d5c0cf4..5be12b51b9 100644 --- a/tests/integration/topic/setup/fixture.cpp +++ b/tests/integration/topic/setup/fixture.cpp @@ -21,12 +21,12 @@ void TTopicTestFixture::SetUp() { TTopicClient client(MakeDriver()); - const testing::TestInfo* const testInfo = testing::UnitTest::GetInstance()->current_test_info(); + const auto& testInfo = testing::UnitTest::GetInstance()->current_test_info(); std::stringstream topicBuilder; topicBuilder << std::getenv("YDB_TEST_ROOT") << "/" << testInfo->test_suite_name() << "-" << testInfo->name() << "/"; TopicPrefix_ = topicBuilder.str(); - + std::stringstream consumerBuilder; consumerBuilder << testInfo->test_suite_name() << "-" << testInfo->name() << "-"; ConsumerPrefix_ = consumerBuilder.str(); @@ -65,14 +65,6 @@ void TTopicTestFixture::RemoveDirectoryRecurive(const std::string& path) const { } } -void TTopicTestFixture::TearDown() { - // try { - // RemoveDirectoryRecurive(GetDatabase() + "/" + TopicPrefix_); - // } catch (const std::exception& e) { - // std::cerr << "Occurred error in TearDown: " << e.what() << std::endl; - // } -} - std::string TTopicTestFixture::GetEndpoint() const { auto endpoint = std::getenv("YDB_ENDPOINT"); Y_ENSURE_BT(endpoint, "YDB_ENDPOINT is not set"); diff --git a/tests/integration/topic/setup/fixture.h b/tests/integration/topic/setup/fixture.h index 11f7ea0042..da859690a1 100644 --- a/tests/integration/topic/setup/fixture.h +++ b/tests/integration/topic/setup/fixture.h @@ -14,7 +14,6 @@ extern const bool EnableDirectRead; class TTopicTestFixture : public ::testing::Test, public ITopicTestSetup { public: void SetUp() override; - void TearDown() override; std::string GetEndpoint() const override; std::string GetDatabase() const override; @@ -32,7 +31,7 @@ class TTopicTestFixture : public ::testing::Test, public ITopicTestSetup { } #ifdef PQ_EXPERIMENTAL_DIRECT_READ - #define TEST_NAME(name) DirectRead_##name + #define TEST_NAME(name) DirectRead_##name #else - #define TEST_NAME(name) name + #define TEST_NAME(name) name #endif diff --git a/tests/integration/topic/topic_to_table.cpp b/tests/integration/topic/topic_to_table.cpp index 87ae80029b..af411e2cb4 100644 --- a/tests/integration/topic/topic_to_table.cpp +++ b/tests/integration/topic/topic_to_table.cpp @@ -65,8 +65,6 @@ class TxUsage : public TTopicTestFixture { virtual void Close() = 0; - virtual TAsyncStatus AsyncCommitTx(TTransactionBase& tx) = 0; - virtual ~ISession() = default; }; @@ -101,8 +99,6 @@ class TxUsage : public TTopicTestFixture { TDuration stabilizationWindow, std::uint64_t downUtilizationPercent, std::uint64_t upUtilizationPercent); - void SetPartitionWriteSpeed(const std::string& topicName, - std::size_t bytesPerSeconds); void WriteToTopicWithInvalidTxId(bool invalidTxId); @@ -162,12 +158,6 @@ class TxUsage : public TTopicTestFixture { void TestWriteToTopicTwoWriteSession(); - void TestWriteRandomSizedMessagesInWideTransactions(); - - void TestWriteOnlyBigMessagesInWideTransactions(); - - void TestTransactionsConflictOnSeqNo(); - void TestWriteToTopic1(); void TestWriteToTopic2(); @@ -216,20 +206,8 @@ class TxUsage : public TTopicTestFixture { void TestWriteToTopic37(); - void TestWriteToTopic38(); - void TestWriteToTopic39(); - void TestWriteToTopic40(); - - void TestWriteToTopic41(); - - void TestWriteToTopic42(); - - void TestWriteToTopic43(); - - void TestWriteToTopic44(); - void TestWriteToTopic48(); enum class EClientType { @@ -260,8 +238,6 @@ class TxUsage : public TTopicTestFixture { void Close() override; - TAsyncStatus AsyncCommitTx(TTransactionBase& tx) override; - private: NTable::TSession Init(NTable::TTableClient& client); @@ -289,8 +265,6 @@ class TxUsage : public TTopicTestFixture { void Close() override; - TAsyncStatus AsyncCommitTx(TTransactionBase& tx) override; - private: NQuery::TSession Init(NQuery::TQueryClient& client); @@ -441,14 +415,6 @@ void TxUsage::TTableSession::Close() Session_.Close(); } -TAsyncStatus TxUsage::TTableSession::AsyncCommitTx(TTransactionBase& tx) -{ - auto txTable = dynamic_cast(tx); - return txTable.Commit().Apply([](auto result) { - return TStatus(result.GetValue()); - }); -} - TxUsage::TQuerySession::TQuerySession(NQuery::TQueryClient& client, const std::string& endpoint, const std::string& database) @@ -551,14 +517,6 @@ void TxUsage::TQuerySession::Close() Y_ENSURE_BT(response.status() == Ydb::StatusIds::SUCCESS, issues.ToString()); } -TAsyncStatus TxUsage::TQuerySession::AsyncCommitTx(TTransactionBase& tx) -{ - auto txQuery = dynamic_cast(tx); - return txQuery.Commit().Apply([](auto result) { - return TStatus(result.GetValue()); - }); -} - std::unique_ptr TxUsage::CreateSession() { switch (GetClientType()) { @@ -727,18 +685,6 @@ void TxUsage::AlterAutoPartitioning(const std::string& topicName, Y_ENSURE_BT(result.IsSuccess(), ToString(static_cast(result))); } -void TxUsage::SetPartitionWriteSpeed(const std::string& topicName, - std::size_t bytesPerSeconds) -{ - NTopic::TTopicClient client(GetDriver()); - NTopic::TAlterTopicSettings settings; - - settings.SetPartitionWriteSpeedBytesPerSecond(bytesPerSeconds); - - auto result = client.AlterTopic(GetTopicPath(topicName), settings).GetValueSync(); - Y_ENSURE_BT(result.IsSuccess(), ToString(static_cast(result))); -} - const TDriver& TxUsage::GetDriver() const { return *Driver; @@ -2018,25 +1964,6 @@ TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_37)) TestWriteToTopic37(); } -// void TxUsage::TestWriteToTopic38() -// { -// WriteMessagesInTx(2, 202); -// WriteMessagesInTx(2, 200); -// WriteMessagesInTx(0, 1); -// WriteMessagesInTx(4, 0); -// WriteMessagesInTx(0, 1); -// } - -// TEST_F(TxUsageTable, TEST_NAME(WriteToTopic_Demo_38)) -// { -// TestWriteToTopic38(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_38)) -// { -// TestWriteToTopic38(); -// } - void TxUsage::TestWriteToTopic39() { CreateTopic("topic_A"); @@ -2064,149 +1991,6 @@ TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_39)) TestWriteToTopic39(); } -// void TxUsage::TestWriteToTopic40() -// { -// // The recording stream will run into a quota. Before the commit, the client will receive confirmations -// // for some of the messages. The `CommitTx` call will wait for the rest. -// CreateTopic("topic_A"); - -// auto session = CreateSession(); -// auto tx = session->BeginTx(); - -// for (std::size_t k = 0; k < 100; ++k) { -// WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); -// } - -// session->CommitTx(*tx, EStatus::SUCCESS); - -// Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); -// } - -// TEST_F(TxUsageTable, TEST_NAME(WriteToTopic_Demo_40)) -// { -// TestWriteToTopic40(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_40)) -// { -// TestWriteToTopic40(); -// } - -// void TxUsage::TestWriteToTopic41() -// { -// // If the recording session does not wait for confirmations, the commit will fail -// CreateTopic("topic_A"); - -// auto session = CreateSession(); -// auto tx = session->BeginTx(); - -// for (std::size_t k = 0; k < 100; ++k) { -// WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); -// } - -// CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID, true); // force close - -// session->CommitTx(*tx, EStatus::SESSION_EXPIRED); -// } - -// TEST_F(TxUsageTable, TEST_NAME(WriteToTopic_Demo_41)) -// { -// TestWriteToTopic41(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_41)) -// { -// TestWriteToTopic41(); -// } - -// void TxUsage::TestWriteToTopic42() -// { -// CreateTopic("topic_A"); - -// auto session = CreateSession(); -// auto tx = session->BeginTx(); - -// for (std::size_t k = 0; k < 100; ++k) { -// WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); -// } - -// CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); // gracefully close - -// session->CommitTx(*tx, EStatus::SUCCESS); - -// Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); -// } - -// TEST_F(TxUsageTable, TEST_NAME(WriteToTopic_Demo_42)) -// { -// TestWriteToTopic42(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_42)) -// { -// TestWriteToTopic42(); -// } - -// void TxUsage::TestWriteToTopic43() -// { -// // The recording stream will run into a quota. Before the commit, the client will receive confirmations -// // for some of the messages. The `ExecuteDataQuery` call will wait for the rest. -// CreateTopic("topic_A"); - -// auto session = CreateSession(); -// auto tx = session->BeginTx(); - -// for (std::size_t k = 0; k < 100; ++k) { -// WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); -// } - -// session->Execute("SELECT 1", tx.get()); - -// Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); -// } - -// TEST_F(TxUsageTable, TEST_NAME(WriteToTopic_Demo_43)) -// { -// TestWriteToTopic43(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_43)) -// { -// TestWriteToTopic43(); -// } - -// void TxUsage::TestWriteToTopic44() -// { -// CreateTopic("topic_A"); - -// auto session = CreateSession(); - -// auto [_, tx] = session->ExecuteInTx("SELECT 1", false); - -// for (std::size_t k = 0; k < 100; ++k) { -// WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(1'000'000, 'a'), tx.get()); -// } - -// WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - -// auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60)); -// ASSERT_EQ(messages.size(), 0u); - -// session->Execute("SELECT 2", tx.get()); - -// Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); -// } - -// TEST_F(TxUsageTable, TEST_NAME(WriteToTopic_Demo_44)) -// { -// TestWriteToTopic44(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_44)) -// { -// TestWriteToTopic44(); -// } - void TxUsage::TestWriteToTopic48() { // the commit of a transaction affects the split of the partition @@ -2247,226 +2031,6 @@ TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_48)) TestWriteToTopic48(); } -// void TxUsage::TestWriteRandomSizedMessagesInWideTransactions() -// { -// // The test verifies the simultaneous execution of several transactions. There is a topic -// // with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions. -// // The size of the messages is random. Such that both large blobs in the body and small ones in -// // the head of the partition are obtained. Message sizes are multiples of 500 KB. This way we -// // will make sure that when committing transactions, the division into blocks is taken into account. - -// const std::size_t PARTITIONS_COUNT = 20; -// const std::size_t TXS_COUNT = 10; - -// CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); - -// SetPartitionWriteSpeed("topic_A", 50'000'000); - -// std::vector> sessions; -// std::vector> transactions; - -// // We open TXS_COUNT transactions and write messages to the topic. -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// sessions.push_back(CreateSession()); -// auto& session = sessions.back(); - -// transactions.push_back(session->BeginTx()); -// auto& tx = transactions.back(); - -// for (std::size_t j = 0; j < PARTITIONS_COUNT; ++j) { -// std::string sourceId = TEST_MESSAGE_GROUP_ID; -// sourceId += "_"; -// sourceId += ToString(i); -// sourceId += "_"; -// sourceId += ToString(j); - -// std::size_t count = RandomNumber(20) + 3; -// WriteToTopic("topic_A", sourceId, std::string(512 * 1000 * count, 'x'), tx.get(), j); - -// WaitForAcks("topic_A", sourceId); -// } -// } - -// // We are doing an asynchronous commit of transactions. They will be executed simultaneously. -// std::vector futures; - -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i])); -// } - -// // All transactions must be completed successfully. -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// futures[i].Wait(); -// const auto& result = futures[i].GetValueSync(); -// ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); -// } -// } - -// TEST_F(TxUsageTable, TEST_NAME(Write_Random_Sized_Messages_In_Wide_Transactions)) -// { -// TestWriteRandomSizedMessagesInWideTransactions(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(Write_Random_Sized_Messages_In_Wide_Transactions)) -// { -// TestWriteRandomSizedMessagesInWideTransactions(); -// } - -// void TxUsage::TestWriteOnlyBigMessagesInWideTransactions() -// { -// // The test verifies the simultaneous execution of several transactions. There is a topic `topic_A` and -// // it contains a `PARTITIONS_COUNT' of partitions. In each transaction, the test writes to all partitions. -// // The size of the messages is chosen so that only large blobs are recorded in the transaction and there -// // are no records in the head. Thus, we verify that transaction bundling is working correctly. - -// const std::size_t PARTITIONS_COUNT = 20; -// const std::size_t TXS_COUNT = 100; - -// CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); - -// SetPartitionWriteSpeed("topic_A", 50'000'000); - -// std::vector> sessions; -// std::vector> transactions; - -// // We open TXS_COUNT transactions and write messages to the topic. -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// sessions.push_back(CreateSession()); -// auto& session = sessions.back(); - -// transactions.push_back(session->BeginTx()); -// auto& tx = transactions.back(); - -// for (std::size_t j = 0; j < PARTITIONS_COUNT; ++j) { -// std::string sourceId = TEST_MESSAGE_GROUP_ID; -// sourceId += "_"; -// sourceId += ToString(i); -// sourceId += "_"; -// sourceId += ToString(j); - -// WriteToTopic("topic_A", sourceId, std::string(6'500'000, 'x'), tx.get(), j); - -// WaitForAcks("topic_A", sourceId); -// } -// } - -// // We are doing an asynchronous commit of transactions. They will be executed simultaneously. -// std::vector futures; - -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i])); -// } - -// // All transactions must be completed successfully. -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// futures[i].Wait(); -// const auto& result = futures[i].GetValueSync(); -// ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); -// } -// } - -// TEST_F(TxUsageTable, TEST_NAME(Write_Only_Big_Messages_In_Wide_Transactions)) -// { -// TestWriteOnlyBigMessagesInWideTransactions(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(Write_Only_Big_Messages_In_Wide_Transactions)) -// { -// TestWriteOnlyBigMessagesInWideTransactions(); -// } - -// void TxUsage::TestTransactionsConflictOnSeqNo() -// { -// const std::uint32_t PARTITIONS_COUNT = 20; -// const std::size_t TXS_COUNT = 100; - -// CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); - -// SetPartitionWriteSpeed("topic_A", 50'000'000); - -// auto session = CreateSession(); -// std::vector> topicWriteSessions; - -// for (std::uint32_t i = 0; i < PARTITIONS_COUNT; ++i) { -// std::string sourceId = TEST_MESSAGE_GROUP_ID; -// sourceId += "_"; -// sourceId += ToString(i); - -// NTopic::TTopicClient client(GetDriver()); -// NTopic::TWriteSessionSettings options; -// options.Path(GetTopicPath("topic_A")); -// options.ProducerId(sourceId); -// options.MessageGroupId(sourceId); -// options.PartitionId(i); -// options.Codec(ECodec::RAW); - -// auto session = client.CreateSimpleBlockingWriteSession(options); - -// topicWriteSessions.push_back(std::move(session)); -// } - -// std::vector> sessions; -// std::vector> transactions; - -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// sessions.push_back(CreateSession()); -// auto& session = sessions.back(); - -// transactions.push_back(session->BeginTx()); -// auto& tx = transactions.back(); - -// for (std::size_t j = 0; j < PARTITIONS_COUNT; ++j) { -// std::string sourceId = TEST_MESSAGE_GROUP_ID; -// sourceId += "_"; -// sourceId += ToString(j); - -// for (std::size_t k = 0, count = RandomNumber(20) + 1; k < count; ++k) { -// const std::string data(RandomNumber(1'000) + 100, 'x'); -// NTopic::TWriteMessage params(data); -// params.Tx(*tx); - -// topicWriteSessions[j]->Write(std::move(params)); -// } -// } -// } - -// std::vector futures; - -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i])); -// } - -// // Some transactions should end with the error `ABORTED` -// std::size_t successCount = 0; - -// for (std::size_t i = 0; i < TXS_COUNT; ++i) { -// futures[i].Wait(); -// const auto& result = futures[i].GetValueSync(); -// switch (result.GetStatus()) { -// case EStatus::SUCCESS: -// ++successCount; -// break; -// case EStatus::ABORTED: -// break; -// default: -// ADD_FAILURE() << "unexpected status: " << ToString(static_cast(result)); -// break; -// } -// } - -// ASSERT_NE(successCount, TXS_COUNT); -// } - -// TEST_F(TxUsageTable, TEST_NAME(Transactions_Conflict_On_SeqNo)) -// { -// TestTransactionsConflictOnSeqNo(); -// } - -// TEST_F(TxUsageQuery, TEST_NAME(Transactions_Conflict_On_SeqNo)) -// { -// TestTransactionsConflictOnSeqNo(); -// } - TEST_F(TxUsageQuery, TEST_NAME(TestRetentionOnLongTxAndBigMessages)) { // TODO uncomment