diff --git a/include/ydb-cpp-sdk/client/bsconfig/storage_config.h b/include/ydb-cpp-sdk/client/bsconfig/storage_config.h index 151a81e943..9b70806757 100644 --- a/include/ydb-cpp-sdk/client/bsconfig/storage_config.h +++ b/include/ydb-cpp-sdk/client/bsconfig/storage_config.h @@ -14,17 +14,24 @@ namespace NYdb::inline V3::NStorageConfig { struct TFetchStorageConfigResult : public TStatus { TFetchStorageConfigResult( TStatus&& status, - std::string&& config) + std::string&& config, + std::string&& storage_config) : TStatus(std::move(status)) , Config_(std::move(config)) + , StorageConfig_(std::move(storage_config)) {} const std::string& GetConfig() const { return Config_; } + const std::string& GetStorageConfig() const { + return StorageConfig_; + } + private: std::string Config_; + std::string StorageConfig_; }; using TAsyncFetchStorageConfigResult = NThreading::TFuture; @@ -39,10 +46,14 @@ class TStorageConfigClient { ~TStorageConfigClient(); // Replace config - TAsyncStatus ReplaceStorageConfig(const std::string& config); + TAsyncStatus ReplaceStorageConfig(const std::optional& yaml_config, + const std::optional& storage_yaml_config, + std::optional switch_dedicated_storage_section, + bool dedicated_config_mode); // Fetch current cluster storage config - TAsyncFetchStorageConfigResult FetchStorageConfig(const TStorageConfigSettings& settings = {}); + TAsyncFetchStorageConfigResult FetchStorageConfig(bool dedicated_storage_section, bool dedicated_cluster_section, + const TStorageConfigSettings& settings = {}); // Bootstrap cluster with automatic configuration TAsyncStatus BootstrapCluster(const std::string& selfAssemblyUUID); diff --git a/include/ydb-cpp-sdk/client/debug/client.h b/include/ydb-cpp-sdk/client/debug/client.h index 7ee45e5a4f..56568743c5 100644 --- a/include/ydb-cpp-sdk/client/debug/client.h +++ b/include/ydb-cpp-sdk/client/debug/client.h @@ -41,6 +41,13 @@ class TTxProxyPingResult: public TStatus { {} }; +class TActorChainPingResult: public TStatus { +public: + TActorChainPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + //////////////////////////////////////////////////////////////////////////////// using TAsyncPlainGrpcPingResult = NThreading::TFuture; @@ -48,14 +55,21 @@ using TAsyncGrpcProxyPingResult = NThreading::TFuture; using TAsyncKqpProxyPingResult = NThreading::TFuture; using TAsyncSchemeCachePingResult = NThreading::TFuture; using TAsyncTxProxyPingResult = NThreading::TFuture; +using TAsyncActorChainPingResult = NThreading::TFuture; //////////////////////////////////////////////////////////////////////////////// -struct TPlainGrpcPingSettings : public TOperationRequestSettings {}; -struct TGrpcProxyPingSettings : public TOperationRequestSettings {}; -struct TKqpProxyPingSettings : public TOperationRequestSettings {}; -struct TSchemeCachePingSettings : public TOperationRequestSettings {}; -struct TTxProxyPingSettings : public TOperationRequestSettings {}; +struct TPlainGrpcPingSettings : public TSimpleRequestSettings {}; +struct TGrpcProxyPingSettings : public TSimpleRequestSettings {}; +struct TKqpProxyPingSettings : public TSimpleRequestSettings {}; +struct TSchemeCachePingSettings : public TSimpleRequestSettings {}; +struct TTxProxyPingSettings : public TSimpleRequestSettings {}; + +struct TActorChainPingSettings : public TSimpleRequestSettings { + FLUENT_SETTING_DEFAULT(size_t, ChainLength, 10); + FLUENT_SETTING_DEFAULT(size_t, WorkUsec, 5); + FLUENT_SETTING_DEFAULT(bool, NoTailChain, false); +}; //////////////////////////////////////////////////////////////////////////////// @@ -75,6 +89,8 @@ class TDebugClient { TAsyncSchemeCachePingResult PingSchemeCache(const TSchemeCachePingSettings& settings); TAsyncTxProxyPingResult PingTxProxy(const TTxProxyPingSettings& settings); + TAsyncActorChainPingResult PingActorChain(const TActorChainPingSettings& settings); + private: class TImpl; std::shared_ptr Impl_; diff --git a/include/ydb-cpp-sdk/client/query/client.h b/include/ydb-cpp-sdk/client/query/client.h index 980bf2b079..8238633e96 100644 --- a/include/ydb-cpp-sdk/client/query/client.h +++ b/include/ydb-cpp-sdk/client/query/client.h @@ -151,6 +151,7 @@ class TSession { class TImpl; private: TSession(); + TSession(std::shared_ptr client); // Create broken session TSession(std::shared_ptr client, TSession::TImpl* sessionImpl); std::shared_ptr Client_; @@ -210,7 +211,10 @@ class TExecuteQueryPart : public TStreamPartStatus { const TResultSet& GetResultSet() const { return *ResultSet_; } TResultSet ExtractResultSet() { return std::move(*ResultSet_); } + bool HasStats() const { return Stats_.has_value(); } const std::optional& GetStats() const { return Stats_; } + TExecStats ExtractStats() const { return std::move(*Stats_); } + const std::optional& GetTransaction() const { return Transaction_; } TExecuteQueryPart(TStatus&& status, std::optional&& queryStats, std::optional&& tx) diff --git a/include/ydb-cpp-sdk/client/query/query.h b/include/ydb-cpp-sdk/client/query/query.h index 4e3616a436..6f460df125 100644 --- a/include/ydb-cpp-sdk/client/query/query.h +++ b/include/ydb-cpp-sdk/client/query/query.h @@ -76,6 +76,7 @@ struct TExecuteQuerySettings : public TRequestSettings { FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None); FLUENT_SETTING_OPTIONAL(bool, ConcurrentResultSets); FLUENT_SETTING(std::string, ResourcePool); + FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, StatsCollectPeriod); }; struct TBeginTxSettings : public TRequestSettings {}; diff --git a/include/ydb-cpp-sdk/client/topic/control_plane.h b/include/ydb-cpp-sdk/client/topic/control_plane.h index 4e25523f9e..b2fa18b792 100644 --- a/include/ydb-cpp-sdk/client/topic/control_plane.h +++ b/include/ydb-cpp-sdk/client/topic/control_plane.h @@ -19,7 +19,7 @@ namespace NYdb::inline V3 { } namespace NYdb::inline V3::NTopic { - + enum class EMeteringMode : uint32_t { Unspecified = 0, ReservedCapacity = 1, @@ -187,6 +187,8 @@ friend struct TAutoPartitioningSettingsBuilder; , DownUtilizationPercent_(downUtilizationPercent) , UpUtilizationPercent_(upUtilizationPercent) {} + void SerializeTo(Ydb::Topic::AutoPartitioningSettings& proto) const; + EAutoPartitioningStrategy GetStrategy() const; TDuration GetStabilizationWindow() const; ui32 GetDownUtilizationPercent() const; @@ -228,6 +230,8 @@ class TPartitioningSettings { { } + void SerializeTo(Ydb::Topic::PartitioningSettings& proto) const; + uint64_t GetMinActivePartitions() const; uint64_t GetMaxActivePartitions() const; uint64_t GetPartitionCountLimit() const; @@ -437,8 +441,11 @@ struct TConsumerSettings { using TAttributes = std::map; - TConsumerSettings(TSettings& parent): Parent_(parent) {} + TConsumerSettings(TSettings& parent) : Parent_(parent) {} TConsumerSettings(TSettings& parent, const std::string& name) : ConsumerName_(name), Parent_(parent) {} + TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto); + + void SerializeTo(Ydb::Topic::Consumer& proto) const; FLUENT_SETTING(std::string, ConsumerName); FLUENT_SETTING_DEFAULT(bool, Important, false); @@ -526,6 +533,11 @@ struct TCreateTopicSettings : public TOperationRequestSettings; + TCreateTopicSettings() = default; + TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest& proto); + + void SerializeTo(Ydb::Topic::CreateTopicRequest& proto) const; + FLUENT_SETTING(TPartitioningSettings, PartitioningSettings); FLUENT_SETTING_DEFAULT(TDuration, RetentionPeriod, TDuration::Hours(24)); diff --git a/src/api/grpc/ydb_debug_v1.proto b/src/api/grpc/ydb_debug_v1.proto index 0905b8412d..2df8b20cb9 100644 --- a/src/api/grpc/ydb_debug_v1.proto +++ b/src/api/grpc/ydb_debug_v1.proto @@ -11,4 +11,5 @@ service DebugService { rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse); rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse); rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse); + rpc PingActorChain(Debug.ActorChainRequest) returns (Debug.ActorChainResponse); } diff --git a/src/api/protos/ydb_bsconfig.proto b/src/api/protos/ydb_bsconfig.proto index a75d947502..814c824149 100644 --- a/src/api/protos/ydb_bsconfig.proto +++ b/src/api/protos/ydb_bsconfig.proto @@ -15,7 +15,10 @@ import "src/api/protos/ydb_operation.proto"; message ReplaceStorageConfigRequest { Ydb.Operations.OperationParams operation_params = 1; - string yaml_config = 2; + optional string yaml_config = 2; // cluster yaml config + optional string storage_yaml_config = 3; // dedicated storage yaml config (when dual-config mode is enabled) + optional bool switch_dedicated_storage_section = 4; // if filled, can turn on or off dedicated section of YAML config + bool dedicated_config_mode = 5; // if true, then user expects system to work in dual-config mode } message ReplaceStorageConfigResponse { @@ -27,6 +30,8 @@ message ReplaceStorageConfigResult { message FetchStorageConfigRequest { Ydb.Operations.OperationParams operation_params = 1; + bool dedicated_storage_section = 2; + bool dedicated_cluster_section = 3; } message FetchStorageConfigResponse { @@ -34,7 +39,8 @@ message FetchStorageConfigResponse { } message FetchStorageConfigResult { - string yaml_config = 1; + optional string yaml_config = 1; + optional string storage_yaml_config = 2; } message BootstrapClusterRequest { diff --git a/src/api/protos/ydb_debug.proto b/src/api/protos/ydb_debug.proto index 64d01c52dc..aefd20f6a7 100644 --- a/src/api/protos/ydb_debug.proto +++ b/src/api/protos/ydb_debug.proto @@ -57,3 +57,22 @@ message TxProxyResponse { StatusIds.StatusCode status = 1; repeated Ydb.Issue.IssueMessage issues = 2; } + +// Ping Actor Chain + +message ActorChainRequest { + + // number of actors to be created, default 10 + uint32 ChainLength = 1; + + // immitate work duration for each actor (approximate), default ~ 5 usec + uint32 WorkUsec = 2; + + // don't use tail sends and registrations + bool NoTailChain = 3; +} + +message ActorChainResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} diff --git a/src/api/protos/ydb_query.proto b/src/api/protos/ydb_query.proto index ab9f3cabb0..d3e61131cb 100644 --- a/src/api/protos/ydb_query.proto +++ b/src/api/protos/ydb_query.proto @@ -174,7 +174,13 @@ message ExecuteQueryRequest { // Allows to set size limitation (in bytes) for one result part int64 response_part_limit_bytes = 9 [(Ydb.value) = "[0; 33554432]"]; - string pool_id = 10; // Workload manager pool id + // Workload manager pool id + string pool_id = 10; + + // Time interval for sending periodical query statistics. + // When query statistics are enabled (stats_mode != STATS_MODE_NONE), by default statistics will be sent only once after query execution is finished. + // In case when stats_period_ms is specified and is non-zero, query statistics will be additionally sent every stats_period_ms milliseconds beginning from the start of query execution. + int64 stats_period_ms = 11 [(Ydb.value) = ">= 0"]; } message ResultSetMeta { diff --git a/src/api/protos/ydb_rate_limiter.proto b/src/api/protos/ydb_rate_limiter.proto index 74e05772ea..23dc9b85b1 100644 --- a/src/api/protos/ydb_rate_limiter.proto +++ b/src/api/protos/ydb_rate_limiter.proto @@ -63,6 +63,9 @@ message MeteringConfig { // Default value is inherited from parent or equals 60 seconds for root. uint64 billing_period_sec = 2; + // User-defined labels. + map labels = 3 [(map_key).length.le = 256, (length).le = 10240, (size).le = 100]; + // Billing metric JSON fields (inherited from parent if not set) google.protobuf.Struct metric_fields = 10; } diff --git a/src/client/bsconfig/storage_config.cpp b/src/client/bsconfig/storage_config.cpp index 8a6307f474..0f472abfd6 100644 --- a/src/client/bsconfig/storage_config.cpp +++ b/src/client/bsconfig/storage_config.cpp @@ -14,28 +14,51 @@ class TStorageConfigClient::TImpl : public TClientImplCommon& yaml_config, + const std::optional& storage_yaml_config, + std::optional switch_dedicated_storage_section, + bool dedicated_config_mode) { auto request = MakeRequest(); - request.set_yaml_config(config); + + if (yaml_config) { + request.set_yaml_config(*yaml_config); + } + if (storage_yaml_config) { + request.set_storage_yaml_config(*storage_yaml_config); + } + if (switch_dedicated_storage_section) { + request.set_switch_dedicated_storage_section(*switch_dedicated_storage_section); + } + request.set_dedicated_config_mode(dedicated_config_mode); return RunSimple( std::move(request), &Ydb::BSConfig::V1::BSConfigService::Stub::AsyncReplaceStorageConfig); } - TAsyncFetchStorageConfigResult FetchStorageConfig(const TStorageConfigSettings& settings = {}) { + TAsyncFetchStorageConfigResult FetchStorageConfig(bool dedicated_storage_section, bool dedicated_cluster_section, + const TStorageConfigSettings& settings = {}) { auto request = MakeOperationRequest(settings); + if (dedicated_storage_section) { + request.set_dedicated_storage_section(true); + } + if (dedicated_cluster_section) { + request.set_dedicated_cluster_section(true); + } auto promise = NThreading::NewPromise(); auto extractor = [promise] (google::protobuf::Any* any, TPlainStatus status) mutable { - NYdb::TStringType config; - if (Ydb::BSConfig::FetchStorageConfigResult result; any && any->UnpackTo(&result)) { - config = result.yaml_config(); - } - - TFetchStorageConfigResult val(TStatus(std::move(status)), std::string{std::move(config)}); - promise.SetValue(std::move(val)); - }; + NYdb::TStringType config; + NYdb::TStringType storage_config; + if (Ydb::BSConfig::FetchStorageConfigResult result; any && any->UnpackTo(&result)) { + config = result.yaml_config(); + storage_config = result.storage_yaml_config(); + } + + TFetchStorageConfigResult val(TStatus(std::move(status)), std::string{std::move(config)}, + std::string{std::move(storage_config)}); + promise.SetValue(std::move(val)); + }; Connections_->RunDeferred( std::move(request), @@ -62,12 +85,16 @@ TStorageConfigClient::TStorageConfigClient(const TDriver& driver, const TCommonC TStorageConfigClient::~TStorageConfigClient() = default; -TAsyncStatus TStorageConfigClient::ReplaceStorageConfig(const std::string& config) { - return Impl_->ReplaceStorageConfig(config); +TAsyncStatus TStorageConfigClient::ReplaceStorageConfig(const std::optional& yaml_config, + const std::optional& storage_yaml_config, std::optional switch_dedicated_storage_section, + bool dedicated_config_mode) { + return Impl_->ReplaceStorageConfig(yaml_config, storage_yaml_config, switch_dedicated_storage_section, + dedicated_config_mode); } -TAsyncFetchStorageConfigResult TStorageConfigClient::FetchStorageConfig(const TStorageConfigSettings& settings) { - return Impl_->FetchStorageConfig(settings); +TAsyncFetchStorageConfigResult TStorageConfigClient::FetchStorageConfig(bool dedicated_storage_section, + bool dedicated_cluster_section, const TStorageConfigSettings& settings) { + return Impl_->FetchStorageConfig(dedicated_storage_section, dedicated_cluster_section, settings); } TAsyncStatus TStorageConfigClient::BootstrapCluster(const std::string& selfAssemblyUUID) { diff --git a/src/client/debug/client.cpp b/src/client/debug/client.cpp index e3542e3633..d00173b423 100644 --- a/src/client/debug/client.cpp +++ b/src/client/debug/client.cpp @@ -40,6 +40,28 @@ class TDebugClient::TImpl: public TClientImplCommon { return pingPromise; } + auto PingActorChain(const TActorChainPingSettings& settings) { + auto pingPromise = NewPromise(); + auto responseCb = [pingPromise] (Debug::ActorChainResponse*, TPlainStatus status) mutable { + TActorChainPingResult val(TStatus(std::move(status))); + pingPromise.SetValue(std::move(val)); + }; + + Debug::ActorChainRequest request; + request.set_chainlength(settings.ChainLength_); + request.set_workusec(settings.WorkUsec_); + request.set_notailchain(settings.NoTailChain_); + + Connections_->Run( + std::move(request), + responseCb, + &Debug::V1::DebugService::Stub::AsyncPingActorChain, + DbDriverState_, + TRpcRequestSettings::Make(settings)); + + return pingPromise; + } + ~TImpl() = default; }; @@ -73,4 +95,8 @@ TAsyncTxProxyPingResult TDebugClient::PingTxProxy(const TTxProxyPingSettings& se settings, &Debug::V1::DebugService::Stub::AsyncPingTxProxy); } -} // namespace NYdb::NDebug +TAsyncActorChainPingResult TDebugClient::PingActorChain(const TActorChainPingSettings& settings) { + return Impl_->PingActorChain(settings); +} + +} // namespace NYdb::V3::NDebug diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 8182d9d30b..0fef4f75fb 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -361,7 +361,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public TSession::TImpl::MakeImplAsync(processor, args); } else { TStatus st(std::move(status)); - args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client))); } }, &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession, @@ -384,13 +384,13 @@ class TQueryClient::TImpl: public TClientImplCommon, public NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(resp->issues(), opIssues); TStatus st(static_cast(resp->status()), std::move(opIssues)); - promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + promise.SetValue(TCreateSessionResult(std::move(st), TSession(self))); } else { self->DoAttachSession(resp, promise, status.Endpoint, self); } } else { TStatus st(std::move(status)); - promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + promise.SetValue(TCreateSessionResult(std::move(st), TSession(self))); } }; @@ -631,6 +631,14 @@ TSession TCreateSessionResult::GetSession() const { TSession::TSession() {} +TSession::TSession(std::shared_ptr client) + : Client_(client) + , SessionImpl_( + new TSession::TImpl(nullptr, "", "", client), + TKqpSessionCommon::GetSmartDeleter(client) + ) +{} + TSession::TSession(std::shared_ptr client, TSession::TImpl* session) : Client_(client) , SessionImpl_(session, TKqpSessionCommon::GetSmartDeleter(client)) diff --git a/src/client/query/impl/client_session.cpp b/src/client/query/impl/client_session.cpp index 8b1c89a8fa..f7f733c92a 100644 --- a/src/client/query/impl/client_session.cpp +++ b/src/client/query/impl/client_session.cpp @@ -79,13 +79,12 @@ void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptrTrySharedOwning(); if (impl) { impl->CloseFromServer(client); holder->Release(); } - break; } } }); @@ -96,14 +95,21 @@ TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, co , StreamProcessor_(ptr) , SessionHolder(std::make_shared(this)) { - MarkActive(); - SetNeedUpdateActiveCounter(true); - StartAsyncRead(StreamProcessor_, client, SessionHolder); + if (ptr) { + MarkActive(); + SetNeedUpdateActiveCounter(true); + StartAsyncRead(StreamProcessor_, client, SessionHolder); + } else { + MarkBroken(); + SetNeedUpdateActiveCounter(true); + } } TSession::TImpl::~TImpl() { - StreamProcessor_->Cancel(); + if (StreamProcessor_) { + StreamProcessor_->Cancel(); + } SessionHolder->WaitAndLock(); } @@ -114,7 +120,7 @@ void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, ptr->Read(resp.get(), [args, resp, ptr](NYdbGrpc::TGrpcStatus grpcStatus) mutable { if (grpcStatus.GRpcStatusCode != grpc::StatusCode::OK) { TStatus st(TPlainStatus(grpcStatus, args->Endpoint)); - args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client))); } else { if (resp->status() == Ydb::StatusIds::SUCCESS) { @@ -125,7 +131,7 @@ void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(resp->issues(), opIssues); TStatus st(static_cast(resp->status()), std::move(opIssues)); - args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession())); + args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client))); } } }); diff --git a/src/client/query/impl/exec_query.cpp b/src/client/query/impl/exec_query.cpp index 0e8899abe4..bc76b358a6 100644 --- a/src/client/query/impl/exec_query.cpp +++ b/src/client/query/impl/exec_query.cpp @@ -246,6 +246,10 @@ TFuture> StreamExecuteQueryIm request.set_response_part_limit_bytes(*settings.OutputChunkMaxSize_); } + if (settings.StatsCollectPeriod_) { + request.set_stats_period_ms((*settings.StatsCollectPeriod_).count()); + } + if (txControl.HasTx()) { auto requestTxControl = request.mutable_tx_control(); requestTxControl->set_commit_tx(txControl.CommitTx_); diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index b02471c9c4..99121ffd5b 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -1365,13 +1365,15 @@ TScanQueryPartIterator::TScanQueryPartIterator( {} TAsyncScanQueryPart TScanQueryPartIterator::ReadNext() { - if (ReaderImpl_->IsFinished()) + if (!ReaderImpl_ || ReaderImpl_->IsFinished()) { + if (!IsSuccess()) + RaiseError(TStringBuilder() << "Attempt to perform read on an unsuccessful result " + << GetIssues().ToString()); RaiseError("Attempt to perform read on invalid or finished stream"); + } return ReaderImpl_->ReadNext(ReaderImpl_); } - - static bool IsSessionStatusRetriable(const TCreateSessionResult& res) { switch (res.GetStatus()) { case EStatus::OVERLOADED: diff --git a/src/client/topic/impl/topic.cpp b/src/client/topic/impl/topic.cpp index ea3d0b59ee..8a0eb5f9f3 100644 --- a/src/client/topic/impl/topic.cpp +++ b/src/client/topic/impl/topic.cpp @@ -184,8 +184,15 @@ const std::vector& TTopicDescription::GetConsumers() const { } void TTopicDescription::SerializeTo(Ydb::Topic::CreateTopicRequest& request) const { - Y_UNUSED(request); - Y_ABORT("Not implemented"); + *request.mutable_partitioning_settings() = Proto_.partitioning_settings(); + *request.mutable_retention_period() = Proto_.retention_period(); + request.set_retention_storage_mb(Proto_.retention_storage_mb()); + *request.mutable_supported_codecs() = Proto_.supported_codecs(); + request.set_partition_write_speed_bytes_per_second(Proto_.partition_write_speed_bytes_per_second()); + request.set_partition_write_burst_bytes(Proto_.partition_write_burst_bytes()); + *request.mutable_attributes() = Proto_.attributes(); + *request.mutable_consumers() = Proto_.consumers(); + request.set_metering_mode(Proto_.metering_mode()); } const Ydb::Topic::DescribeTopicResult& TTopicDescription::GetProto() const { @@ -227,6 +234,13 @@ TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSetti , AutoPartitioningSettings_(settings.auto_partitioning_settings()) {} +void TPartitioningSettings::SerializeTo(Ydb::Topic::PartitioningSettings& proto) const { + proto.set_min_active_partitions(MinActivePartitions_); + proto.set_max_active_partitions(MaxActivePartitions_); + proto.set_partition_count_limit(PartitionCountLimit_); + AutoPartitioningSettings_.SerializeTo(*proto.mutable_auto_partitioning_settings()); +} + uint64_t TPartitioningSettings::GetMinActivePartitions() const { return MinActivePartitions_; } @@ -250,6 +264,14 @@ TAutoPartitioningSettings::TAutoPartitioningSettings(const Ydb::Topic::AutoParti , UpUtilizationPercent_(settings.partition_write_speed().up_utilization_percent()) {} +void TAutoPartitioningSettings::SerializeTo(Ydb::Topic::AutoPartitioningSettings& proto) const { + proto.set_strategy(static_cast(Strategy_)); + auto& writeSpeed = *proto.mutable_partition_write_speed(); + writeSpeed.mutable_stabilization_window()->set_seconds(StabilizationWindow_.Seconds()); + writeSpeed.set_down_utilization_percent(DownUtilizationPercent_); + writeSpeed.set_up_utilization_percent(UpUtilizationPercent_); +} + EAutoPartitioningStrategy TAutoPartitioningSettings::GetStrategy() const { return Strategy_; } @@ -535,4 +557,109 @@ TAsyncStatus TTopicClient::CommitOffset(const std::string& path, uint64_t partit return Impl_->CommitOffset(path, partitionId, consumerName, offset, settings); } +namespace { + +Ydb::Topic::SupportedCodecs SerializeCodecs(const std::vector& codecs) { + Ydb::Topic::SupportedCodecs proto; + for (ECodec codec : codecs) { + proto.add_codecs(static_cast(codec)); + } + return proto; +} + +std::vector DeserializeCodecs(const Ydb::Topic::SupportedCodecs& proto) { + std::vector codecs; + codecs.reserve(proto.codecs_size()); + for (int codec : proto.codecs()) { + codecs.emplace_back(static_cast(codec)); + } + return codecs; +} + +google::protobuf::Map SerializeAttributes(const std::map& attributes) { + google::protobuf::Map proto; + for (const auto& [key, value] : attributes) { + proto[key] = value; + } + return proto; +} + +std::map DeserializeAttributes(const google::protobuf::Map& proto) { + std::map attributes; + for (const auto& [key, value] : proto) { + attributes.emplace(key, value); + } + return attributes; +} + +template +google::protobuf::RepeatedPtrField SerializeConsumers(const std::vector>& consumers) { + google::protobuf::RepeatedPtrField proto; + proto.Reserve(consumers.size()); + for (const auto& consumer : consumers) { + consumer.SerializeTo(*proto.Add()); + } + return proto; +} + +template +std::vector> DeserializeConsumers(TSettings& parent, const google::protobuf::RepeatedPtrField& proto) { + std::vector> consumers; + consumers.reserve(proto.size()); + for (const auto& consumer : proto) { + consumers.emplace_back(TConsumerSettings(parent, consumer)); + } + return consumers; +} + +} + +template +TConsumerSettings::TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto) + : ConsumerName_(proto.name()) + , Important_(proto.important()) + , ReadFrom_(TInstant::Seconds(proto.read_from().seconds())) + , SupportedCodecs_(DeserializeCodecs(proto.supported_codecs())) + , Attributes_(DeserializeAttributes(proto.attributes())) + , Parent_(parent) +{ +} + +template +void TConsumerSettings::SerializeTo(Ydb::Topic::Consumer& proto) const { + proto.set_name(ConsumerName_); + proto.set_important(Important_); + proto.mutable_read_from()->set_seconds(ReadFrom_.Seconds()); + *proto.mutable_supported_codecs() = SerializeCodecs(SupportedCodecs_); + *proto.mutable_attributes() = SerializeAttributes(Attributes_); +} + +template struct TConsumerSettings; +template struct TConsumerSettings; + +TCreateTopicSettings::TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest& proto) + : PartitioningSettings_(TPartitioningSettings(proto.partitioning_settings())) + , RetentionPeriod_(TDuration::Seconds(proto.retention_period().seconds())) + , SupportedCodecs_(DeserializeCodecs(proto.supported_codecs())) + , RetentionStorageMb_(proto.retention_storage_mb()) + , MeteringMode_(TProtoAccessor::FromProto(proto.metering_mode())) + , PartitionWriteSpeedBytesPerSecond_(proto.partition_write_speed_bytes_per_second()) + , PartitionWriteBurstBytes_(proto.partition_write_burst_bytes()) + , Attributes_(DeserializeAttributes(proto.attributes())) +{ + Consumers_ = DeserializeConsumers(*this, proto.consumers()); +} + +void TCreateTopicSettings::SerializeTo(Ydb::Topic::CreateTopicRequest& request) const { + PartitioningSettings_.SerializeTo(*request.mutable_partitioning_settings()); + request.mutable_retention_period()->set_seconds(RetentionPeriod_.Seconds()); + *request.mutable_supported_codecs() = SerializeCodecs(SupportedCodecs_); + request.set_retention_storage_mb(RetentionStorageMb_); + request.set_metering_mode(TProtoAccessor::GetProto(MeteringMode_)); + request.set_partition_write_speed_bytes_per_second(PartitionWriteSpeedBytesPerSecond_); + request.set_partition_write_burst_bytes(PartitionWriteBurstBytes_); + *request.mutable_consumers() = SerializeConsumers(Consumers_); + *request.mutable_attributes() = SerializeAttributes(Attributes_); +} + } // namespace NYdb::NTopic diff --git a/src/client/topic/impl/topic_impl.h b/src/client/topic/impl/topic_impl.h index 494bd07294..c5dfe60741 100644 --- a/src/client/topic/impl/topic_impl.h +++ b/src/client/topic/impl/topic_impl.h @@ -42,20 +42,6 @@ class TTopicClient::TImpl : public TClientImplCommon { { } - template - static void ConvertConsumerToProto(const TConsumerSettings& settings, Ydb::Topic::Consumer& consumerProto) { - consumerProto.set_name(TStringType{settings.ConsumerName_}); - consumerProto.set_important(settings.Important_); - consumerProto.mutable_read_from()->set_seconds(settings.ReadFrom_.Seconds()); - - for (const auto& codec : settings.SupportedCodecs_) { - consumerProto.mutable_supported_codecs()->add_codecs((static_cast(codec))); - } - for (auto& pair : settings.Attributes_) { - (*consumerProto.mutable_attributes())[pair.first] = pair.second; - } - } - static void ConvertAlterConsumerToProto(const TAlterConsumerSettings& settings, Ydb::Topic::AlterConsumer& consumerProto) { consumerProto.set_name(TStringType{settings.ConsumerName_}); if (settings.SetImportant_) @@ -78,34 +64,7 @@ class TTopicClient::TImpl : public TClientImplCommon { static Ydb::Topic::CreateTopicRequest MakePropsCreateRequest(const std::string& path, const TCreateTopicSettings& settings) { Ydb::Topic::CreateTopicRequest request = MakeOperationRequest(settings); request.set_path(TStringType{path}); - - request.mutable_partitioning_settings()->set_min_active_partitions(settings.PartitioningSettings_.GetMinActivePartitions()); - request.mutable_partitioning_settings()->set_partition_count_limit(settings.PartitioningSettings_.GetPartitionCountLimit()); - request.mutable_partitioning_settings()->set_max_active_partitions(settings.PartitioningSettings_.GetMaxActivePartitions()); - request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(static_cast(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetStrategy())); - request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetStabilizationWindow().Seconds()); - request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetUpUtilizationPercent()); - request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetDownUtilizationPercent()); - - request.mutable_retention_period()->set_seconds(settings.RetentionPeriod_.Seconds()); - - for (const auto& codec : settings.SupportedCodecs_) { - request.mutable_supported_codecs()->add_codecs((static_cast(codec))); - } - request.set_partition_write_speed_bytes_per_second(settings.PartitionWriteSpeedBytesPerSecond_); - request.set_partition_write_burst_bytes(settings.PartitionWriteBurstBytes_); - request.set_retention_storage_mb(settings.RetentionStorageMb_); - request.set_metering_mode(TProtoAccessor::GetProto(settings.MeteringMode_)); - - for (auto& pair : settings.Attributes_) { - (*request.mutable_attributes())[pair.first] = pair.second; - } - - for (const auto& consumer : settings.Consumers_) { - Ydb::Topic::Consumer& consumerProto = *request.add_consumers(); - ConvertConsumerToProto(consumer, consumerProto); - } - + settings.SerializeTo(request); return request; } @@ -171,8 +130,7 @@ class TTopicClient::TImpl : public TClientImplCommon { } for (const auto& consumer : settings.AddConsumers_) { - Ydb::Topic::Consumer& consumerProto = *request.add_add_consumers(); - ConvertConsumerToProto(consumer, consumerProto); + consumer.SerializeTo(*request.add_add_consumers()); } for (const auto& consumer : settings.DropConsumers_) { diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index a6e1ba7168..d8c5479580 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -896,7 +896,7 @@ TVector TFixture::ReadFromTopic(const TString& topicPath, if (auto* e = std::get_if(&event)) { Cerr << e->HasCompressedMessages() << " " << e->GetMessagesCount() << Endl; for (auto& m : e->GetMessages()) { - messages.push_back(TString{m.GetData()}); + messages.emplace_back(m.GetData()); } if (!tx) { @@ -2910,6 +2910,63 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks) CheckTabletKeys("topic_A"); } + +Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture) +{ + // Consumes a lot of memory. Temporarily disabled + return; + + // The test verifies the simultaneous execution of several transactions. There is a topic + // with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions. + // The size of the messages is random. Such that both large blobs in the body and small ones in + // the head of the partition are obtained. Message sizes are multiples of 500 KB. This way we + // will make sure that when committing transactions, the division into blocks is taken into account. + + const size_t PARTITIONS_COUNT = 20; + const size_t TXS_COUNT = 100; + + CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + + std::vector sessions; + std::vector transactions; + + // We open TXS_COUNT transactions and write messages to the topic. + for (size_t i = 0; i < TXS_COUNT; ++i) { + sessions.push_back(CreateTableSession()); + auto& session = sessions.back(); + + transactions.push_back(BeginTx(session)); + auto& tx = transactions.back(); + + for (size_t j = 0; j < PARTITIONS_COUNT; ++j) { + TString sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(i); + sourceId += "_"; + sourceId += ToString(j); + + size_t count = RandomNumber(20) + 3; + WriteToTopic("topic_A", sourceId, TString(512 * 1000 * count, 'x'), &tx, j); + + WaitForAcks("topic_A", sourceId); + } + } + + // We are doing an asynchronous commit of transactions. They will be executed simultaneously. + std::vector futures; + + for (size_t i = 0; i < TXS_COUNT; ++i) { + futures.push_back(transactions[i].Commit()); + } + + // All transactions must be completed successfully. + for (size_t i = 0; i < TXS_COUNT; ++i) { + futures[i].Wait(); + const auto& result = futures[i].GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } +} + } } diff --git a/src/client/types/status/status.cpp b/src/client/types/status/status.cpp index 924b369044..1764499e40 100644 --- a/src/client/types/status/status.cpp +++ b/src/client/types/status/status.cpp @@ -104,7 +104,6 @@ bool TStreamPartStatus::EOS() const { namespace NStatusHelpers { void ThrowOnError(TStatus status, std::function onSuccess) { - std::cerr << "THROW ON ERROR" << std::endl; if (!status.IsSuccess()) { throw TYdbErrorException(status) << status; } else { diff --git a/src/client/value/value.cpp b/src/client/value/value.cpp index a6a9e7f951..38ad345b26 100644 --- a/src/client/value/value.cpp +++ b/src/client/value/value.cpp @@ -567,9 +567,13 @@ void FormatTypeInternal(TTypeParser& parser, IOutputStream& out) { out << "Null"sv; break; - default: - ThrowFatalError(TStringBuilder() - << "Unexpected type kind: " << parser.GetKind()); + case TTypeParser::ETypeKind::EmptyList: + out << "EmptyList"sv; + break; + + case TTypeParser::ETypeKind::EmptyDict: + out << "EmptyDict"sv; + break; } } diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index 510ce5332f..9d65935a2d 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -1,2 +1,3 @@ -add_subdirectory(basic_example_it) -add_subdirectory(bulk_upsert_simple_it) +add_subdirectory(basic_example) +add_subdirectory(bulk_upsert) +add_subdirectory(server_restart) diff --git a/tests/integration/basic_example_it/CMakeLists.txt b/tests/integration/basic_example/CMakeLists.txt similarity index 86% rename from tests/integration/basic_example_it/CMakeLists.txt rename to tests/integration/basic_example/CMakeLists.txt index b884f9560a..573e9745a1 100644 --- a/tests/integration/basic_example_it/CMakeLists.txt +++ b/tests/integration/basic_example/CMakeLists.txt @@ -1,4 +1,4 @@ -add_ydb_test(NAME basic-example_it +add_ydb_test(NAME basic-example SOURCES main.cpp basic_example_data.cpp diff --git a/tests/integration/basic_example_it/basic_example.cpp b/tests/integration/basic_example/basic_example.cpp similarity index 100% rename from tests/integration/basic_example_it/basic_example.cpp rename to tests/integration/basic_example/basic_example.cpp diff --git a/tests/integration/basic_example_it/basic_example.h b/tests/integration/basic_example/basic_example.h similarity index 100% rename from tests/integration/basic_example_it/basic_example.h rename to tests/integration/basic_example/basic_example.h diff --git a/tests/integration/basic_example_it/basic_example_data.cpp b/tests/integration/basic_example/basic_example_data.cpp similarity index 100% rename from tests/integration/basic_example_it/basic_example_data.cpp rename to tests/integration/basic_example/basic_example_data.cpp diff --git a/tests/integration/basic_example_it/main.cpp b/tests/integration/basic_example/main.cpp similarity index 99% rename from tests/integration/basic_example_it/main.cpp rename to tests/integration/basic_example/main.cpp index 4ad89b4377..ea3fff1a3f 100644 --- a/tests/integration/basic_example_it/main.cpp +++ b/tests/integration/basic_example/main.cpp @@ -38,7 +38,7 @@ static NYdb::TType MakeOptionalType(NYdb::EPrimitiveType type) { } -TEST(Integration, BasicExample) { +TEST(BasicExample, BasicExample) { auto [driver, path] = GetRunArgs(); NYdb::NTable::TTableClient client(driver); diff --git a/tests/integration/bulk_upsert_simple_it/CMakeLists.txt b/tests/integration/bulk_upsert/CMakeLists.txt similarity index 80% rename from tests/integration/bulk_upsert_simple_it/CMakeLists.txt rename to tests/integration/bulk_upsert/CMakeLists.txt index bc7ffbb35b..1dbed97089 100644 --- a/tests/integration/bulk_upsert_simple_it/CMakeLists.txt +++ b/tests/integration/bulk_upsert/CMakeLists.txt @@ -1,4 +1,4 @@ -add_ydb_test(NAME bulk_upsert_simple_it +add_ydb_test(NAME bulk_upsert SOURCES main.cpp bulk_upsert.cpp diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp b/tests/integration/bulk_upsert/bulk_upsert.cpp similarity index 99% rename from tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp rename to tests/integration/bulk_upsert/bulk_upsert.cpp index ce74b1cc16..dee7c04a15 100644 --- a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp +++ b/tests/integration/bulk_upsert/bulk_upsert.cpp @@ -17,7 +17,6 @@ static std::string JoinPath(const std::string& basePath, const std::string& path } TRunArgs GetRunArgs() { - std::string database = std::getenv("YDB_DATABASE"); std::string endpoint = std::getenv("YDB_ENDPOINT"); diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert/bulk_upsert.h similarity index 100% rename from tests/integration/bulk_upsert_simple_it/bulk_upsert.h rename to tests/integration/bulk_upsert/bulk_upsert.h diff --git a/tests/integration/bulk_upsert_simple_it/main.cpp b/tests/integration/bulk_upsert/main.cpp similarity index 98% rename from tests/integration/bulk_upsert_simple_it/main.cpp rename to tests/integration/bulk_upsert/main.cpp index 589f0ff3d9..5dcfd0f2a2 100644 --- a/tests/integration/bulk_upsert_simple_it/main.cpp +++ b/tests/integration/bulk_upsert/main.cpp @@ -5,8 +5,7 @@ #include -TEST(Integration, BulkUpsert) { - +TEST(BulkUpsert, BulkUpsert) { uint32_t correctSumApp = 0; uint32_t correctSumHost = 0; uint32_t correctRowCount = 0; diff --git a/tests/integration/server_restart/CMakeLists.txt b/tests/integration/server_restart/CMakeLists.txt new file mode 100644 index 0000000000..37fab1cd77 --- /dev/null +++ b/tests/integration/server_restart/CMakeLists.txt @@ -0,0 +1,12 @@ +add_ydb_test(NAME server_restart + SOURCES + main.cpp + LINK_LIBRARIES + yutil + api-grpc + YDB-CPP-SDK::Query + gRPC::grpc++ + GTest::gtest_main + LABELS + integration +) diff --git a/tests/integration/server_restart/main.cpp b/tests/integration/server_restart/main.cpp new file mode 100644 index 0000000000..fafc564c93 --- /dev/null +++ b/tests/integration/server_restart/main.cpp @@ -0,0 +1,207 @@ +#include + +#include + +#include +#include + +#include + +#include + +#include + +using namespace NYdb; +using namespace NYdb::NQuery; + +using namespace std::chrono_literals; + +class TDiscoveryProxy final : public Ydb::Discovery::V1::DiscoveryService::Service { +public: + TDiscoveryProxy(std::atomic_bool& paused) + : Paused_(paused) + { + } + + grpc::Status ListEndpoints([[maybe_unused]] grpc::ServerContext* context, + [[maybe_unused]] const Ydb::Discovery::ListEndpointsRequest* request, + Ydb::Discovery::ListEndpointsResponse* response) override { + if (Paused_.load()) { + return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Server is paused"); + } + + Ydb::Discovery::ListEndpointsResult result; + auto info = result.add_endpoints(); + info->set_address("localhost"); + info->set_port(Port_); + info->set_node_id(1); + + response->mutable_operation()->mutable_result()->PackFrom(result); + response->mutable_operation()->set_id("ydb://operation/1"); + response->mutable_operation()->set_ready(true); + response->mutable_operation()->set_status(Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS); + return grpc::Status::OK; + } + + void SetPort(int port) { + Port_ = port; + } + +private: + int Port_; + std::atomic_bool& Paused_; +}; + +class TQueryProxy final : public Ydb::Query::V1::QueryService::Service { +public: + TQueryProxy(std::shared_ptr channel, std::atomic_bool& paused) + : Stub_(channel) + , Paused_(paused) + { + } + + template + using TGrpcCall = + grpc::Status(Ydb::Query::V1::QueryService::Stub::*)(grpc::ClientContext*, const TRequest& request, TResponse* response); + + template + using TGrpcStreamCall = + std::unique_ptr>(Ydb::Query::V1::QueryService::Stub::*)(grpc::ClientContext*, const TRequest& request); + + template + grpc::Status Run(TGrpcCall call, grpc::ServerContext *context, + const TRequest* request, TResponse* response) { + if (Paused_.load()) { + return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Server is paused"); + } + + auto clientContext = grpc::ClientContext::FromServerContext(*context); + return (Stub_.*call)(clientContext.get(), *request, response); + } + + template + grpc::Status RunStream(TGrpcStreamCall call, grpc::ServerContext *context, + const TRequest* request, grpc::ServerWriter* writer) { + auto clientContext = grpc::ClientContext::FromServerContext(*context); + auto reader = (Stub_.*call)(clientContext.get(), *request); + + TResponse state; + + while (reader->Read(&state)) { + if (Paused_.load()) { + return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Server is paused"); + } + writer->Write(state); + } + + return reader->Finish(); + } + + grpc::Status CreateSession(grpc::ServerContext *context, const Ydb::Query::CreateSessionRequest* request, + Ydb::Query::CreateSessionResponse* response) override { + return Run(&Ydb::Query::V1::QueryService::Stub::CreateSession, context, request, response); + } + + grpc::Status DeleteSession(grpc::ServerContext *context, const Ydb::Query::DeleteSessionRequest *request, + Ydb::Query::DeleteSessionResponse *response) override { + return Run(&Ydb::Query::V1::QueryService::Stub::DeleteSession, context, request, response); + } + + grpc::Status AttachSession(grpc::ServerContext *context, const Ydb::Query::AttachSessionRequest *request, + grpc::ServerWriter *writer) override { + return RunStream(&Ydb::Query::V1::QueryService::Stub::AttachSession, context, request, writer); + } + + grpc::Status ExecuteQuery(grpc::ServerContext *context, const Ydb::Query::ExecuteQueryRequest *request, + grpc::ServerWriter *writer) override { + return RunStream(&Ydb::Query::V1::QueryService::Stub::ExecuteQuery, context, request, writer); + } + +private: + Ydb::Query::V1::QueryService::Stub Stub_; + std::atomic_bool& Paused_; +}; + +class ServerRestartTest : public testing::Test { +protected: + ServerRestartTest() { + std::string endpoint = std::getenv("YDB_ENDPOINT"); + std::string database = std::getenv("YDB_DATABASE"); + Channel_ = grpc::CreateChannel(grpc::string{endpoint}, grpc::InsecureChannelCredentials()); + + DisoveryService_ = std::make_unique(Paused_); + QueryService_ = std::make_unique(Channel_, Paused_); + + int port = 0; + + Server_ = grpc::ServerBuilder() + .AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(), &port) + .RegisterService(DisoveryService_.get()) + .RegisterService(QueryService_.get()) + .BuildAndStart(); + + DisoveryService_->SetPort(port); + + Driver_ = std::make_unique(TDriverConfig() + .SetEndpoint("localhost:" + std::to_string(port)) + .SetDatabase(database) + .SetDiscoveryMode(EDiscoveryMode::Async) + ); + } + + TDriver GetDriver() { + return *Driver_; + } + + void PauseServer() { + Paused_.store(true); + } + + void UnpauseServer() { + Paused_.store(false); + } + +private: + std::atomic_bool Paused_{false}; + + std::shared_ptr Channel_; + + std::unique_ptr DisoveryService_; + std::unique_ptr QueryService_; + std::unique_ptr Server_; + + std::unique_ptr Driver_; +}; + + +TEST_F(ServerRestartTest, RestartOnGetSession) { + TQueryClient client(GetDriver()); + std::atomic_bool closed(false); + + auto thread = std::thread([&client, &closed]() { + std::optional status; + while (!closed.load()) { + status = client.RetryQuerySync([](NYdb::NQuery::TSession session) { + return session.ExecuteQuery("SELECT 1", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + }); + + ASSERT_LE(client.GetActiveSessionCount(), 1); + + std::this_thread::sleep_for(100ms); + } + + ASSERT_TRUE(status.has_value()); + ASSERT_TRUE(status->IsSuccess()) << ToString(*status); + }); + + std::this_thread::sleep_for(1s); + PauseServer(); + std::this_thread::sleep_for(10s); + UnpauseServer(); + std::this_thread::sleep_for(1s); + + closed.store(true); + thread.join(); + + ASSERT_EQ(client.GetActiveSessionCount(), 0); +}