diff --git a/include/ydb-cpp-sdk/client/bsconfig/storage_config.h b/include/ydb-cpp-sdk/client/bsconfig/storage_config.h index 9b70806757..a6196feded 100644 --- a/include/ydb-cpp-sdk/client/bsconfig/storage_config.h +++ b/include/ydb-cpp-sdk/client/bsconfig/storage_config.h @@ -36,27 +36,29 @@ struct TFetchStorageConfigResult : public TStatus { using TAsyncFetchStorageConfigResult = NThreading::TFuture; -struct TStorageConfigSettings : public NYdb::TOperationRequestSettings {}; +struct TReplaceStorageConfigSettings : public NYdb::TOperationRequestSettings {}; +struct TFetchStorageConfigSettings : public NYdb::TOperationRequestSettings {}; +struct TBootstrapClusterSettings : public NYdb::TOperationRequestSettings {}; class TStorageConfigClient { public: - explicit TStorageConfigClient(const TDriver& driver, const TCommonClientSettings& settings = {}); ~TStorageConfigClient(); // Replace config - TAsyncStatus ReplaceStorageConfig(const std::optional& yaml_config, - const std::optional& storage_yaml_config, + TAsyncStatus ReplaceStorageConfig(const std::optional& yaml_config, + const std::optional& storage_yaml_config, std::optional switch_dedicated_storage_section, - bool dedicated_config_mode); + bool dedicated_config_mode, + const TReplaceStorageConfigSettings& settings = {}); // Fetch current cluster storage config TAsyncFetchStorageConfigResult FetchStorageConfig(bool dedicated_storage_section, bool dedicated_cluster_section, - const TStorageConfigSettings& settings = {}); + const TFetchStorageConfigSettings& settings = {}); // Bootstrap cluster with automatic configuration - TAsyncStatus BootstrapCluster(const std::string& selfAssemblyUUID); + TAsyncStatus BootstrapCluster(const std::string& selfAssemblyUUID, const TBootstrapClusterSettings& settings = {}); private: class TImpl; diff --git a/include/ydb-cpp-sdk/client/cms/cms.h b/include/ydb-cpp-sdk/client/cms/cms.h new file mode 100644 index 0000000000..70091b005f --- /dev/null +++ b/include/ydb-cpp-sdk/client/cms/cms.h @@ -0,0 +1,213 @@ +#pragma once + +#include + +namespace Ydb::Cms { + class CreateDatabaseRequest; + class ListDatabasesResult; + class GetDatabaseStatusResult; + + class StorageUnits; + class ComputationalUnits; + class AllocatedComputationalUnit; + class ServerlessResources; + class Resources; + class SchemaOperationQuotas; + class SchemaOperationQuotas_LeakyBucket; + class DatabaseQuotas; + class DatabaseQuotas_StorageQuotas; + class ScaleRecommenderPolicies; + class ScaleRecommenderPolicies_ScaleRecommenderPolicy; + class ScaleRecommenderPolicies_ScaleRecommenderPolicy_TargetTrackingPolicy; +} // namespace Ydb::Cms + +namespace NYdb::inline V3::NCms { + +struct TListDatabasesSettings : public TOperationRequestSettings {}; + +class TListDatabasesResult : public TStatus { +public: + TListDatabasesResult(TStatus&& status, const Ydb::Cms::ListDatabasesResult& proto); + const std::vector& GetPaths() const; +private: + std::vector Paths_; +}; + +using TAsyncListDatabasesResult = NThreading::TFuture; + +struct TGetDatabaseStatusSettings : public TOperationRequestSettings {}; + +enum class EState { + StateUnspecified = 0, + Creating = 1, + Running = 2, + Removing = 3, + PendingResources = 4, + Configuring = 5, +}; + +struct TStorageUnits { + TStorageUnits() = default; + TStorageUnits(const Ydb::Cms::StorageUnits& proto); + + std::string UnitKind; + std::uint64_t Count; +}; + +struct TComputationalUnits { + TComputationalUnits() = default; + TComputationalUnits(const Ydb::Cms::ComputationalUnits& proto); + + std::string UnitKind; + std::string AvailabilityZone; + std::uint64_t Count; +}; + +struct TAllocatedComputationalUnit { + TAllocatedComputationalUnit() = default; + TAllocatedComputationalUnit(const Ydb::Cms::AllocatedComputationalUnit& proto); + + std::string Host; + std::uint32_t Port; + std::string UnitKind; +}; + +struct TResources { + TResources() = default; + TResources(const Ydb::Cms::Resources& proto); + + std::vector StorageUnits; + std::vector ComputationalUnits; +}; + +struct TSharedResources : public TResources { + using TResources::TResources; +}; + +struct TServerlessResources { + TServerlessResources() = default; + TServerlessResources(const Ydb::Cms::ServerlessResources& proto); + + std::string SharedDatabasePath; +}; + +struct TSchemaOperationQuotas { + struct TLeakyBucket { + TLeakyBucket() = default; + TLeakyBucket(const Ydb::Cms::SchemaOperationQuotas_LeakyBucket& proto); + + double BucketSize = 1; + std::uint64_t BucketSeconds = 2; + }; + + TSchemaOperationQuotas() = default; + TSchemaOperationQuotas(const Ydb::Cms::SchemaOperationQuotas& proto); + + std::vector LeakyBucketQuotas; +}; + +struct TDatabaseQuotas { + struct TStorageQuotas { + TStorageQuotas() = default; + TStorageQuotas(const Ydb::Cms::DatabaseQuotas_StorageQuotas& proto); + + std::string UnitKind; + std::uint64_t DataSizeHardQuota; + std::uint64_t DataSizeSoftQuota; + }; + + TDatabaseQuotas() = default; + TDatabaseQuotas(const Ydb::Cms::DatabaseQuotas& proto); + + std::uint64_t DataSizeHardQuota; + std::uint64_t DataSizeSoftQuota; + std::uint64_t DataStreamShardsQuota; + std::uint64_t DataStreamReservedStorageQuota; + std::uint32_t TtlMinRunInternalSeconds; + std::vector StorageQuotas; +}; + +struct TTargetTrackingPolicy { + using TAverageCpuUtilizationPercent = std::uint32_t; + + TTargetTrackingPolicy() = default; + TTargetTrackingPolicy(const Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy_TargetTrackingPolicy& proto); + + std::variant Target; +}; + +struct TScaleRecommenderPolicy { + TScaleRecommenderPolicy() = default; + TScaleRecommenderPolicy(const Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy& proto); + + std::variant Policy; +}; + +struct TScaleRecommenderPolicies { + TScaleRecommenderPolicies() = default; + TScaleRecommenderPolicies(const Ydb::Cms::ScaleRecommenderPolicies& proto); + + std::vector Policies; +}; + +using TResourcesKind = std::variant; + +class TGetDatabaseStatusResult : public TStatus { +public: + TGetDatabaseStatusResult(TStatus&& status, const Ydb::Cms::GetDatabaseStatusResult& proto); + + const std::string& GetPath() const; + EState GetState() const; + const TResourcesKind& GetResourcesKind() const; + const TResources& GetAllocatedResources() const; + const std::vector& GetRegisteredResources() const; + std::uint64_t GetGeneration() const; + const TSchemaOperationQuotas& GetSchemaOperationQuotas() const; + const TDatabaseQuotas& GetDatabaseQuotas() const; + const TScaleRecommenderPolicies& GetScaleRecommenderPolicies() const; + + // Fills CreateDatabaseRequest proto from this database status + void SerializeTo(Ydb::Cms::CreateDatabaseRequest& request) const; + +private: + std::string Path_; + EState State_; + TResourcesKind ResourcesKind_; + TResources AllocatedResources_; + std::vector RegisteredResources_; + std::uint64_t Generation_; + TSchemaOperationQuotas SchemaOperationQuotas_; + TDatabaseQuotas DatabaseQuotas_; + TScaleRecommenderPolicies ScaleRecommenderPolicies_; +}; + +using TAsyncGetDatabaseStatusResult = NThreading::TFuture; + +struct TCreateDatabaseSettings : public TOperationRequestSettings { + TCreateDatabaseSettings() = default; + explicit TCreateDatabaseSettings(const Ydb::Cms::CreateDatabaseRequest& request); + + // Fills CreateDatabaseRequest proto from this settings + void SerializeTo(Ydb::Cms::CreateDatabaseRequest& request) const; + + FLUENT_SETTING(TResourcesKind, ResourcesKind); + FLUENT_SETTING(TSchemaOperationQuotas, SchemaOperationQuotas); + FLUENT_SETTING(TDatabaseQuotas, DatabaseQuotas); + FLUENT_SETTING(TScaleRecommenderPolicies, ScaleRecommenderPolicies); +}; + +class TCmsClient { +public: + explicit TCmsClient(const TDriver& driver, const TCommonClientSettings& settings = TCommonClientSettings()); + + TAsyncListDatabasesResult ListDatabases(const TListDatabasesSettings& settings = TListDatabasesSettings()); + TAsyncGetDatabaseStatusResult GetDatabaseStatus(const std::string& path, + const TGetDatabaseStatusSettings& settings = TGetDatabaseStatusSettings()); + TAsyncStatus CreateDatabase(const std::string& path, + const TCreateDatabaseSettings& settings = TCreateDatabaseSettings()); +private: + class TImpl; + std::shared_ptr Impl_; +}; + +} // namespace NYdb::inline V3::NCms diff --git a/include/ydb-cpp-sdk/client/coordination/coordination.h b/include/ydb-cpp-sdk/client/coordination/coordination.h index 8d4fa0f1f4..d1893ee6ff 100644 --- a/include/ydb-cpp-sdk/client/coordination/coordination.h +++ b/include/ydb-cpp-sdk/client/coordination/coordination.h @@ -4,9 +4,11 @@ namespace Ydb { namespace Coordination { + class Config; + class CreateNodeRequest; class DescribeNodeResult; - class SemaphoreSession; class SemaphoreDescription; + class SemaphoreSession; } } @@ -107,6 +109,8 @@ class TNodeDescription { const std::vector& GetEffectivePermissions() const; const Ydb::Coordination::DescribeNodeResult& GetProto() const; + void SerializeTo(Ydb::Coordination::CreateNodeRequest& creationRequest) const; + private: struct TImpl; std::shared_ptr Impl_; @@ -189,7 +193,10 @@ struct TNodeSettings : public TOperationRequestSettings { FLUENT_SETTING_DEFAULT(ERateLimiterCountersMode, RateLimiterCountersMode, ERateLimiterCountersMode::UNSET); }; -struct TCreateNodeSettings : public TNodeSettings { }; +struct TCreateNodeSettings : public TNodeSettings { + TCreateNodeSettings() = default; + TCreateNodeSettings(const Ydb::Coordination::Config& config); +}; struct TAlterNodeSettings : public TNodeSettings { }; struct TDropNodeSettings : public TOperationRequestSettings { }; struct TDescribeNodeSettings : public TOperationRequestSettings { }; diff --git a/include/ydb-cpp-sdk/client/draft/ydb_dynamic_config.h b/include/ydb-cpp-sdk/client/draft/ydb_dynamic_config.h index 5333a66fa7..8949758990 100644 --- a/include/ydb-cpp-sdk/client/draft/ydb_dynamic_config.h +++ b/include/ydb-cpp-sdk/client/draft/ydb_dynamic_config.h @@ -160,6 +160,21 @@ struct TVerboseResolveConfigResult : public TStatus { using TAsyncVerboseResolveConfigResult = NThreading::TFuture; +struct TFetchStartupConfigResult : public TStatus { + TFetchStartupConfigResult(TStatus&& status, std::string&& config) + : TStatus(std::move(status)) + , Config_(std::move(config)) + {} + + const std::string& GetConfig() const { + return Config_; + } + +private: + std::string Config_; +}; + +using TAsyncFetchStartupConfigResult = NThreading::TFuture; struct TDynamicConfigClientSettings : public TCommonClientSettingsBase { using TSelf = TDynamicConfigClientSettings; @@ -240,6 +255,9 @@ class TDynamicConfigClient { const std::map& volatileConfigs, const TClusterConfigSettings& settings = {}); + // Fetch startup config + TAsyncFetchStartupConfigResult FetchStartupConfig(const TClusterConfigSettings& settings = {}); + private: std::shared_ptr Impl_; }; diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index a2fa24ca62..9aad0ac0a2 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -142,6 +142,7 @@ class TDriver { template void AddExtension(typename TExtension::TParams params = typename TExtension::TParams()); + TDriverConfig GetConfig() const; private: std::shared_ptr Impl_; }; diff --git a/include/ydb-cpp-sdk/client/proto/accessor.h b/include/ydb-cpp-sdk/client/proto/accessor.h index cee4869a98..fcd53785dc 100644 --- a/include/ydb-cpp-sdk/client/proto/accessor.h +++ b/include/ydb-cpp-sdk/client/proto/accessor.h @@ -43,6 +43,8 @@ class TProtoAccessor { static ::google::protobuf::Map* GetProtoMapPtr(TParams& params); static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats); static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription); + static const Ydb::Table::DescribeExternalDataSourceResult& GetProto(const NTable::TExternalDataSourceDescription&); + static const Ydb::Table::DescribeExternalTableResult& GetProto(const NTable::TExternalTableDescription&); static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription); static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription); static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult); diff --git a/include/ydb-cpp-sdk/client/query/stats.h b/include/ydb-cpp-sdk/client/query/stats.h index 902478b446..0b7ca6cc6d 100644 --- a/include/ydb-cpp-sdk/client/query/stats.h +++ b/include/ydb-cpp-sdk/client/query/stats.h @@ -29,6 +29,7 @@ class TExecStats { std::optional GetPlan() const; std::optional GetAst() const; + std::optional GetMeta() const; TDuration GetTotalDuration() const; TDuration GetTotalCpuTime() const; diff --git a/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h b/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h index 040e65f8f7..05f4df0643 100644 --- a/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h +++ b/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h @@ -2,18 +2,62 @@ #include +#include +#include +#include + namespace Ydb::RateLimiter { -class DescribeResourceResult; -class HierarchicalDrrSettings; + class CreateResourceRequest; + class DescribeResourceResult; + class HierarchicalDrrSettings; + class ReplicatedBucketSettings; + class MeteringConfig; + class MeteringConfig_Metric; } // namespace Ydb::RateLimiter namespace NYdb::inline V3::NRateLimiter { +struct TReplicatedBucketSettings { + using TSelf = TReplicatedBucketSettings; + + TReplicatedBucketSettings() = default; + TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings&); + + void SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings&) const; + + // Interval between syncs from kesus and between consumption reports. + // Default value equals 5000 ms and not inherited. + FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportInterval); +}; + +class TLeafBehavior { +public: + enum EBehavior { + REPLICATED_BUCKET, + }; + + EBehavior GetBehavior() const; + + TLeafBehavior(const TReplicatedBucketSettings&); + TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings&); + const TReplicatedBucketSettings& GetReplicatedBucket() const; + + void SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings&) const; + +private: + std::variant BehaviorSettings_; +}; + // Settings for hierarchical deficit round robin (HDRR) algorithm. template -struct THierarchicalDrrSettings : public TOperationRequestSettings { +struct THierarchicalDrrSettings { using TSelf = TDerived; + THierarchicalDrrSettings() = default; + THierarchicalDrrSettings(const Ydb::RateLimiter::HierarchicalDrrSettings&); + + void SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings&) const; + // Resource consumption speed limit. // Value is required for root resource. // Must be nonnegative. @@ -40,14 +84,110 @@ struct THierarchicalDrrSettings : public TOperationRequestSettings { // Default value is inherited from parent or 0.75 for root. // Must be nonnegative and less than or equal to 1. FLUENT_SETTING_OPTIONAL(double, PrefetchWatermark); + + // Prevents bucket from going too deep in negative values. If somebody reports value that will exceed + // this limit the final amount in bucket will be equal to this limit. + // Should be negative value. + // Unset means no limit. + FLUENT_SETTING_OPTIONAL(double, ImmediatelyFillUpTo); + + // Behavior of leafs in tree. + // Not inherited. + FLUENT_SETTING_OPTIONAL(TLeafBehavior, LeafBehavior); +}; + +struct TMetric { + using TSelf = TMetric; + using TLabels = std::unordered_map; + + TMetric() = default; + TMetric(const Ydb::RateLimiter::MeteringConfig_Metric&); + + void SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric&) const; + + // Send this metric to billing. + // Default value is false (not inherited). + FLUENT_SETTING_DEFAULT(bool, Enabled, false); + + // Billing metric period (aligned to hour boundary). + // Default value is inherited from parent or equals 60 seconds for root. + FLUENT_SETTING_OPTIONAL(std::chrono::seconds, BillingPeriod); + + // User-defined labels. + FLUENT_SETTING(TLabels, Labels); + + // Billing metric JSON fields (inherited from parent if not set) + FLUENT_SETTING(std::string, MetricFieldsJson); +}; + +struct TMeteringConfig { + using TSelf = TMeteringConfig; + + TMeteringConfig() = default; + TMeteringConfig(const Ydb::RateLimiter::MeteringConfig&); + + void SerializeTo(Ydb::RateLimiter::MeteringConfig&) const; + + // Meter consumed resources and send billing metrics. + FLUENT_SETTING_DEFAULT(bool, Enabled, false); + + // Period to report consumption history from clients to kesus + // Default value is inherited from parent or equals 5000 ms for root. + FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportPeriod); + + // Consumption history period that is sent in one message to metering actor. + // Default value is inherited from parent or equals 1000 ms for root. + FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, MeterPeriod); + + // Time window to collect data from every client. + // Any client metering message that is `collect_period` late is discarded (not metered or billed). + // Default value is inherited from parent or equals 30 seconds for root. + FLUENT_SETTING_OPTIONAL(std::chrono::seconds, CollectPeriod); + + // Provisioned consumption limit in units per second. + // Effective value is limited by corresponding `max_units_per_second`. + // Default value is 0 (not inherited). + FLUENT_SETTING_OPTIONAL(double, ProvisionedUnitsPerSecond); + + // Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units. + // Effective value is limited by corresponding PrefetchCoefficient. + // Default value is inherited from parent or equals 60 for root. + FLUENT_SETTING_OPTIONAL(double, ProvisionedCoefficient); + + // On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units. + // Should be greater or equal to 1.0 + // Default value is inherited from parent or equals 1.1 for root + FLUENT_SETTING_OPTIONAL(double, OvershootCoefficient); + + // Consumption within provisioned limit. + // Informative metric that should be sent to billing (not billed). + FLUENT_SETTING_OPTIONAL(TMetric, Provisioned); + + // Consumption that exceeds provisioned limit is billed as on-demand. + FLUENT_SETTING_OPTIONAL(TMetric, OnDemand); + + // Consumption that exceeds even on-demand limit. + // Normally it is free and should not be billed. + FLUENT_SETTING_OPTIONAL(TMetric, Overshoot); }; // Settings for create resource request. -struct TCreateResourceSettings : public THierarchicalDrrSettings { +struct TCreateResourceSettings + : public TOperationRequestSettings + , public THierarchicalDrrSettings +{ + TCreateResourceSettings() = default; + TCreateResourceSettings(const Ydb::RateLimiter::CreateResourceRequest&); + + FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig); }; // Settings for alter resource request. -struct TAlterResourceSettings : public THierarchicalDrrSettings { +struct TAlterResourceSettings + : public TOperationRequestSettings + , public THierarchicalDrrSettings +{ + FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig); }; // Settings for drop resource request. @@ -89,7 +229,9 @@ using TAsyncListResourcesResult = NThreading::TFuture; // Result for describe resource request. struct TDescribeResourceResult : public TStatus { - struct THierarchicalDrrProps { + // Note for YDB developers: THierarchicalDrrProps wrapper class exists for compatibility with older client code. + // Newer code should use the THierarchicalDrrSettings class directly. + struct THierarchicalDrrProps : public THierarchicalDrrSettings { THierarchicalDrrProps(const Ydb::RateLimiter::HierarchicalDrrSettings&); // Resource consumption speed limit. @@ -113,11 +255,13 @@ struct TDescribeResourceResult : public TStatus { return PrefetchWatermark_; } - private: - std::optional MaxUnitsPerSecond_; - std::optional MaxBurstSizeCoefficient_; - std::optional PrefetchCoefficient_; - std::optional PrefetchWatermark_; + std::optional GetImmediatelyFillUpTo() const { + return ImmediatelyFillUpTo_; + } + + const std::optional& GetLeafBehavior() const { + return LeafBehavior_; + } }; TDescribeResourceResult(TStatus status, const Ydb::RateLimiter::DescribeResourceResult& result); @@ -131,9 +275,14 @@ struct TDescribeResourceResult : public TStatus { return HierarchicalDrrProps_; } + const std::optional& GetMeteringConfig() const { + return MeteringConfig_; + } + private: std::string ResourcePath_; THierarchicalDrrProps HierarchicalDrrProps_; + std::optional MeteringConfig_; }; using TAsyncDescribeResourceResult = NThreading::TFuture; diff --git a/include/ydb-cpp-sdk/client/table/query_stats/stats.h b/include/ydb-cpp-sdk/client/table/query_stats/stats.h index 38740d2dd1..9eda4620ff 100644 --- a/include/ydb-cpp-sdk/client/table/query_stats/stats.h +++ b/include/ydb-cpp-sdk/client/table/query_stats/stats.h @@ -30,7 +30,7 @@ namespace NTable { enum class ECollectQueryStatsMode { None = 0, // Stats collection is disabled Basic = 1, // Aggregated stats of reads, updates and deletes per table - Full = 2, // Add per-stage execution profile and query plan on top of Basic mode + Full = 2, // Add per-stage execution profile, query plan and query meta on top of Basic mode Profile = 3 // Detailed execution stats including stats for individual tasks and channels }; diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 98a15f9284..e83101f28d 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -22,6 +22,8 @@ class ColumnFamily; class CreateTableRequest; class Changefeed; class ChangefeedDescription; +class DescribeExternalDataSourceResult; +class DescribeExternalTableResult; class DescribeTableResult; class ExplicitPartitions; class GlobalIndexSettings; @@ -293,7 +295,7 @@ class TIndexDescription { EIndexType GetIndexType() const; const std::vector& GetIndexColumns() const; const std::vector& GetDataColumns() const; - const std::variant& GetVectorIndexSettings() const; + const std::variant& GetIndexSettings() const; uint64_t GetSizeBytes() const; void SerializeTo(Ydb::Table::TableIndex& proto) const; @@ -1068,11 +1070,16 @@ class TRenameItem { //////////////////////////////////////////////////////////////////////////////// +class TDescribeExternalDataSourceResult; +class TDescribeExternalTableResult; + using TAsyncCreateSessionResult = NThreading::TFuture; using TAsyncDataQueryResult = NThreading::TFuture; using TAsyncPrepareQueryResult = NThreading::TFuture; using TAsyncExplainDataQueryResult = NThreading::TFuture; using TAsyncDescribeTableResult = NThreading::TFuture; +using TAsyncDescribeExternalDataSourceResult = NThreading::TFuture; +using TAsyncDescribeExternalTableResult = NThreading::TFuture; using TAsyncBeginTransactionResult = NThreading::TFuture; using TAsyncCommitTransactionResult = NThreading::TFuture; using TAsyncTablePartIterator = NThreading::TFuture; @@ -1157,6 +1164,7 @@ struct TStreamExecScanQuerySettings : public TRequestSettings= ECollectQueryStatsMode::Full to get QueryMeta in QueryStats // Collect full query compilation diagnostics FLUENT_SETTING_DEFAULT(bool, CollectFullDiagnostics, false); }; @@ -1694,6 +1702,10 @@ struct TDescribeTableSettings : public TOperationRequestSettings {}; + +struct TDescribeExternalTableSettings : public TOperationRequestSettings {}; + struct TExplainDataQuerySettings : public TOperationRequestSettings { FLUENT_SETTING_DEFAULT(bool, WithCollectFullDiagnostics, false); }; @@ -1778,6 +1790,12 @@ class TSession { TAsyncDescribeTableResult DescribeTable(const std::string& path, const TDescribeTableSettings& settings = TDescribeTableSettings()); + TAsyncDescribeExternalDataSourceResult DescribeExternalDataSource(const std::string& path, + const TDescribeExternalDataSourceSettings& settings = {}); + + TAsyncDescribeExternalTableResult DescribeExternalTable(const std::string& path, + const TDescribeExternalTableSettings& settings = {}); + TAsyncBeginTransactionResult BeginTransaction(const TTxSettings& txSettings = TTxSettings(), const TBeginTxSettings& settings = TBeginTxSettings()); @@ -2092,6 +2110,7 @@ class TScanQueryPart : public TStreamPartStatus { const TQueryStats& GetQueryStats() const { return *QueryStats_; } TQueryStats ExtractQueryStats() { return std::move(*QueryStats_); } + // Deprecated. Use GetMeta() of TQueryStats bool HasDiagnostics() const { return Diagnostics_.has_value(); } const std::string& GetDiagnostics() const { return *Diagnostics_; } std::string&& ExtractDiagnostics() { return std::move(*Diagnostics_); } @@ -2194,5 +2213,57 @@ class TReadRowsResult : public TStatus { } }; +class TExternalDataSourceDescription { +public: + TExternalDataSourceDescription(Ydb::Table::DescribeExternalDataSourceResult&& description); + +private: + class TImpl; + std::shared_ptr Impl_; + + friend class NYdb::V3::TProtoAccessor; + const Ydb::Table::DescribeExternalDataSourceResult& GetProto() const; +}; + +//! Represents the result of a DescribeExternalDataSource call. +class TDescribeExternalDataSourceResult : public NScheme::TDescribePathResult { +public: + TDescribeExternalDataSourceResult( + TStatus&& status, + Ydb::Table::DescribeExternalDataSourceResult&& description + ); + + TExternalDataSourceDescription GetExternalDataSourceDescription() const; + +private: + TExternalDataSourceDescription ExternalDataSourceDescription_; +}; + +class TExternalTableDescription { +public: + TExternalTableDescription(Ydb::Table::DescribeExternalTableResult&& description); + +private: + class TImpl; + std::shared_ptr Impl_; + + friend class NYdb::V3::TProtoAccessor; + const Ydb::Table::DescribeExternalTableResult& GetProto() const; +}; + +//! Represents the result of a DescribeExternalTable call. +class TDescribeExternalTableResult : public NScheme::TDescribePathResult { +public: + TDescribeExternalTableResult( + TStatus&& status, + Ydb::Table::DescribeExternalTableResult&& description + ); + + TExternalTableDescription GetExternalTableDescription() const; + +private: + TExternalTableDescription ExternalTableDescription_; +}; + } // namespace NTable } // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/types/status/status.h b/include/ydb-cpp-sdk/client/types/status/status.h index 253f975cab..96314b6e2d 100644 --- a/include/ydb-cpp-sdk/client/types/status/status.h +++ b/include/ydb-cpp-sdk/client/types/status/status.h @@ -61,6 +61,14 @@ class TYdbErrorException : public TYdbException { return out << e.Status_; } + const TStatus& GetStatus() const { + return Status_; + } + + TStatus&& ExtractStatus() { + return std::move(Status_); + } + private: TStatus Status_; }; diff --git a/src/api/grpc/draft/ydb_dynamic_config_v1.proto b/src/api/grpc/draft/ydb_dynamic_config_v1.proto index 7a3b185ec2..084793d97a 100644 --- a/src/api/grpc/draft/ydb_dynamic_config_v1.proto +++ b/src/api/grpc/draft/ydb_dynamic_config_v1.proto @@ -46,4 +46,7 @@ service DynamicConfigService { // Resolve config for all possible labels combinations. rpc ResolveAllConfig(DynamicConfig.ResolveAllConfigRequest) returns (DynamicConfig.ResolveAllConfigResponse); + + // Generate dynamic config based on cluster's static config. + rpc FetchStartupConfig(DynamicConfig.FetchStartupConfigRequest) returns (DynamicConfig.FetchStartupConfigResponse); } diff --git a/src/api/grpc/ydb_table_v1.proto b/src/api/grpc/ydb_table_v1.proto index fe03b667d2..66b508bdd3 100644 --- a/src/api/grpc/ydb_table_v1.proto +++ b/src/api/grpc/ydb_table_v1.proto @@ -10,7 +10,7 @@ service TableService { // Create new session. Implicit session creation is forbidden, // so user must create new session before execute any query, // otherwise BAD_SESSION status will be returned. - // Simultaneous execution of requests are forbiden. + // Simultaneous execution of requests are forbidden. // Sessions are volatile, can be invalidated by server, for example in case // of fatal errors. All requests with this session will fail with BAD_SESSION status. // So, client must be able to handle BAD_SESSION status. @@ -85,4 +85,10 @@ service TableService { // Executes scan query with streaming result. rpc StreamExecuteScanQuery(Table.ExecuteScanQueryRequest) returns (stream Table.ExecuteScanQueryPartialResponse); + + // Returns information about a given external data source. + rpc DescribeExternalDataSource(Table.DescribeExternalDataSourceRequest) returns (Table.DescribeExternalDataSourceResponse); + + // Returns information about a given external table. + rpc DescribeExternalTable(Table.DescribeExternalTableRequest) returns (Table.DescribeExternalTableResponse); } diff --git a/src/api/protos/draft/ydb_dynamic_config.proto b/src/api/protos/draft/ydb_dynamic_config.proto index 666e69da7a..3e37964f29 100644 --- a/src/api/protos/draft/ydb_dynamic_config.proto +++ b/src/api/protos/draft/ydb_dynamic_config.proto @@ -9,8 +9,12 @@ import "src/api/protos/ydb_operation.proto"; message ConfigIdentity { // Current main config version uint64 version = 1; - // Cluster name (should be set on node with console tablet, unknown by default) - string cluster = 2; + oneof type { + // Cluster name (should be set on node with console tablet, unknown by default) + string cluster = 2; + // database name + string database = 3; + } } message SetConfigRequest { @@ -19,6 +23,7 @@ message SetConfigRequest { string config = 2; bool dry_run = 3; bool allow_unknown_fields = 4; + bool allow_absent_database = 5; } message SetConfigResponse { @@ -31,6 +36,7 @@ message ReplaceConfigRequest { string config = 2; bool dry_run = 3; bool allow_unknown_fields = 4; + bool allow_absent_database = 5; } message ReplaceConfigResponse { @@ -74,12 +80,12 @@ message GetConfigResponse { } message GetConfigResult { - // Main dynamic config with metadata in YAML format - string config = 1; + // Main/database dynamic config with metadata in YAML format + repeated string config = 1; // All volatile configs repeated VolatileConfig volatile_configs = 2; - ConfigIdentity identity = 3; + repeated ConfigIdentity identity = 3; } message VolatileConfigMetadata { @@ -222,3 +228,17 @@ message ResolveAllConfigResult { // Verbose resolved configs repeated ResolvedConfig configs = 2; } + +message FetchStartupConfigRequest { + Ydb.Operations.OperationParams operation_params = 1; +} + +message FetchStartupConfigResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +message FetchStartupConfigResult { + // YAML document with generated dynamic config + string config = 1; +} diff --git a/src/api/protos/persqueue_error_codes_v1.proto b/src/api/protos/persqueue_error_codes_v1.proto index c11c3fad8d..65ef172ff0 100644 --- a/src/api/protos/persqueue_error_codes_v1.proto +++ b/src/api/protos/persqueue_error_codes_v1.proto @@ -59,4 +59,6 @@ enum ErrorCode { INVALID_ARGUMENT = 500040; VALIDATION_ERROR = 500080; + + UNKNOWN_READ_RULE = 500032; } diff --git a/src/api/protos/ydb_query_stats.proto b/src/api/protos/ydb_query_stats.proto index 300d5d9837..34f4f49bdb 100644 --- a/src/api/protos/ydb_query_stats.proto +++ b/src/api/protos/ydb_query_stats.proto @@ -43,4 +43,7 @@ message QueryStats { string query_ast = 5; uint64 total_duration_us = 6; uint64 total_cpu_time_us = 7; + // will be filled only in MODE_EXPLAIN or in MODE_EXEC with QueryStatsCollection.Mode >= STATS_COLLECTION_FULL, + // collects additional meta about query compilation, including table metadata + string query_meta = 8; } diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 415ba2b0f1..af2a6d869d 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -543,8 +543,7 @@ message ColumnFamily { COMPRESSION_ZSTD = 3; } - // Name of the column family, the name "default" must be used for the - // primary column family that contains at least primary key columns + // Name of the column family string name = 1; // This specifies data storage settings for column family @@ -557,8 +556,9 @@ message ColumnFamily { // WARNING: DO NOT USE Ydb.FeatureFlag.Status keep_in_memory = 4; - // Not all compression algorithms support - // Set if want to change default value + // Set the compression level for selected compression type. If no value is specified, default value will be chosen. + // For ZSTD compression level must be in range [-131072:22] + // For other compression types compression level must be empty optional int32 compression_level = 5; } @@ -1274,7 +1274,7 @@ message ExecuteScanQueryRequest { QueryStatsCollection.Mode collect_stats = 8; // works only in mode: MODE_EXPLAIN, // collects additional diagnostics about query compilation, including query plan and scheme - bool collect_full_diagnostics = 9; + bool collect_full_diagnostics = 9 [deprecated=true]; } message ExecuteScanQueryPartialResponse { @@ -1292,5 +1292,45 @@ message ExecuteScanQueryPartialResult { Ydb.TableStats.QueryStats query_stats = 6; // works only in mode: MODE_EXPLAIN, // collects additional diagnostics about query compilation, including query plan and scheme - string query_full_diagnostics = 7; + string query_full_diagnostics = 7 [deprecated = true]; +} + +// Returns information about an external data source with a given path. +message DescribeExternalDataSourceRequest { + Ydb.Operations.OperationParams operation_params = 1; + string path = 2; +} + +message DescribeExternalDataSourceResponse { + // Holds DescribeExternalDataSourceResult in case of a successful call. + Ydb.Operations.Operation operation = 1; +} + +message DescribeExternalDataSourceResult { + // Description of a generic scheme object. + Ydb.Scheme.Entry self = 1; + optional string source_type = 2; + optional string location = 3; + map properties = 4; +} + +// Returns information about an external table with a given path. +message DescribeExternalTableRequest { + Ydb.Operations.OperationParams operation_params = 1; + string path = 2; +} + +message DescribeExternalTableResponse { + // Holds DescribeExternalTableResult in case of a successful call. + Ydb.Operations.Operation operation = 1; +} + +message DescribeExternalTableResult { + // Description of a generic scheme object. + Ydb.Scheme.Entry self = 1; + optional string source_type = 2; + optional string data_source_path = 3; + optional string location = 4; + repeated ColumnMeta columns = 5; + map content = 6; } diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 65426eeacc..2b33ff89fe 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(iam) add_subdirectory(iam_private) add_subdirectory(impl) add_subdirectory(resources) +add_subdirectory(cms) add_subdirectory(common_client) add_subdirectory(coordination) add_subdirectory(datastreams) diff --git a/src/client/bsconfig/storage_config.cpp b/src/client/bsconfig/storage_config.cpp index 0f472abfd6..44baf6b038 100644 --- a/src/client/bsconfig/storage_config.cpp +++ b/src/client/bsconfig/storage_config.cpp @@ -14,10 +14,11 @@ class TStorageConfigClient::TImpl : public TClientImplCommon& yaml_config, - const std::optional& storage_yaml_config, + TAsyncStatus ReplaceStorageConfig(const std::optional& yaml_config, + const std::optional& storage_yaml_config, std::optional switch_dedicated_storage_section, - bool dedicated_config_mode) { + bool dedicated_config_mode, + const TReplaceStorageConfigSettings& settings) { auto request = MakeRequest(); if (yaml_config) { @@ -33,11 +34,12 @@ class TStorageConfigClient::TImpl : public TClientImplCommon( std::move(request), - &Ydb::BSConfig::V1::BSConfigService::Stub::AsyncReplaceStorageConfig); + &Ydb::BSConfig::V1::BSConfigService::Stub::AsyncReplaceStorageConfig, + TRpcRequestSettings::Make(settings)); } TAsyncFetchStorageConfigResult FetchStorageConfig(bool dedicated_storage_section, bool dedicated_cluster_section, - const TStorageConfigSettings& settings = {}) { + const TFetchStorageConfigSettings& settings) { auto request = MakeOperationRequest(settings); if (dedicated_storage_section) { request.set_dedicated_storage_section(true); @@ -70,12 +72,13 @@ class TStorageConfigClient::TImpl : public TClientImplCommon(); request.set_self_assembly_uuid(selfAssemblyUUID); return RunSimple(std::move(request), - &Ydb::BSConfig::V1::BSConfigService::Stub::AsyncBootstrapCluster); + &Ydb::BSConfig::V1::BSConfigService::Stub::AsyncBootstrapCluster, + TRpcRequestSettings::Make(settings)); } }; @@ -85,20 +88,20 @@ TStorageConfigClient::TStorageConfigClient(const TDriver& driver, const TCommonC TStorageConfigClient::~TStorageConfigClient() = default; -TAsyncStatus TStorageConfigClient::ReplaceStorageConfig(const std::optional& yaml_config, - const std::optional& storage_yaml_config, std::optional switch_dedicated_storage_section, - bool dedicated_config_mode) { +TAsyncStatus TStorageConfigClient::ReplaceStorageConfig(const std::optional& yaml_config, + const std::optional& storage_yaml_config, std::optional switch_dedicated_storage_section, + bool dedicated_config_mode, const TReplaceStorageConfigSettings& settings) { return Impl_->ReplaceStorageConfig(yaml_config, storage_yaml_config, switch_dedicated_storage_section, - dedicated_config_mode); + dedicated_config_mode, settings); } TAsyncFetchStorageConfigResult TStorageConfigClient::FetchStorageConfig(bool dedicated_storage_section, - bool dedicated_cluster_section, const TStorageConfigSettings& settings) { + bool dedicated_cluster_section, const TFetchStorageConfigSettings& settings) { return Impl_->FetchStorageConfig(dedicated_storage_section, dedicated_cluster_section, settings); } -TAsyncStatus TStorageConfigClient::BootstrapCluster(const std::string& selfAssemblyUUID) { - return Impl_->BootstrapCluster(selfAssemblyUUID); +TAsyncStatus TStorageConfigClient::BootstrapCluster(const std::string& selfAssemblyUUID, const TBootstrapClusterSettings& settings) { + return Impl_->BootstrapCluster(selfAssemblyUUID, settings); } diff --git a/src/client/cms/CMakeLists.txt b/src/client/cms/CMakeLists.txt new file mode 100644 index 0000000000..a330cf842f --- /dev/null +++ b/src/client/cms/CMakeLists.txt @@ -0,0 +1,17 @@ +_ydb_sdk_add_library(client-cms) + +target_link_libraries(client-cms + PUBLIC + client-ydb_driver + PRIVATE + api-grpc + api-protos + client-ydb_common_client-impl +) + +target_sources(client-cms + PRIVATE + cms.cpp +) + +_ydb_sdk_make_client_component(Cms client-cms) diff --git a/src/client/cms/cms.cpp b/src/client/cms/cms.cpp new file mode 100644 index 0000000000..9d013fc24a --- /dev/null +++ b/src/client/cms/cms.cpp @@ -0,0 +1,383 @@ +#include + +#include +#include +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +namespace NYdb::inline V3::NCms { + +namespace { + EState ConvertState(Ydb::Cms::GetDatabaseStatusResult_State protoState) { + switch (protoState) { + case Ydb::Cms::GetDatabaseStatusResult_State_STATE_UNSPECIFIED: + return EState::StateUnspecified; + case Ydb::Cms::GetDatabaseStatusResult_State_CREATING: + return EState::Creating; + case Ydb::Cms::GetDatabaseStatusResult_State_RUNNING: + return EState::Running; + case Ydb::Cms::GetDatabaseStatusResult_State_REMOVING: + return EState::Removing; + case Ydb::Cms::GetDatabaseStatusResult_State_PENDING_RESOURCES: + return EState::PendingResources; + case Ydb::Cms::GetDatabaseStatusResult_State_CONFIGURING: + return EState::Configuring; + default: + return EState::StateUnspecified; + } + } + + void SerializeToImpl( + const TResourcesKind& resourcesKind, + const TSchemaOperationQuotas& schemaQuotas, + const TDatabaseQuotas& dbQuotas, + const TScaleRecommenderPolicies& scaleRecommenderPolicies, + Ydb::Cms::CreateDatabaseRequest& out) + { + if (std::holds_alternative(resourcesKind)) { + const auto& resources = std::get(resourcesKind); + for (const auto& storageUnit : resources.StorageUnits) { + auto* protoUnit = out.mutable_resources()->add_storage_units(); + protoUnit->set_unit_kind(storageUnit.UnitKind); + protoUnit->set_count(storageUnit.Count); + } + for (const auto& computationalUnit : resources.ComputationalUnits) { + auto* protoUnit = out.mutable_resources()->add_computational_units(); + protoUnit->set_unit_kind(computationalUnit.UnitKind); + protoUnit->set_count(computationalUnit.Count); + protoUnit->set_availability_zone(computationalUnit.AvailabilityZone); + } + } else if (std::holds_alternative(resourcesKind)) { + const auto& resources = std::get(resourcesKind); + for (const auto& storageUnit : resources.StorageUnits) { + auto* protoUnit = out.mutable_shared_resources()->add_storage_units(); + protoUnit->set_unit_kind(storageUnit.UnitKind); + protoUnit->set_count(storageUnit.Count); + } + for (const auto& computationalUnit : resources.ComputationalUnits) { + auto* protoUnit = out.mutable_shared_resources()->add_computational_units(); + protoUnit->set_unit_kind(computationalUnit.UnitKind); + protoUnit->set_count(computationalUnit.Count); + protoUnit->set_availability_zone(computationalUnit.AvailabilityZone); + } + } else if (std::holds_alternative(resourcesKind)) { + const auto& resources = std::get(resourcesKind); + out.mutable_serverless_resources()->set_shared_database_path(resources.SharedDatabasePath); + } else if (std::holds_alternative(resourcesKind)) { + out.clear_resources_kind(); + } + + for (const auto& quota : schemaQuotas.LeakyBucketQuotas) { + auto protoQuota = out.mutable_schema_operation_quotas()->add_leaky_bucket_quotas(); + protoQuota->set_bucket_seconds(quota.BucketSeconds); + protoQuota->set_bucket_size(quota.BucketSize); + } + + out.mutable_database_quotas()->set_data_size_hard_quota(dbQuotas.DataSizeHardQuota); + out.mutable_database_quotas()->set_data_size_soft_quota(dbQuotas.DataSizeSoftQuota); + out.mutable_database_quotas()->set_data_stream_shards_quota(dbQuotas.DataStreamShardsQuota); + out.mutable_database_quotas()->set_data_stream_reserved_storage_quota(dbQuotas.DataStreamReservedStorageQuota); + out.mutable_database_quotas()->set_ttl_min_run_internal_seconds(dbQuotas.TtlMinRunInternalSeconds); + + for (const auto& quota : dbQuotas.StorageQuotas) { + auto protoQuota = out.mutable_database_quotas()->add_storage_quotas(); + protoQuota->set_unit_kind(quota.UnitKind); + protoQuota->set_data_size_hard_quota(quota.DataSizeHardQuota); + protoQuota->set_data_size_soft_quota(quota.DataSizeSoftQuota); + } + + for (const auto& policy : scaleRecommenderPolicies.Policies) { + auto* protoPolicy = out.mutable_scale_recommender_policies()->add_policies(); + if (std::holds_alternative(policy.Policy)) { + const auto& targetTracking = std::get(policy.Policy); + auto* protoTargetTracking = protoPolicy->mutable_target_tracking_policy(); + if (std::holds_alternative(targetTracking.Target)) { + const auto& target = std::get(targetTracking.Target); + protoTargetTracking->set_average_cpu_utilization_percent(target); + } else if (std::holds_alternative(targetTracking.Target)) { + protoTargetTracking->clear_target(); + } + } else if (std::holds_alternative(policy.Policy)) { + protoPolicy->clear_policy(); + } + } + } +} // anonymous namespace + +TListDatabasesResult::TListDatabasesResult(TStatus&& status, const Ydb::Cms::ListDatabasesResult& proto) + : TStatus(std::move(status)) + , Paths_(proto.paths().begin(), proto.paths().end()) +{} + +const std::vector& TListDatabasesResult::GetPaths() const { + return Paths_; +} + +TStorageUnits::TStorageUnits(const Ydb::Cms::StorageUnits& proto) + : UnitKind(proto.unit_kind()) + , Count(proto.count()) +{} + +TComputationalUnits::TComputationalUnits(const Ydb::Cms::ComputationalUnits& proto) + : UnitKind(proto.unit_kind()) + , AvailabilityZone(proto.availability_zone()) + , Count(proto.count()) +{} + +TAllocatedComputationalUnit::TAllocatedComputationalUnit(const Ydb::Cms::AllocatedComputationalUnit& proto) + : Host(proto.host()) + , Port(proto.port()) + , UnitKind(proto.unit_kind()) +{} + +TResources::TResources(const Ydb::Cms::Resources& proto) + : StorageUnits(proto.storage_units().begin(), proto.storage_units().end()) + , ComputationalUnits(proto.computational_units().begin(), proto.computational_units().end()) +{} + +TServerlessResources::TServerlessResources(const Ydb::Cms::ServerlessResources& proto) + : SharedDatabasePath(proto.shared_database_path()) +{} + +TSchemaOperationQuotas::TLeakyBucket::TLeakyBucket(const Ydb::Cms::SchemaOperationQuotas_LeakyBucket& proto) + : BucketSize(proto.bucket_size()) + , BucketSeconds(proto.bucket_seconds()) +{} + +TSchemaOperationQuotas::TSchemaOperationQuotas(const Ydb::Cms::SchemaOperationQuotas& proto) + : LeakyBucketQuotas(proto.leaky_bucket_quotas().begin(), proto.leaky_bucket_quotas().end()) +{} + +TDatabaseQuotas::TStorageQuotas::TStorageQuotas(const Ydb::Cms::DatabaseQuotas_StorageQuotas& proto) + : UnitKind(proto.unit_kind()) + , DataSizeHardQuota(proto.data_size_hard_quota()) + , DataSizeSoftQuota(proto.data_size_soft_quota()) +{} + +TDatabaseQuotas::TDatabaseQuotas(const Ydb::Cms::DatabaseQuotas& proto) + : DataSizeHardQuota(proto.data_size_hard_quota()) + , DataSizeSoftQuota(proto.data_size_soft_quota()) + , DataStreamShardsQuota(proto.data_stream_shards_quota()) + , DataStreamReservedStorageQuota(proto.data_stream_reserved_storage_quota()) + , TtlMinRunInternalSeconds(proto.ttl_min_run_internal_seconds()) + , StorageQuotas(proto.storage_quotas().begin(), proto.storage_quotas().end()) +{} + +TTargetTrackingPolicy::TTargetTrackingPolicy(const Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy_TargetTrackingPolicy& proto) +{ + switch (proto.target_case()) { + case Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy_TargetTrackingPolicy::kAverageCpuUtilizationPercent: + Target = proto.average_cpu_utilization_percent(); + break; + case Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy_TargetTrackingPolicy::TARGET_NOT_SET: + Target = std::monostate(); + break; + } +} + +TScaleRecommenderPolicy::TScaleRecommenderPolicy(const Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy& proto) +{ + switch (proto.policy_case()) { + case Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy::kTargetTrackingPolicy: + Policy = proto.target_tracking_policy(); + break; + case Ydb::Cms::ScaleRecommenderPolicies_ScaleRecommenderPolicy::POLICY_NOT_SET: + Policy = std::monostate(); + break; + } +} + +TScaleRecommenderPolicies::TScaleRecommenderPolicies(const Ydb::Cms::ScaleRecommenderPolicies& proto) + : Policies(proto.policies().begin(), proto.policies().end()) +{} + +TGetDatabaseStatusResult::TGetDatabaseStatusResult(TStatus&& status, const Ydb::Cms::GetDatabaseStatusResult& proto) + : TStatus(std::move(status)) + , Path_(proto.path()) + , State_(ConvertState(proto.state())) + , AllocatedResources_(proto.allocated_resources()) + , RegisteredResources_(proto.registered_resources().begin(), proto.registered_resources().end()) + , Generation_(proto.generation()) + , SchemaOperationQuotas_(proto.schema_operation_quotas()) + , DatabaseQuotas_(proto.database_quotas()) + , ScaleRecommenderPolicies_(proto.scale_recommender_policies()) +{ + switch (proto.resources_kind_case()) { + case Ydb::Cms::GetDatabaseStatusResult::kRequiredResources: + ResourcesKind_ = TResources(proto.required_resources()); + break; + case Ydb::Cms::GetDatabaseStatusResult::kRequiredSharedResources: + ResourcesKind_ = TSharedResources(proto.required_shared_resources()); + break; + case Ydb::Cms::GetDatabaseStatusResult::kServerlessResources: + ResourcesKind_ = proto.serverless_resources(); + break; + case Ydb::Cms::GetDatabaseStatusResult::RESOURCES_KIND_NOT_SET: + ResourcesKind_ = std::monostate(); + break; + } +} + +const std::string& TGetDatabaseStatusResult::GetPath() const { + return Path_; +} + +EState TGetDatabaseStatusResult::GetState() const { + return State_; +} + +const TResourcesKind& TGetDatabaseStatusResult::GetResourcesKind() const { + return ResourcesKind_; +} + +const TResources& TGetDatabaseStatusResult::GetAllocatedResources() const { + return AllocatedResources_; +} + +const std::vector& TGetDatabaseStatusResult::GetRegisteredResources() const { + return RegisteredResources_; +} + +std::uint64_t TGetDatabaseStatusResult::GetGeneration() const { + return Generation_; +} + +const TSchemaOperationQuotas& TGetDatabaseStatusResult::GetSchemaOperationQuotas() const { + return SchemaOperationQuotas_; +} + +const TDatabaseQuotas& TGetDatabaseStatusResult::GetDatabaseQuotas() const { + return DatabaseQuotas_; +} + +const TScaleRecommenderPolicies& TGetDatabaseStatusResult::GetScaleRecommenderPolicies() const { + return ScaleRecommenderPolicies_; +} + +void TGetDatabaseStatusResult::SerializeTo(Ydb::Cms::CreateDatabaseRequest& request) const { + request.set_path(Path_); + SerializeToImpl(ResourcesKind_, SchemaOperationQuotas_, DatabaseQuotas_, ScaleRecommenderPolicies_, request); +} + +TCreateDatabaseSettings::TCreateDatabaseSettings(const Ydb::Cms::CreateDatabaseRequest& request) + : SchemaOperationQuotas_(request.schema_operation_quotas()) + , DatabaseQuotas_(request.database_quotas()) + , ScaleRecommenderPolicies_(request.scale_recommender_policies()) +{ + switch (request.resources_kind_case()) { + case Ydb::Cms::CreateDatabaseRequest::kResources: + ResourcesKind_ = TResources(request.resources()); + break; + case Ydb::Cms::CreateDatabaseRequest::kSharedResources: + ResourcesKind_ = TSharedResources(request.shared_resources()); + break; + case Ydb::Cms::CreateDatabaseRequest::kServerlessResources: + ResourcesKind_ = request.serverless_resources(); + break; + case Ydb::Cms::CreateDatabaseRequest::RESOURCES_KIND_NOT_SET: + ResourcesKind_ = std::monostate(); + break; + } +} + +void TCreateDatabaseSettings::SerializeTo(Ydb::Cms::CreateDatabaseRequest& request) const { + SerializeToImpl(ResourcesKind_, SchemaOperationQuotas_, DatabaseQuotas_, ScaleRecommenderPolicies_, request); +} + +class TCmsClient::TImpl : public TClientImplCommon { +public: + TImpl(std::shared_ptr&& connections, const TCommonClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + { } + + TAsyncListDatabasesResult ListDatabases(const TListDatabasesSettings& settings) { + auto request = MakeOperationRequest(settings); + + auto promise = NThreading::NewPromise(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Cms::ListDatabasesResult result; + if (any) { + any->UnpackTo(&result); + } + TListDatabasesResult val{TStatus(std::move(status)), result}; + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &Ydb::Cms::V1::CmsService::Stub::AsyncListDatabases, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + } + + TAsyncGetDatabaseStatusResult GetDatabaseStatus(const std::string& path, const TGetDatabaseStatusSettings& settings) { + auto request = MakeOperationRequest(settings); + request.set_path(path); + + auto promise = NThreading::NewPromise(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Cms::GetDatabaseStatusResult result; + if (any) { + any->UnpackTo(&result); + } + TGetDatabaseStatusResult val{TStatus(std::move(status)), result}; + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &Ydb::Cms::V1::CmsService::Stub::AsyncGetDatabaseStatus, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + } + + TAsyncStatus CreateDatabase(const std::string& path, const TCreateDatabaseSettings& settings) { + auto request = MakeOperationRequest(settings); + request.set_path(path); + settings.SerializeTo(request); + + return RunSimple( + std::move(request), + &Ydb::Cms::V1::CmsService::Stub::AsyncCreateDatabase, + TRpcRequestSettings::Make(settings)); + } +}; + +TCmsClient::TCmsClient(const TDriver& driver, const TCommonClientSettings& settings) + : Impl_(new TImpl(CreateInternalInterface(driver), settings)) +{} + +TAsyncListDatabasesResult TCmsClient::ListDatabases(const TListDatabasesSettings& settings) { + return Impl_->ListDatabases(settings); +} + +TAsyncGetDatabaseStatusResult TCmsClient::GetDatabaseStatus( + const std::string& path, + const TGetDatabaseStatusSettings& settings) +{ + return Impl_->GetDatabaseStatus(path, settings); +} + +TAsyncStatus TCmsClient::CreateDatabase( + const std::string& path, + const TCreateDatabaseSettings& settings) +{ + return Impl_->CreateDatabase(path, settings); +} + +} // namespace NYdb::NCms diff --git a/src/client/coordination/coordination.cpp b/src/client/coordination/coordination.cpp index f141a58a7e..080cd18659 100644 --- a/src/client/coordination/coordination.cpp +++ b/src/client/coordination/coordination.cpp @@ -17,7 +17,6 @@ namespace NCoordination { using NThreading::TFuture; using NThreading::TPromise; using NThreading::NewPromise; -using NYdbGrpc::TQueueClientFixedEvent; namespace { @@ -35,30 +34,6 @@ inline TResultPromise NewResultPromise() { //////////////////////////////////////////////////////////////////////////////// -template -void ConvertSettingsToProtoConfig( - const Settings& settings, - Ydb::Coordination::Config* config) -{ - if (settings.SelfCheckPeriod_) { - config->set_self_check_period_millis(settings.SelfCheckPeriod_->MilliSeconds()); - } - if (settings.SessionGracePeriod_) { - config->set_session_grace_period_millis(settings.SessionGracePeriod_->MilliSeconds()); - } - if (settings.ReadConsistencyMode_ != EConsistencyMode::UNSET) { - config->set_read_consistency_mode(static_cast(settings.ReadConsistencyMode_)); - } - if (settings.AttachConsistencyMode_ != EConsistencyMode::UNSET) { - config->set_attach_consistency_mode(static_cast(settings.AttachConsistencyMode_)); - } - if (settings.RateLimiterCountersMode_ != ERateLimiterCountersMode::UNSET) { - config->set_rate_limiter_counters_mode(static_cast(settings.RateLimiterCountersMode_)); - } -} - -//////////////////////////////////////////////////////////////////////////////// - std::string GenerateProtectionKey(size_t size) { std::string key; if (size > 0) { @@ -89,6 +64,12 @@ struct TNodeDescription::TImpl { Proto_ = desc; } + void SerializeTo(Ydb::Coordination::CreateNodeRequest& creationRequest) { + auto& config = *creationRequest.mutable_config(); + config.CopyFrom(Proto_.config()); + config.clear_path(); + } + std::optional SelfCheckPeriod_; std::optional SessionGracePeriod_; EConsistencyMode ReadConsistencyMode_; @@ -136,6 +117,10 @@ const Ydb::Coordination::DescribeNodeResult& TNodeDescription::GetProto() const return Impl_->Proto_; } +void TNodeDescription::SerializeTo(Ydb::Coordination::CreateNodeRequest& creationRequest) const { + return Impl_->SerializeTo(creationRequest); +} + //////////////////////////////////////////////////////////////////////////////// TSemaphoreSession::TSemaphoreSession() { @@ -1807,6 +1792,52 @@ class TSessionContext : public TThrRefBase { //////////////////////////////////////////////////////////////////////////////// +namespace { + +template +void ConvertSettingsToProtoConfig( + const Settings& settings, + Ydb::Coordination::Config* config) +{ + if (settings.SelfCheckPeriod_) { + config->set_self_check_period_millis(settings.SelfCheckPeriod_->MilliSeconds()); + } + if (settings.SessionGracePeriod_) { + config->set_session_grace_period_millis(settings.SessionGracePeriod_->MilliSeconds()); + } + if (settings.ReadConsistencyMode_ != EConsistencyMode::UNSET) { + config->set_read_consistency_mode(static_cast(settings.ReadConsistencyMode_)); + } + if (settings.AttachConsistencyMode_ != EConsistencyMode::UNSET) { + config->set_attach_consistency_mode(static_cast(settings.AttachConsistencyMode_)); + } + if (settings.RateLimiterCountersMode_ != ERateLimiterCountersMode::UNSET) { + config->set_rate_limiter_counters_mode(static_cast(settings.RateLimiterCountersMode_)); + } +} + +} + +TCreateNodeSettings::TCreateNodeSettings(const Ydb::Coordination::Config& config) { + if (config.self_check_period_millis() != 0u) { + SelfCheckPeriod(TDuration::MilliSeconds(config.self_check_period_millis())); + } + if (config.session_grace_period_millis() != 0u) { + SessionGracePeriod(TDuration::MilliSeconds(config.session_grace_period_millis())); + } + if (config.read_consistency_mode() != Ydb::Coordination::CONSISTENCY_MODE_UNSET) { + ReadConsistencyMode(static_cast(config.read_consistency_mode())); + } + if (config.attach_consistency_mode() != Ydb::Coordination::CONSISTENCY_MODE_UNSET) { + AttachConsistencyMode(static_cast(config.attach_consistency_mode())); + } + if (config.rate_limiter_counters_mode() != Ydb::Coordination::RATE_LIMITER_COUNTERS_MODE_UNSET) { + RateLimiterCountersMode(static_cast(config.rate_limiter_counters_mode())); + } +} + +//////////////////////////////////////////////////////////////////////////////// + class TClient::TImpl : public TClientImplCommon { public: TImpl(std::shared_ptr&& connections, const TCommonClientSettings& settings) diff --git a/src/client/draft/ydb_dynamic_config.cpp b/src/client/draft/ydb_dynamic_config.cpp index 315bbe3ade..de1a1a8b08 100644 --- a/src/client/draft/ydb_dynamic_config.cpp +++ b/src/client/draft/ydb_dynamic_config.cpp @@ -156,9 +156,12 @@ class TDynamicConfigClient::TImpl : public TClientImplCommon volatileConfigs; if (Ydb::DynamicConfig::GetConfigResult result; any && any->UnpackTo(&result)) { - clusterName = result.identity().cluster(); - version = result.identity().version(); - config = result.config(); + // only if they are present + if (result.identity_size() && result.config_size()) { + clusterName = result.identity(0).cluster(); + version = result.identity(0).version(); + config = result.config(0); + } for (const auto& config : result.volatile_configs()) { volatileConfigs.emplace(config.id(), config.config()); } @@ -336,6 +339,32 @@ class TDynamicConfigClient::TImpl : public TClientImplCommon(settings); + + auto promise = NThreading::NewPromise(); + + auto extractor = [promise] (google::protobuf::Any* any, TPlainStatus status) mutable { + std::string config; + if (Ydb::DynamicConfig::FetchStartupConfigResult result; any && any->UnpackTo(&result)) { + config = result.config(); + } + + TFetchStartupConfigResult val(TStatus(std::move(status)), std::move(config)); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &Ydb::DynamicConfig::V1::DynamicConfigService::Stub::AsyncFetchStartupConfig, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + } }; TDynamicConfigClient::TDynamicConfigClient(const TDriver& driver) @@ -431,4 +460,8 @@ TAsyncVerboseResolveConfigResult TDynamicConfigClient::VerboseResolveConfig( return Impl_->VerboseResolveConfig(config, volatileConfigs, settings); } +TAsyncFetchStartupConfigResult TDynamicConfigClient::FetchStartupConfig(const TClusterConfigSettings& settings) { + return Impl_->FetchStartupConfig(settings); +} + } // namespace NYdb::V3::NDynamicConfig diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index af4e96a277..7596a1840a 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -223,4 +223,39 @@ void TDriver::Stop(bool wait) { Impl_->Stop(wait); } +TDriverConfig TDriver::GetConfig() const { + TDriverConfig config; + + config.SetEndpoint(Impl_->DefaultDiscoveryEndpoint_); + config.SetNetworkThreadsNum(Impl_->NetworkThreadsNum_); + config.SetClientThreadsNum(Impl_->ClientThreadsNum_); + config.SetMaxClientQueueSize(Impl_->MaxQueuedResponses_); + if (Impl_->SslCredentials_.IsEnabled) { + config.UseSecureConnection(Impl_->SslCredentials_.CaCert); + } + config.UseClientCertificate(Impl_->SslCredentials_.Cert, Impl_->SslCredentials_.PrivateKey); + config.SetCredentialsProviderFactory(Impl_->DefaultCredentialsProviderFactory_); + config.SetDatabase(Impl_->DefaultDatabase_); + config.SetDiscoveryMode(Impl_->DefaultDiscoveryMode_); + config.SetMaxQueuedRequests(Impl_->MaxQueuedRequests_); + config.SetGrpcMemoryQuota(Impl_->MemoryQuota_); + config.SetTcpKeepAliveSettings( + Impl_->TcpKeepAliveSettings_.Enabled, + Impl_->TcpKeepAliveSettings_.Idle, + Impl_->TcpKeepAliveSettings_.Count, + Impl_->TcpKeepAliveSettings_.Interval + ); + config.SetDrainOnDtors(Impl_->DrainOnDtors_); + config.SetBalancingPolicy(Impl_->BalancingSettings_.Policy, Impl_->BalancingSettings_.PolicyParams); + config.SetGRpcKeepAliveTimeout(Impl_->GRpcKeepAliveTimeout_); + config.SetGRpcKeepAlivePermitWithoutCalls(Impl_->GRpcKeepAlivePermitWithoutCalls_); + config.SetSocketIdleTimeout(Impl_->SocketIdleTimeout_); + config.SetMaxInboundMessageSize(Impl_->MaxInboundMessageSize_); + config.SetMaxOutboundMessageSize(Impl_->MaxOutboundMessageSize_); + config.SetMaxMessageSize(Impl_->MaxMessageSize_); + config.Impl_->Log = Impl_->Log; + + return config; +} + } // namespace NYdb diff --git a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp index a3eba4dc8a..dc20d3a580 100644 --- a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp @@ -136,7 +136,8 @@ class TScheduledFuture : public TScheduledObject { TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr params) : MetricRegistryPtr_(nullptr) - , ResponseQueue_(CreateThreadPool(params->GetClientThreadsNum())) + , ClientThreadsNum_(params->GetClientThreadsNum()) + , ResponseQueue_(CreateThreadPool(ClientThreadsNum_)) , DefaultDiscoveryEndpoint_(params->GetEndpoint()) , SslCredentials_(params->GetSslCredentials()) , DefaultDatabase_(params->GetDatabase()) @@ -144,6 +145,7 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p , StateTracker_(this) , DefaultDiscoveryMode_(params->GetDiscoveryMode()) , MaxQueuedRequests_(params->GetMaxQueuedRequests()) + , MaxQueuedResponses_(params->GetMaxQueuedResponses()) , DrainOnDtors_(params->GetDrinOnDtors()) , BalancingSettings_(params->GetBalancingSettings()) , GRpcKeepAliveTimeout_(params->GetGRpcKeepAliveTimeout()) @@ -153,14 +155,17 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p , MaxOutboundMessageSize_(params->GetMaxOutboundMessageSize()) , MaxMessageSize_(params->GetMaxMessageSize()) , QueuedRequests_(0) + , TcpKeepAliveSettings_(params->GetTcpKeepAliveSettings()) + , SocketIdleTimeout_(params->GetSocketIdleTimeout()) #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL - , ChannelPool_(params->GetTcpKeepAliveSettings(), params->GetSocketIdleTimeout()) + , ChannelPool_(TcpKeepAliveSettings_, SocketIdleTimeout_) #endif - , GRpcClientLow_(params->GetNetworkThreadsNum()) + , NetworkThreadsNum_(params->GetNetworkThreadsNum()) + , GRpcClientLow_(NetworkThreadsNum_) , Log(params->GetLog()) { #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL - if (params->GetSocketIdleTimeout() != TDuration::Max()) { + if (SocketIdleTimeout_ != TDuration::Max()) { auto channelPoolUpdateWrapper = [this] (NYdb::NIssue::TIssues&&, EStatus status) mutable { @@ -171,11 +176,11 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p ChannelPool_.DeleteExpiredStubsHolders(); return true; }; - AddPeriodicTask(channelPoolUpdateWrapper, params->GetSocketIdleTimeout() * 0.1); + AddPeriodicTask(channelPoolUpdateWrapper, SocketIdleTimeout_ * 0.1); } #endif //TAdaptiveThreadPool ignores params - ResponseQueue_->Start(params->GetClientThreadsNum(), params->GetMaxQueuedResponses()); + ResponseQueue_->Start(ClientThreadsNum_, MaxQueuedResponses_); if (!DefaultDatabase_.empty()) { DefaultState_ = StateTracker_.GetDriverState( DefaultDatabase_, diff --git a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h index 3eeb79af21..b927d0c511 100644 --- a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -40,6 +40,7 @@ class TGRpcConnectionsImpl , public IInternalClient { friend class TDeferredAction; + friend class TDriver; public: TGRpcConnectionsImpl(std::shared_ptr params); ~TGRpcConnectionsImpl(); @@ -689,6 +690,7 @@ class TGRpcConnectionsImpl std::mutex ExtensionsLock_; ::NMonitoring::TMetricRegistry* MetricRegistryPtr_ = nullptr; + const size_t ClientThreadsNum_; std::unique_ptr ResponseQueue_; const std::string DefaultDiscoveryEndpoint_; @@ -698,6 +700,7 @@ class TGRpcConnectionsImpl TDbDriverStateTracker StateTracker_; const EDiscoveryMode DefaultDiscoveryMode_; const i64 MaxQueuedRequests_; + const i64 MaxQueuedResponses_; const bool DrainOnDtors_; const TBalancingSettings BalancingSettings_; const TDuration GRpcKeepAliveTimeout_; @@ -708,6 +711,8 @@ class TGRpcConnectionsImpl const ui64 MaxMessageSize_; std::atomic_int64_t QueuedRequests_; + const NYdbGrpc::TTcpKeepAliveSettings TcpKeepAliveSettings_; + const TDuration SocketIdleTimeout_; #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL NYdbGrpc::TChannelPool ChannelPool_; #endif @@ -719,6 +724,7 @@ class TGRpcConnectionsImpl IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb; + const size_t NetworkThreadsNum_; // Must be the last member (first called destructor) NYdbGrpc::TGRpcClientLow GRpcClientLow_; TLog Log; diff --git a/src/client/query/stats.cpp b/src/client/query/stats.cpp index 02da828dfa..fc82e4293d 100644 --- a/src/client/query/stats.cpp +++ b/src/client/query/stats.cpp @@ -31,6 +31,7 @@ std::string TExecStats::ToString(bool withPlan) const { if (!withPlan) { proto.clear_query_plan(); proto.clear_query_ast(); + proto.clear_query_meta(); } TStringType res; @@ -58,6 +59,16 @@ std::optional TExecStats::GetAst() const { return proto.query_ast(); } +std::optional TExecStats::GetMeta() const { + auto proto = Impl_->Proto; + + if (proto.query_meta().empty()) { + return {}; + } + + return proto.query_meta(); +} + TDuration TExecStats::GetTotalDuration() const { return TDuration::MicroSeconds(Impl_->Proto.total_duration_us()); } diff --git a/src/client/rate_limiter/rate_limiter.cpp b/src/client/rate_limiter/rate_limiter.cpp index 05f8cca249..4cfc8a99eb 100644 --- a/src/client/rate_limiter/rate_limiter.cpp +++ b/src/client/rate_limiter/rate_limiter.cpp @@ -7,39 +7,229 @@ #include #include +#include + namespace NYdb::inline V3::NRateLimiter { -TListResourcesResult::TListResourcesResult(TStatus status, std::vector paths) - : TStatus(std::move(status)) - , ResourcePaths_(std::move(paths)) +TReplicatedBucketSettings::TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings& proto) { + if (proto.has_report_interval_ms()) { + ReportInterval_ = std::chrono::milliseconds(proto.report_interval_ms()); + } +} + +void TReplicatedBucketSettings::SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings& proto) const { + if (ReportInterval_) { + proto.set_report_interval_ms(ReportInterval_->count()); + } +} + +TLeafBehavior::EBehavior TLeafBehavior::GetBehavior() const { + return static_cast(BehaviorSettings_.index()); +} + +TLeafBehavior::TLeafBehavior(const TReplicatedBucketSettings& replicatedBucket) + : BehaviorSettings_(replicatedBucket) { } -TDescribeResourceResult::TDescribeResourceResult(TStatus status, const Ydb::RateLimiter::DescribeResourceResult& result) - : TStatus(std::move(status)) - , ResourcePath_(result.resource().resource_path()) - , HierarchicalDrrProps_(result.resource().hierarchical_drr()) +TLeafBehavior::TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings& replicatedBucket) + : BehaviorSettings_(replicatedBucket) { } -TDescribeResourceResult::THierarchicalDrrProps::THierarchicalDrrProps(const Ydb::RateLimiter::HierarchicalDrrSettings& settings) { - if (settings.max_units_per_second()) { - MaxUnitsPerSecond_ = settings.max_units_per_second(); +const TReplicatedBucketSettings& TLeafBehavior::GetReplicatedBucket() const { + return std::get(BehaviorSettings_); +} + +void TLeafBehavior::SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings& proto) const { + switch (GetBehavior()) { + case REPLICATED_BUCKET: + return GetReplicatedBucket().SerializeTo(*proto.mutable_replicated_bucket()); + } +} + +template +THierarchicalDrrSettings::THierarchicalDrrSettings(const Ydb::RateLimiter::HierarchicalDrrSettings& proto) { + if (proto.max_units_per_second()) { + MaxUnitsPerSecond_ = proto.max_units_per_second(); + } + + if (proto.max_burst_size_coefficient()) { + MaxBurstSizeCoefficient_ = proto.max_burst_size_coefficient(); + } + + if (proto.prefetch_coefficient()) { + PrefetchCoefficient_ = proto.prefetch_coefficient(); + } + + if (proto.prefetch_watermark()) { + PrefetchWatermark_ = proto.prefetch_watermark(); + } + + if (proto.has_immediately_fill_up_to()) { + ImmediatelyFillUpTo_ = proto.immediately_fill_up_to(); + } + + switch (proto.leaf_behavior_case()) { + case Ydb::RateLimiter::HierarchicalDrrSettings::kReplicatedBucket: + LeafBehavior_.emplace(proto.replicated_bucket()); + break; + case Ydb::RateLimiter::HierarchicalDrrSettings::LEAF_BEHAVIOR_NOT_SET: + break; + } +} + +template +void THierarchicalDrrSettings::SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings& proto) const { + if (MaxUnitsPerSecond_) { + proto.set_max_units_per_second(*MaxUnitsPerSecond_); + } + + if (MaxBurstSizeCoefficient_) { + proto.set_max_burst_size_coefficient(*MaxBurstSizeCoefficient_); + } + + if (PrefetchCoefficient_) { + proto.set_prefetch_coefficient(*PrefetchCoefficient_); + } + + if (PrefetchWatermark_) { + proto.set_prefetch_watermark(*PrefetchWatermark_); + } + + if (ImmediatelyFillUpTo_) { + proto.set_immediately_fill_up_to(*ImmediatelyFillUpTo_); + } + + if (LeafBehavior_) { + LeafBehavior_->SerializeTo(proto); + } +} + +TMetric::TMetric(const Ydb::RateLimiter::MeteringConfig_Metric& proto) { + Enabled_ = proto.enabled(); + if (proto.billing_period_sec()) { + BillingPeriod_ = std::chrono::seconds(proto.billing_period_sec()); + } + for (const auto& [k, v] : proto.labels()) { + Labels_[k] = v; + } + if (proto.has_metric_fields()) { + TStringType jsonStr; + if (auto st = google::protobuf::util::MessageToJsonString(proto.metric_fields(), &jsonStr); st.ok()) { + MetricFieldsJson_ = jsonStr; + } + } +} + +void TMetric::SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric& proto) const { + proto.set_enabled(Enabled_); + if (BillingPeriod_) { + proto.set_billing_period_sec(BillingPeriod_->count()); + } + for (const auto& [k, v] : Labels_) { + (*proto.mutable_labels())[k] = v; + } + if (!MetricFieldsJson_.empty()) { + google::protobuf::util::JsonStringToMessage(MetricFieldsJson_, proto.mutable_metric_fields()); + } +} + +TMeteringConfig::TMeteringConfig(const Ydb::RateLimiter::MeteringConfig& proto) { + Enabled_ = proto.enabled(); + if (proto.report_period_ms()) { + ReportPeriod_ = std::chrono::milliseconds(proto.report_period_ms()); + } + if (proto.meter_period_ms()) { + MeterPeriod_ = std::chrono::milliseconds(proto.meter_period_ms()); + } + if (proto.collect_period_sec()) { + CollectPeriod_ = std::chrono::seconds(proto.collect_period_sec()); } + if (proto.provisioned_units_per_second()) { + ProvisionedUnitsPerSecond_ = proto.provisioned_units_per_second(); + } + if (proto.provisioned_coefficient()) { + ProvisionedCoefficient_ = proto.provisioned_coefficient(); + } + if (proto.overshoot_coefficient()) { + OvershootCoefficient_ = proto.overshoot_coefficient(); + } + if (proto.has_provisioned()) { + Provisioned_.emplace(proto.provisioned()); + } + if (proto.has_on_demand()) { + OnDemand_.emplace(proto.on_demand()); + } + if (proto.has_overshoot()) { + Overshoot_.emplace(proto.overshoot()); + } +} - if (settings.max_burst_size_coefficient()) { - MaxBurstSizeCoefficient_ = settings.max_burst_size_coefficient(); +void TMeteringConfig::SerializeTo(Ydb::RateLimiter::MeteringConfig& proto) const { + proto.set_enabled(Enabled_); + if (ReportPeriod_) { + proto.set_report_period_ms(ReportPeriod_->count()); + } + if (MeterPeriod_) { + proto.set_meter_period_ms(MeterPeriod_->count()); + } + if (CollectPeriod_) { + proto.set_collect_period_sec(CollectPeriod_->count()); + } + if (ProvisionedUnitsPerSecond_) { + proto.set_provisioned_units_per_second(*ProvisionedUnitsPerSecond_); + } + if (ProvisionedCoefficient_) { + proto.set_provisioned_coefficient(*ProvisionedCoefficient_); + } + if (OvershootCoefficient_) { + proto.set_overshoot_coefficient(*OvershootCoefficient_); } + if (Provisioned_) { + Provisioned_->SerializeTo(*proto.mutable_provisioned()); + } + if (OnDemand_) { + OnDemand_->SerializeTo(*proto.mutable_on_demand()); + } + if (Overshoot_) { + Overshoot_->SerializeTo(*proto.mutable_overshoot()); + } +} + +template struct THierarchicalDrrSettings; +template struct THierarchicalDrrSettings; +template struct THierarchicalDrrSettings; - if (settings.prefetch_coefficient()) { - PrefetchCoefficient_ = settings.prefetch_coefficient(); +TCreateResourceSettings::TCreateResourceSettings(const Ydb::RateLimiter::CreateResourceRequest& proto) + : THierarchicalDrrSettings(proto.resource().hierarchical_drr()) +{ + if (proto.resource().has_metering_config()) { + MeteringConfig_ = proto.resource().metering_config(); } +} + +TListResourcesResult::TListResourcesResult(TStatus status, std::vector paths) + : TStatus(std::move(status)) + , ResourcePaths_(std::move(paths)) +{ +} - if (settings.prefetch_watermark()) { - PrefetchWatermark_ = settings.prefetch_watermark(); +TDescribeResourceResult::TDescribeResourceResult(TStatus status, const Ydb::RateLimiter::DescribeResourceResult& result) + : TStatus(std::move(status)) + , ResourcePath_(result.resource().resource_path()) + , HierarchicalDrrProps_(result.resource().hierarchical_drr()) +{ + if (result.resource().has_metering_config()) { + MeteringConfig_ = result.resource().metering_config(); } } +TDescribeResourceResult::THierarchicalDrrProps::THierarchicalDrrProps(const Ydb::RateLimiter::HierarchicalDrrSettings& settings) + : THierarchicalDrrSettings(settings) +{ +} + class TRateLimiterClient::TImpl : public TClientImplCommon { public: TImpl(std::shared_ptr connections, const TCommonClientSettings& settings) @@ -68,6 +258,15 @@ class TRateLimiterClient::TImpl : public TClientImplCommonSerializeTo(hdrr); + } + if (settings.MeteringConfig_) { + settings.MeteringConfig_->SerializeTo(*resource.mutable_metering_config()); + } return request; } diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 25fdbcf54e..af68960e83 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -557,6 +557,62 @@ TAsyncDescribeTableResult TTableClient::TImpl::DescribeTable(const std::string& return promise.GetFuture(); } +TAsyncDescribeExternalDataSourceResult TTableClient::TImpl::DescribeExternalDataSource(const std::string& path, const TDescribeExternalDataSourceSettings& settings) { + auto request = MakeOperationRequest(settings); + request.set_path(path); + + auto promise = NewPromise(); + + auto extractor = [promise, settings](google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Table::DescribeExternalDataSourceResult proto; + if (any) { + any->UnpackTo(&proto); + } + promise.SetValue(TDescribeExternalDataSourceResult(TStatus(std::move(status)), std::move(proto))); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncDescribeExternalDataSource, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings) + ); + + return promise.GetFuture(); +} + +TAsyncDescribeExternalTableResult TTableClient::TImpl::DescribeExternalTable(const std::string& path, const TDescribeExternalTableSettings& settings) { + auto request = MakeOperationRequest(settings); + request.set_path(path); + + auto promise = NewPromise(); + + auto extractor = [promise, settings](google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Table::DescribeExternalTableResult proto; + if (any) { + any->UnpackTo(&proto); + } + promise.SetValue(TDescribeExternalTableResult(TStatus(std::move(status)), std::move(proto))); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncDescribeExternalTable, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings) + ); + + return promise.GetFuture(); +} + TAsyncPrepareQueryResult TTableClient::TImpl::PrepareDataQuery(const TSession& session, const std::string& query, const TPrepareDataQuerySettings& settings) { diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index 03c78f282b..879025e4c7 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -66,6 +66,8 @@ class TTableClient::TImpl: public TClientImplCommon, public TFuture RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings); TFuture DropTable(const std::string& sessionId, const std::string& path, const TDropTableSettings& settings); TAsyncDescribeTableResult DescribeTable(const std::string& sessionId, const std::string& path, const TDescribeTableSettings& settings); + TAsyncDescribeExternalDataSourceResult DescribeExternalDataSource(const std::string& path, const TDescribeExternalDataSourceSettings& settings); + TAsyncDescribeExternalTableResult DescribeExternalTable(const std::string& path, const TDescribeExternalTableSettings& settings); template TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const std::string& query, const TTxControl& txControl, diff --git a/src/client/table/proto_accessor.cpp b/src/client/table/proto_accessor.cpp index 98224b2578..131c50943f 100644 --- a/src/client/table/proto_accessor.cpp +++ b/src/client/table/proto_accessor.cpp @@ -12,6 +12,14 @@ const Ydb::Table::DescribeTableResult& TProtoAccessor::GetProto(const NTable::TT return tableDescription.GetProto(); } +const Ydb::Table::DescribeExternalDataSourceResult& TProtoAccessor::GetProto(const NTable::TExternalDataSourceDescription& description) { + return description.GetProto(); +} + +const Ydb::Table::DescribeExternalTableResult& TProtoAccessor::GetProto(const NTable::TExternalTableDescription& description) { + return description.GetProto(); +} + NTable::TQueryStats TProtoAccessor::FromProto(const Ydb::TableStats::QueryStats& queryStats) { return NTable::TQueryStats(queryStats); } diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 99121ffd5b..820e5ed59b 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -1367,7 +1367,7 @@ TScanQueryPartIterator::TScanQueryPartIterator( TAsyncScanQueryPart TScanQueryPartIterator::ReadNext() { if (!ReaderImpl_ || ReaderImpl_->IsFinished()) { if (!IsSuccess()) - RaiseError(TStringBuilder() << "Attempt to perform read on an unsuccessful result " + RaiseError(TStringBuilder() << "Attempt to perform read on an unsuccessful result " << GetIssues().ToString()); RaiseError("Attempt to perform read on invalid or finished stream"); } @@ -1830,6 +1830,14 @@ TAsyncDescribeTableResult TSession::DescribeTable(const std::string& path, const return Client_->DescribeTable(SessionImpl_->GetId(), path, settings); } +TAsyncDescribeExternalDataSourceResult TSession::DescribeExternalDataSource(const std::string& path, const TDescribeExternalDataSourceSettings& settings) { + return Client_->DescribeExternalDataSource(path, settings); +} + +TAsyncDescribeExternalTableResult TSession::DescribeExternalTable(const std::string& path, const TDescribeExternalTableSettings& settings) { + return Client_->DescribeExternalTable(path, settings); +} + TAsyncDataQueryResult TSession::ExecuteDataQuery(const std::string& query, const TTxControl& txControl, const TExecDataQuerySettings& settings) { @@ -2340,7 +2348,7 @@ const std::vector& TIndexDescription::GetDataColumns() const { return DataColumns_; } -const std::variant& TIndexDescription::GetVectorIndexSettings() const { +const std::variant& TIndexDescription::GetIndexSettings() const { return SpecializedIndexSettings_; } @@ -3308,5 +3316,73 @@ TReadRowsResult::TReadRowsResult(TStatus&& status, TResultSet&& resultSet) , ResultSet(std::move(resultSet)) {} +//////////////////////////////////////////////////////////////////////////////// + +class TExternalDataSourceDescription::TImpl { + Ydb::Table::DescribeExternalDataSourceResult Proto_; + +public: + TImpl(Ydb::Table::DescribeExternalDataSourceResult&& description) + : Proto_(std::move(description)) + {} + + const Ydb::Table::DescribeExternalDataSourceResult& GetProto() const { + return Proto_; + } +}; + +TExternalDataSourceDescription::TExternalDataSourceDescription(Ydb::Table::DescribeExternalDataSourceResult&& description) + : Impl_(std::make_shared(std::move(description))) +{ +} + +const Ydb::Table::DescribeExternalDataSourceResult& TExternalDataSourceDescription::GetProto() const { + return Impl_->GetProto(); +} + +TDescribeExternalDataSourceResult::TDescribeExternalDataSourceResult(TStatus&& status, Ydb::Table::DescribeExternalDataSourceResult&& description) + : NScheme::TDescribePathResult(std::move(status), description.self()) + , ExternalDataSourceDescription_(std::move(description)) +{} + +TExternalDataSourceDescription TDescribeExternalDataSourceResult::GetExternalDataSourceDescription() const { + CheckStatusOk("TDescribeExternalDataSourceResult::GetExternalDataSourceDescription"); + return ExternalDataSourceDescription_; +} + +//////////////////////////////////////////////////////////////////////////////// + +class TExternalTableDescription::TImpl { + Ydb::Table::DescribeExternalTableResult Proto_; + +public: + TImpl(Ydb::Table::DescribeExternalTableResult&& description) + : Proto_(std::move(description)) + {} + + const Ydb::Table::DescribeExternalTableResult& GetProto() const { + return Proto_; + } +}; + +TExternalTableDescription::TExternalTableDescription(Ydb::Table::DescribeExternalTableResult&& description) + : Impl_(std::make_shared(std::move(description))) +{ +} + +const Ydb::Table::DescribeExternalTableResult& TExternalTableDescription::GetProto() const { + return Impl_->GetProto(); +} + +TDescribeExternalTableResult::TDescribeExternalTableResult(TStatus&& status, Ydb::Table::DescribeExternalTableResult&& description) + : NScheme::TDescribePathResult(std::move(status), description.self()) + , ExternalTableDescription_(std::move(description)) +{} + +TExternalTableDescription TDescribeExternalTableResult::GetExternalTableDescription() const { + CheckStatusOk("TDescribeExternalTableResult::GetExternalTableDescription"); + return ExternalTableDescription_; +} + } // namespace NTable } // namespace NYdb diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 0401e17f43..6a0b40faae 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -167,7 +167,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat THandleResult result; if (Aborting.load()) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session is aborting and will not restart"); return result; } SessionEstablished = false; @@ -190,12 +190,12 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat if (nextDelay) { result.StartDelay = *nextDelay; result.DoRestart = true; - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got error. " << status.ToDebugString()); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will restart in " << result.StartDelay); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Got error. " << status.ToDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will restart in " << result.StartDelay); ResetForRetryImpl(); } else { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Got error. " << status.ToDebugString()); - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session will not restart after a fatal error"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Got error. " << status.ToDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Write session will not restart after a fatal error"); result.DoStop = true; CheckHandleResultImpl(result); } @@ -230,7 +230,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del auto partition_id = Settings.PartitionId_.has_value() ? *Settings.PartitionId_ : *DirectWriteToPartitionId; - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Get partition location async, partition " << partition_id << ", delay " << delay ); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Get partition location async, partition " << partition_id << ", delay " << delay ); NYdbGrpc::IQueueClientContextPtr prevDescribePartitionContext; NYdbGrpc::IQueueClientContextPtr describePartitionContext = Client->CreateContext(); @@ -269,7 +269,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del auto callback = [req = std::move(request), extr = std::move(extractor), connections = std::shared_ptr(Connections), dbState = DbDriverState, - context = describePartitionContext, prefix = std::string(LogPrefix()), + context = describePartitionContext, prefix = std::string(LogPrefixImpl()), partId = partition_id]() mutable { LOG_LAZY(dbState->Log, TLOG_DEBUG, prefix + " Getting partition location, partition " + ToString(partId)); connections->Run( @@ -286,16 +286,24 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::Topic::DescribePartitionResult& proto, const NYdbGrpc::IQueueClientContextPtr& describePartitionContext) { - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got PartitionLocation response. Status " << status.GetStatus() << ", proto:\n" << proto.DebugString()); - std::string endpoint, name; THandleResult handleResult; + const Ydb::Topic::DescribeTopicResult_PartitionInfo& partition = proto.partition(); + with_lock(Lock) { + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Got PartitionLocation response. Status " << status.GetStatus() << ", proto:\n" << proto.DebugString()); + if (DescribePartitionContext == describePartitionContext) { DescribePartitionContext = nullptr; } else { return; } + + TRACE_LAZY(DbDriverState->Log, "DescribePartitionResponse", + TRACE_KV("partition_id", partition.partition_id()), + TRACE_KV("active", partition.active()), + TRACE_KV("pl_node_id", partition.partition_location().node_id()), + TRACE_KV("pl_generation", partition.partition_location().generation())); } if (!status.IsSuccess()) { @@ -314,14 +322,6 @@ void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::To return; } - const Ydb::Topic::DescribeTopicResult_PartitionInfo& partition = proto.partition(); - - TRACE_LAZY(DbDriverState->Log, "DescribePartitionResponse", - TRACE_KV("partition_id", partition.partition_id()), - TRACE_KV("active", partition.active()), - TRACE_KV("pl_node_id", partition.partition_location().node_id()), - TRACE_KV("pl_generation", partition.partition_location().generation())); - if (partition.partition_id() != Settings.PartitionId_ && Settings.PartitionId_.has_value() || !partition.has_partition_location() || partition.partition_location().node_id() == 0 || partition.partition_location().generation() == 0) { { @@ -368,12 +368,12 @@ std::optional TWriteSessionImpl::GetPreferredEndpointImpl(ui32 par bool nodeIsKnown = (bool)DbDriverState->EndpointPool.GetEndpoint(preferredEndpoint, true); if (nodeIsKnown) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "GetPreferredEndpoint: partitionId " << partitionId << ", partitionNodeId " << partitionNodeId << " exists in the endpoint pool."); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "GetPreferredEndpoint: partitionId " << partitionId << ", partitionNodeId " << partitionNodeId << " exists in the endpoint pool."); return preferredEndpoint; } else { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetPreferredEndpoint: partitionId " << partitionId << ", nodeId " << partitionNodeId << " does not exist in the endpoint pool."); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "GetPreferredEndpoint: partitionId " << partitionId << ", nodeId " << partitionNodeId << " does not exist in the endpoint pool."); DbDriverState->EndpointPool.UpdateAsync(); return {}; } @@ -405,13 +405,15 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race } else { // Deduplication explicitly disabled, ProducerId & MessageGroupId must be empty. if (!Settings.ProducerId_.empty() || !Settings.MessageGroupId_.empty()) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "ProducerId or MessageGroupId is not empty when deduplication is switched off"); ThrowFatalError("Explicitly disabled deduplication conflicts with non-empty ProducerId or MessageGroupId"); } } if (!Settings.ProducerId_.empty() && !Settings.MessageGroupId_.empty() && Settings.ProducerId_ != Settings.MessageGroupId_) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "ProducerId and MessageGroupId mismatch"); ThrowFatalError("ProducerId != MessageGroupId scenario is currently not supported"); } @@ -419,21 +421,23 @@ void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race Settings.CompressionExecutor_->Start(); Settings.EventHandlers_.HandlersExecutor_->Start(); - } + // Client method NThreading::TFuture TWriteSessionImpl::GetInitSeqNo() { if (!Settings.DeduplicationEnabled_.value_or(true)) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetInitSeqNo called with deduplication disabled"); + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "GetInitSeqNo called with deduplication disabled"); ThrowFatalError("Cannot call GetInitSeqNo when deduplication is disabled"); } if (Settings.ValidateSeqNo_) { if (AutoSeqNoMode.has_value() && *AutoSeqNoMode) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); - } - else + } else { AutoSeqNoMode = false; + } } return InitSeqNoPromise.GetFuture(); } @@ -466,7 +470,6 @@ uint64_t TWriteSessionImpl::GetSeqNoImpl(uint64_t id) { } uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional& seqNo) { - Y_ABORT_UNLESS(Lock.IsLocked()); uint64_t id = ++NextId; @@ -475,13 +478,13 @@ uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional& seqNo) } if (seqNo.has_value()) { if (!Settings.DeduplicationEnabled_.value_or(true)) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "SeqNo is provided on write when deduplication is disabled"); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "SeqNo is provided on write when deduplication is disabled"); ThrowFatalError("Cannot provide SeqNo on Write() when deduplication is disabled"); } if (*AutoSeqNoMode) { LOG_LAZY(DbDriverState->Log, TLOG_ERR, - LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + LogPrefixImpl() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" ); ThrowFatalError( "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" @@ -492,7 +495,7 @@ uint64_t TWriteSessionImpl::GetNextIdImpl(const std::optional& seqNo) } else if (!(*AutoSeqNoMode)) { LOG_LAZY(DbDriverState->Log, TLOG_ERR, - LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + LogPrefixImpl() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" ); ThrowFatalError( "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" @@ -570,7 +573,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) auto p = WrittenInTx.find(seqNo); if (p == WrittenInTx.end()) { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=?"); + LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=?"); return; } @@ -581,7 +584,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) ++txInfo->AckCount; LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) { txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); @@ -628,7 +631,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess ++txInfo->WriteCount; LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + LogPrefixImpl() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); } WrittenInTx[seqNo] = txId; } @@ -689,7 +692,7 @@ void TWriteSessionImpl::Connect(const TDuration& delay) { return; } - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to nodeId: " << PreferredPartitionLocation.Endpoint.NodeId); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Start write session. Will connect to nodeId: " << PreferredPartitionLocation.Endpoint.NodeId); ++ConnectionGeneration; @@ -764,10 +767,10 @@ void TWriteSessionImpl::Connect(const TDuration& delay) { // RPC callback. void TWriteSessionImpl::OnConnectTimeout(const NYdbGrpc::IQueueClientContextPtr& connectTimeoutContext) { - LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Write session: connect timeout"); THandleResult handleResult; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefixImpl() << "Write session: connect timeout"); if (ConnectTimeoutContext == connectTimeoutContext) { Cancel(ConnectContext); ConnectContext = nullptr; @@ -843,20 +846,20 @@ void TWriteSessionImpl::InitImpl() { auto* p = init->mutable_partition_with_generation(); p->set_partition_id(partition_id); p->set_generation(PreferredPartitionLocation.Generation); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: direct write to partition: " << partition_id << ", generation " << PreferredPartitionLocation.Generation); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: direct write to partition: " << partition_id << ", generation " << PreferredPartitionLocation.Generation); } else if (Settings.PartitionId_.has_value()) { init->set_partition_id(*Settings.PartitionId_); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to partition: " << *Settings.PartitionId_); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: write to partition: " << *Settings.PartitionId_); } else { init->set_message_group_id(TStringType{Settings.MessageGroupId_}); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to message_group: " << Settings.MessageGroupId_); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: write to message_group: " << Settings.MessageGroupId_); } for (const auto& attr : Settings.Meta_.Fields) { (*init->mutable_write_session_meta())[attr.first] = attr.second; } - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); - + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: send init request: "<< req.ShortDebugString()); + TRACE_LAZY(DbDriverState->Log, "InitRequest", TRACE_KV_IF(init->partitioning_case() == Ydb::Topic::StreamWriteMessage_InitRequest::kPartitionId, "partition_id", init->partition_id()), TRACE_IF(init->partitioning_case() == Ydb::Topic::StreamWriteMessage_InitRequest::kPartitionWithGeneration, @@ -911,11 +914,10 @@ void TWriteSessionImpl::ReadFromProcessor() { } void TWriteSessionImpl::OnWriteDone(NYdbGrpc::TGrpcStatus&& status, size_t connectionGeneration) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnWriteDone " << status.ToDebugString()); - THandleResult handleResult; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: OnWriteDone " << status.ToDebugString()); if (connectionGeneration != ConnectionGeneration) { return; // Message from previous connection. Ignore. } @@ -930,8 +932,6 @@ void TWriteSessionImpl::OnWriteDone(NYdbGrpc::TGrpcStatus&& status, size_t conne } void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnReadDone " << grpcStatus.ToDebugString()); - TPlainStatus errorStatus; TProcessSrvMessageResult processResult; bool needSetValue = false; @@ -941,6 +941,7 @@ void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t co bool doRead = false; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: OnReadDone " << grpcStatus.ToDebugString()); UpdateTimedCountersImpl(); if (connectionGeneration != ConnectionGeneration) { return; // Message from previous connection. Ignore. @@ -983,8 +984,11 @@ void TWriteSessionImpl::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t co ProcessHandleResult(processResult.HandleResult); } -TStringBuilder TWriteSessionImpl::LogPrefix() const { +TStringBuilder TWriteSessionImpl::LogPrefixImpl() const { + Y_ABORT_UNLESS(Lock.IsLocked()); + TStringBuilder ret; + ret << " TraceId [" << Settings.TraceId_ << "] ";; ret << " SessionId [" << SessionId << "] "; if (Settings.PartitionId_.has_value() || DirectWriteToPartitionId.has_value()) { @@ -1042,7 +1046,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kInitResponse: { const auto& initResponse = ServerMessage->init_response(); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session established. Init response: " << initResponse.ShortDebugString()); TRACE_LAZY(DbDriverState->Log, "InitResponse", TRACE_KV("partition_id", initResponse.partition_id()), TRACE_KV("session_id", initResponse.session_id())); @@ -1050,7 +1054,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess auto prevDirectWriteToPartitionId = DirectWriteToPartitionId; if (Settings.DirectWriteToPartition_ && !Settings.PartitionId_.has_value()) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id()); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id()); DirectWriteToPartitionId = initResponse.partition_id(); } PartitionId = initResponse.partition_id(); @@ -1089,7 +1093,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess const auto& batchWriteResponse = ServerMessage->write_response(); LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() + LogPrefixImpl() << "Write session got write response: " << batchWriteResponse.ShortDebugString() ); TWriteStat::TPtr writeStat = new TWriteStat{}; const auto& stat = batchWriteResponse.write_statistics(); @@ -1136,7 +1140,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess writeStat, }); - if (CleanupOnAcknowledged(msgId)) { + if (CleanupOnAcknowledgedImpl(msgId)) { result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } @@ -1146,7 +1150,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess } case TServerMessage::kUpdateTokenResponse: { UpdateTokenInProgress = false; - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: token updated successfully"); UpdateTokenIfNeededImpl(); break; } @@ -1154,9 +1158,11 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess return result; } -bool TWriteSessionImpl::CleanupOnAcknowledged(uint64_t id) { +bool TWriteSessionImpl::CleanupOnAcknowledgedImpl(uint64_t id) { + Y_ABORT_UNLESS(Lock.IsLocked()); + bool result = false; - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << id); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: acknoledged message " << id); UpdateTimedCountersImpl(); const auto& sentFront = SentOriginalMessages.front(); uint64_t size = 0; @@ -1213,14 +1219,14 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) { if (wasOk) { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Estimated memory usage " << MemoryUsage + LogPrefixImpl() << "Estimated memory usage " << MemoryUsage << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" ); } else { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" + LogPrefixImpl() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" ); } } @@ -1360,7 +1366,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with Id from " + LogPrefixImpl() << "Write " << CurrentBatch.Messages.size() << " messages with Id from " << CurrentBatch.Messages.begin()->Id << " to " << CurrentBatch.Messages.back().Id ); @@ -1451,7 +1457,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const { void TWriteSessionImpl::UpdateTokenIfNeededImpl() { Y_ABORT_UNLESS(Lock.IsLocked()); - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: try to update token"); if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) { return; @@ -1462,7 +1468,7 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() { return; } - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: updating token"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: updating token"); UpdateTokenInProgress = true; PrevToken = token; @@ -1552,7 +1558,7 @@ void TWriteSessionImpl::SendImpl() { UpdateTokenIfNeededImpl(); LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) (" + LogPrefixImpl() << "Send " << writeRequest->messages_size() << " message(s) (" << OriginalMessagesToSend.size() << " left), first sequence number is " << writeRequest->messages(0).seq_no() ); @@ -1565,7 +1571,10 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) { if (Aborting.load()) { return false; } - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout); + { + std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session: close. Timeout " << closeTimeout); + } auto startTime = TInstant::Now(); auto remaining = closeTimeout; bool ready = false; @@ -1588,20 +1597,17 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) { { std::lock_guard guard(Lock); ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !Aborting.load(); - } - { - std::lock_guard guard(Lock); CloseImpl(EStatus::SUCCESS, NYdb::NIssue::TIssues{}); needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + if (ready) { + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session: gracefully shut down, all writes complete"); + } else { + LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefixImpl() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown"); + } } if (needSetSeqNoValue) { InitSeqNoPromise.SetException("session closed"); } - if (ready) { - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); - } else { - LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefix() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown"); - } return ready; } @@ -1654,7 +1660,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { << Counters->counter->Val() \ /**/ - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Counters: {" LOG_COUNTER(Errors) LOG_COUNTER(CurrentSessionLifetimeMs) @@ -1676,7 +1682,7 @@ void TWriteSessionImpl::AbortImpl() { Y_ABORT_UNLESS(Lock.IsLocked()); if (!Aborting.load()) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting"); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: aborting"); Aborting.store(1); Cancel(DescribePartitionContext); Cancel(ConnectContext); @@ -1686,7 +1692,7 @@ void TWriteSessionImpl::AbortImpl() { Processor->Cancel(); Cancel(ClientContext); ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. - + CancelTransactions(); } } @@ -1708,7 +1714,7 @@ void TWriteSessionImpl::CancelTransactions() void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYdb::NIssue::TIssues&& issues) { Y_ABORT_UNLESS(Lock.IsLocked()); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); AbortImpl(); } @@ -1724,16 +1730,16 @@ void TWriteSessionImpl::CloseImpl(EStatus statusCode, const std::string& message void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) { Y_ABORT_UNLESS(Lock.IsLocked()); - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session will now close"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Write session will now close"); EventsQueue->Close(TSessionClosedEvent(std::move(status))); AbortImpl(); } TWriteSessionImpl::~TWriteSessionImpl() { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: destroy"); bool needClose = false; { std::lock_guard guard(Lock); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefixImpl() << "Write session: destroy"); if (!Aborting.load()) { CloseImpl(EStatus::SUCCESS, NYdb::NIssue::TIssues{}); diff --git a/src/client/topic/impl/write_session_impl.h b/src/client/topic/impl/write_session_impl.h index 448134b60f..5a3a536d71 100644 --- a/src/client/topic/impl/write_session_impl.h +++ b/src/client/topic/impl/write_session_impl.h @@ -368,7 +368,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, private: - TStringBuilder LogPrefix() const; + TStringBuilder LogPrefixImpl() const; void UpdateTokenIfNeededImpl(); @@ -399,7 +399,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, //std::string GetDebugIdentity() const; TClientMessage GetInitClientMessage(); - bool CleanupOnAcknowledged(uint64_t id); + bool CleanupOnAcknowledgedImpl(uint64_t id); bool IsReadyToSendNextImpl() const; uint64_t GetNextIdImpl(const std::optional& seqNo); uint64_t GetSeqNoImpl(uint64_t id); diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index d8c5479580..a057b113f8 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -237,6 +238,8 @@ class TFixture : public NUnitTest::TBaseFixture { virtual bool GetEnableHtapTx() const; virtual bool GetAllowOlapDataQuery() const; + size_t GetPQCacheRenameKeysCount(); + private: template E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); @@ -246,8 +249,10 @@ class TFixture : public NUnitTest::TBaseFixture { ui64 GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition); - TVector GetTabletKeys(const TActorId& actorId, - ui64 tabletId); + std::vector GetTabletKeys(const TActorId& actorId, + ui64 tabletId); + std::vector GetPQTabletDataKeys(const TActorId& actorId, + ui64 tabletId); NPQ::TWriteId GetTransactionWriteId(const TActorId& actorId, ui64 tabletId); void SendLongTxLockStatus(const TActorId& actorId, @@ -1047,7 +1052,8 @@ ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPat return Max(); } -TVector TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId) +std::vector TFixture::GetTabletKeys(const TActorId& actorId, + ui64 tabletId) { using TEvKeyValue = NKikimr::TEvKeyValue; @@ -1072,7 +1078,7 @@ TVector TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId) UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadRangeResultSize(), 1); - TVector keys; + std::vector keys; auto& result = response->Record.GetReadRangeResult(0); for (size_t i = 0; i < result.PairSize(); ++i) { @@ -1083,6 +1089,43 @@ TVector TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId) return keys; } +std::vector TFixture::GetPQTabletDataKeys(const TActorId& actorId, + ui64 tabletId) +{ + using namespace NKikimr::NPQ; + + std::vector keys; + + for (const auto& key : GetTabletKeys(actorId, tabletId)) { + if (key.empty() || + ((std::tolower(key.front()) != TKeyPrefix::TypeData) && + (std::tolower(key.front()) != TKeyPrefix::TypeTmpData))) { + continue; + } + + keys.push_back(key); + } + + return keys; +} + +size_t TFixture::GetPQCacheRenameKeysCount() +{ + using namespace NKikimr::NPQ; + + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + + auto request = MakeHolder(); + + runtime.Send(MakePersQueueL2CacheID(), edge, request.Release()); + + TAutoPtr handle; + auto* result = runtime.GrabEdgeEvent(handle); + + return result->RenamedKeys; +} + void TFixture::RestartLongTxService() { auto& runtime = Setup->GetRuntime(); @@ -1754,7 +1797,7 @@ void TFixture::CheckTabletKeys(const TString& topicName) }; bool found; - TVector keys; + std::vector keys; for (size_t i = 0; i < 20; ++i) { keys = GetTabletKeys(edge, tabletId); @@ -2578,6 +2621,55 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture) UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2); } +Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture) +{ + // We write to the topic in the transaction. When a transaction is committed, the keys in the blob + // cache are renamed. + CreateTopic("topic_A", TEST_CONSUMER); + CreateTopic("topic_B", TEST_CONSUMER); + + TString message(128_KB, 'x'); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); + + auto session = CreateTableSession(); + + // tx #1 + // After the transaction commit, there will be no large blobs in the batches. The number of renames + // will not change in the cache. + auto tx = BeginTx(session); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + + CommitTx(tx, EStatus::SUCCESS); + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + + // tx #2 + // After the commit, the party will rename one big blob + tx = BeginTx(session); + + for (unsigned i = 0; i < 80; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx); + } + + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + + CommitTx(tx, EStatus::SUCCESS); + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 1); +} + class TFixtureSinks : public TFixture { protected: void CreateRowTable(const TString& path);