diff --git a/include/ydb-cpp-sdk/client/CMakeLists.txt b/include/ydb-cpp-sdk/client/CMakeLists.txt index 631efeb3ed..f4ba413366 100644 --- a/include/ydb-cpp-sdk/client/CMakeLists.txt +++ b/include/ydb-cpp-sdk/client/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(iam/common) +add_subdirectory(iam_private/common) diff --git a/include/ydb-cpp-sdk/client/iam_private/common/CMakeLists.txt b/include/ydb-cpp-sdk/client/iam_private/common/CMakeLists.txt new file mode 100644 index 0000000000..7248fe5131 --- /dev/null +++ b/include/ydb-cpp-sdk/client/iam_private/common/CMakeLists.txt @@ -0,0 +1,8 @@ +_ydb_sdk_add_library(client-iam_private-types INTERFACE) + +target_link_libraries(client-iam_private-types + INTERFACE + client-iam-types +) + +_ydb_sdk_install_targets(client-iam_private-types) diff --git a/include/ydb-cpp-sdk/client/iam_private/common/types.h b/include/ydb-cpp-sdk/client/iam_private/common/types.h new file mode 100644 index 0000000000..f7f070671e --- /dev/null +++ b/include/ydb-cpp-sdk/client/iam_private/common/types.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace NYdb::inline V3 { + +struct TIamServiceParams : TIamEndpoint { + std::string ServiceId; + std::string MicroserviceId; + std::string ResourceId; + std::string ResourceType; + std::string TargetServiceAccountId; +}; + +} diff --git a/include/ydb-cpp-sdk/client/iam_private/iam.h b/include/ydb-cpp-sdk/client/iam_private/iam.h index c4373c9478..48dd5ae61c 100644 --- a/include/ydb-cpp-sdk/client/iam_private/iam.h +++ b/include/ydb-cpp-sdk/client/iam_private/iam.h @@ -1,5 +1,7 @@ #pragma once +#include "common/types.h" + #include namespace NYdb::inline V3 { @@ -10,4 +12,7 @@ TCredentialsProviderFactoryPtr CreateIamJwtFileCredentialsProviderFactoryPrivate /// Acquire an IAM token using JSON Web Token (JWT) contents. TCredentialsProviderFactoryPtr CreateIamJwtParamsCredentialsProviderFactoryPrivate(const TIamJwtContent& param); +/// Acquire an IAM token for system service account (SSA). +TCredentialsProviderFactoryPtr CreateIamServiceCredentialsProviderFactory(const TIamServiceParams& params); + } // namespace NYdb diff --git a/src/api/client/yc_private/iam/iam_token_service.proto b/src/api/client/yc_private/iam/iam_token_service.proto index b6fed77397..d900dff629 100644 --- a/src/api/client/yc_private/iam/iam_token_service.proto +++ b/src/api/client/yc_private/iam/iam_token_service.proto @@ -18,6 +18,9 @@ service IamTokenService { // create iam token for service account rpc CreateForServiceAccount (CreateIamTokenForServiceAccountRequest) returns (CreateIamTokenResponse); + // create iam token for service + rpc CreateForService (CreateIamTokenForServiceRequest) returns (CreateIamTokenResponse); + // create iam token for compute instance rpc CreateForComputeInstance (CreateIamTokenForComputeInstanceRequest) returns (CreateIamTokenResponse); @@ -50,6 +53,14 @@ message CreateIamTokenForServiceAccountRequest { string service_account_id = 1; } +message CreateIamTokenForServiceRequest { + string service_id = 1; + string microservice_id = 2; + string resource_id = 3; + string resource_type = 4; + string target_service_account_id = 5; +} + message CreateIamTokenForComputeInstanceRequest { string service_account_id = 1; string instance_id = 2; diff --git a/src/client/iam/common/iam.h b/src/client/iam/common/iam.h index c7f7742c5a..22bd10e5fc 100644 --- a/src/client/iam/common/iam.h +++ b/src/client/iam/common/iam.h @@ -19,12 +19,19 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { protected: using TRequestFiller = std::function; + using TSimpleRpc = + typename NYdbGrpc::TSimpleRequestProcessor< + typename TService::Stub, + TRequest, + TResponse>::TAsyncRequest; + private: class TImpl : public std::enable_shared_from_this::TImpl> { public: - TImpl(const TIamEndpoint& iamEndpoint, const TRequestFiller& requestFiller) + TImpl(const TIamEndpoint& iamEndpoint, const TRequestFiller& requestFiller, TSimpleRpc rpc) : Client(std::make_unique()) , Connection_(nullptr) + , Rpc_(rpc) , Ticket_("") , NextTicketUpdate_(TInstant::Zero()) , IamEndpoint_(iamEndpoint) @@ -67,7 +74,7 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { Connection_->template DoRequest( std::move(req), std::move(cb), - &TService::Stub::AsyncCreate, + Rpc_, { {}, {}, IamEndpoint_.RequestTimeout } ); @@ -142,9 +149,9 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { } private: - std::unique_ptr Client; std::unique_ptr> Connection_; + TSimpleRpc Rpc_; std::string Ticket_; TInstant NextTicketUpdate_; const TIamEndpoint IamEndpoint_; @@ -157,8 +164,8 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { }; public: - TGrpcIamCredentialsProvider(const TIamEndpoint& endpoint, const TRequestFiller& requestFiller) - : Impl_(std::make_shared(endpoint, requestFiller)) + TGrpcIamCredentialsProvider(const TIamEndpoint& endpoint, const TRequestFiller& requestFiller, TSimpleRpc rpc) + : Impl_(std::make_shared(endpoint, requestFiller, rpc)) { Impl_->UpdateTicket(true); } @@ -186,7 +193,7 @@ class TIamJwtCredentialsProvider : public TGrpcIamCredentialsProvider(params, [jwtParams = params.JwtParams](TRequest& req) { req.set_jwt(MakeSignedJwt(jwtParams)); - }) {} + }, &TService::Stub::AsyncCreate) {} }; template @@ -196,7 +203,7 @@ class TIamOAuthCredentialsProvider : public TGrpcIamCredentialsProvider(params, [token = params.OAuthToken](TRequest& req) { req.set_yandex_passport_oauth_token(TStringType{token}); - }) {} + }, &TService::Stub::AsyncCreate) {} }; template diff --git a/src/client/iam_private/CMakeLists.txt b/src/client/iam_private/CMakeLists.txt index 981a26a58c..0f377d7d4e 100644 --- a/src/client/iam_private/CMakeLists.txt +++ b/src/client/iam_private/CMakeLists.txt @@ -1,3 +1,5 @@ +add_subdirectory(common) + _ydb_sdk_add_library(client-iam_private) target_link_libraries(client-iam_private @@ -6,7 +8,7 @@ target_link_libraries(client-iam_private yutil PRIVATE api-client-yc_private - client-iam-common + client-iam_private-common ) target_sources(client-iam_private diff --git a/src/client/iam_private/common/CMakeLists.txt b/src/client/iam_private/common/CMakeLists.txt new file mode 100644 index 0000000000..244879c537 --- /dev/null +++ b/src/client/iam_private/common/CMakeLists.txt @@ -0,0 +1,9 @@ +_ydb_sdk_add_library(client-iam_private-common INTERFACE) + +target_link_libraries(client-iam_private-common + INTERFACE + client-iam-common + client-iam_private-types +) + +_ydb_sdk_install_targets(client-iam_private-common) diff --git a/src/client/iam_private/common/iam.h b/src/client/iam_private/common/iam.h new file mode 100644 index 0000000000..bbc09b0d34 --- /dev/null +++ b/src/client/iam_private/common/iam.h @@ -0,0 +1,28 @@ +#include + +#include + +namespace NYdb::inline V3 { + +template + +class TIamServiceCredentialsProviderFactory : public ICredentialsProviderFactory { +public: + TIamServiceCredentialsProviderFactory(const TIamServiceParams& params) : Params_(params) {} + + TCredentialsProviderPtr CreateProvider() const final { + return std::make_shared>(Params_, + [params = Params_](TRequest& req) { + req.set_service_id(params.ServiceId); + req.set_microservice_id(params.MicroserviceId); + req.set_resource_id(params.ResourceId); + req.set_resource_type(params.ResourceType); + req.set_target_service_account_id(params.TargetServiceAccountId); + }, &TService::Stub::AsyncCreateForService); + } + +private: + TIamServiceParams Params_; +}; + +} diff --git a/src/client/iam_private/iam.cpp b/src/client/iam_private/iam.cpp index 294f8cc65d..054118e73a 100644 --- a/src/client/iam_private/iam.cpp +++ b/src/client/iam_private/iam.cpp @@ -1,17 +1,19 @@ -#include +#include "common/iam.h" -#include +#include #include #include +using namespace yandex::cloud::priv::iam::v1; + namespace NYdb::inline V3 { TCredentialsProviderFactoryPtr CreateIamJwtCredentialsProviderFactoryImplPrivate(TIamJwtParams&& jwtParams) { return std::make_shared>(std::move(jwtParams)); } @@ -25,4 +27,12 @@ TCredentialsProviderFactoryPtr CreateIamJwtParamsCredentialsProviderFactoryPriva return CreateIamJwtCredentialsProviderFactoryImplPrivate(std::move(jwtParams)); } +TCredentialsProviderFactoryPtr CreateIamServiceCredentialsProviderFactory(const TIamServiceParams& params) { + return std::make_shared>(std::move(params)); +} + } diff --git a/src/client/resources/ydb_sdk_version.txt b/src/client/resources/ydb_sdk_version.txt index a0cd9f0ccb..a4f52a5dbb 100644 --- a/src/client/resources/ydb_sdk_version.txt +++ b/src/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -3.1.0 \ No newline at end of file +3.2.0 \ No newline at end of file diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index a057b113f8..e0ae3a9a9a 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -94,6 +94,8 @@ class TFixture : public NUnitTest::TBaseFixture { TDuration stabilizationWindow, ui64 downUtilizationPercent, ui64 upUtilizationPercent); + void SetPartitionWriteSpeed(const std::string& topicPath, + size_t bytesPerSeconds); void WriteToTopicWithInvalidTxId(bool invalidTxId); @@ -511,6 +513,18 @@ void TFixture::AlterAutoPartitioning(const TString& topicPath, UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } +void TFixture::SetPartitionWriteSpeed(const std::string& topicPath, + size_t bytesPerSeconds) +{ + NTopic::TTopicClient client(GetDriver()); + NTopic::TAlterTopicSettings settings; + + settings.SetPartitionWriteSpeedBytesPerSecond(bytesPerSeconds); + + auto result = client.AlterTopic(topicPath, settings).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +} + TTopicDescription TFixture::DescribeTopic(const TString& path) { return Setup->DescribeTopic(path); @@ -3005,9 +3019,6 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks) Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture) { - // Consumes a lot of memory. Temporarily disabled - return; - // The test verifies the simultaneous execution of several transactions. There is a topic // with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions. // The size of the messages is random. Such that both large blobs in the body and small ones in @@ -3019,6 +3030,8 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture) CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + SetPartitionWriteSpeed("topic_A", 50'000'000); + std::vector sessions; std::vector transactions; @@ -3059,6 +3072,141 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture) } } +Y_UNIT_TEST_F(Write_Only_Big_Messages_In_Wide_Transactions, TFixture) +{ + // The test verifies the simultaneous execution of several transactions. There is a topic `topic_A` and + // it contains a `PARTITIONS_COUNT' of partitions. In each transaction, the test writes to all partitions. + // The size of the messages is chosen so that only large blobs are recorded in the transaction and there + // are no records in the head. Thus, we verify that transaction bundling is working correctly. + + const size_t PARTITIONS_COUNT = 20; + const size_t TXS_COUNT = 100; + + CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + std::vector sessions; + std::vector transactions; + + // We open TXS_COUNT transactions and write messages to the topic. + for (size_t i = 0; i < TXS_COUNT; ++i) { + sessions.push_back(CreateTableSession()); + auto& session = sessions.back(); + + transactions.push_back(BeginTx(session)); + auto& tx = transactions.back(); + + for (size_t j = 0; j < PARTITIONS_COUNT; ++j) { + TString sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(i); + sourceId += "_"; + sourceId += ToString(j); + + WriteToTopic("topic_A", sourceId, TString(6'500'000, 'x'), &tx, j); + + WaitForAcks("topic_A", sourceId); + } + } + + // We are doing an asynchronous commit of transactions. They will be executed simultaneously. + std::vector futures; + + for (size_t i = 0; i < TXS_COUNT; ++i) { + futures.push_back(transactions[i].Commit()); + } + + // All transactions must be completed successfully. + for (size_t i = 0; i < TXS_COUNT; ++i) { + futures[i].Wait(); + const auto& result = futures[i].GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } +} + +Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo, TFixture) +{ + const ui32 PARTITIONS_COUNT = 20; + const size_t TXS_COUNT = 100; + + CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + auto tableSession = CreateTableSession(); + std::vector> topicWriteSessions; + + for (ui32 i = 0; i < PARTITIONS_COUNT; ++i) { + TString sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(i); + + NTopic::TTopicClient client(GetDriver()); + NTopic::TWriteSessionSettings options; + options.Path("topic_A"); + options.ProducerId(sourceId); + options.MessageGroupId(sourceId); + options.PartitionId(i); + options.Codec(ECodec::RAW); + + auto session = client.CreateSimpleBlockingWriteSession(options); + + topicWriteSessions.push_back(std::move(session)); + } + + std::vector sessions; + std::vector transactions; + + for (size_t i = 0; i < TXS_COUNT; ++i) { + sessions.push_back(CreateTableSession()); + auto& session = sessions.back(); + + transactions.push_back(BeginTx(session)); + auto& tx = transactions.back(); + + for (size_t j = 0; j < PARTITIONS_COUNT; ++j) { + TString sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(j); + + for (size_t k = 0, count = RandomNumber(20) + 1; k < count; ++k) { + const std::string data(RandomNumber(1'000) + 100, 'x'); + NTopic::TWriteMessage params(data); + params.Tx(tx); + + topicWriteSessions[j]->Write(std::move(params)); + } + } + } + + std::vector futures; + + for (size_t i = 0; i < TXS_COUNT; ++i) { + futures.push_back(transactions[i].Commit()); + } + + // Some transactions should end with the error `ABORTED` + size_t successCount = 0; + + for (size_t i = 0; i < TXS_COUNT; ++i) { + futures[i].Wait(); + const auto& result = futures[i].GetValueSync(); + switch (result.GetStatus()) { + case EStatus::SUCCESS: + ++successCount; + break; + case EStatus::ABORTED: + break; + default: + UNIT_FAIL("unexpected status: " << static_cast(result)); + break; + } + } + + UNIT_ASSERT_VALUES_UNEQUAL(successCount, TXS_COUNT); +} + } }