From 70ba5ba2e81545e6b1fdc239857e4692ea164b34 Mon Sep 17 00:00:00 2001 From: Semyon Date: Wed, 4 Dec 2024 14:18:21 +0000 Subject: [PATCH 1/8] Moved commit "add tiering info to TTL in public api" from ydb repo --- include/ydb-cpp-sdk/client/table/table.h | 69 +++++++-- src/api/protos/ydb_table.proto | 48 +++++- src/client/table/table.cpp | 177 ++++++++++++++++++----- 3 files changed, 247 insertions(+), 47 deletions(-) diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index d60bc342a1..d040b97c29 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -28,9 +28,12 @@ class KMeansTreeSettings; class PartitioningSettings; class DateTypeColumnModeSettings; class TtlSettings; +class TtlTier; class TableIndex; class TableIndexDescription; class ValueSinceUnixEpochModeSettings; +class DateTypeColumnModeSettingsV1; +class ValueSinceUnixEpochModeSettingsV1; } // namespace Table } // namespace Ydb @@ -427,17 +430,46 @@ struct TPartitionStats { uint32_t LeaderNodeId = 0; }; +struct TTtlDeleteAction {}; + +struct TTtlEvictToExternalStorageAction { + std::string StorageName; +}; + +class TTtlTierSettings { +public: + using TAction = std::variant< + std::monostate, + TTtlDeleteAction, + TTtlEvictToExternalStorageAction + >; + +public: + explicit TTtlTierSettings(TDuration applyAfter, const TAction& action); + explicit TTtlTierSettings(const Ydb::Table::TtlTier& tier); + void SerializeTo(Ydb::Table::TtlTier& proto) const; + + TDuration GetApplyAfter() const; + const TAction& GetAction() const; + +private: + TDuration ApplyAfter_; + TAction Action_; +}; + class TDateTypeColumnModeSettings { public: - explicit TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& expireAfter); + explicit TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& deprecatedExpireAfter = TDuration::Max()); void SerializeTo(Ydb::Table::DateTypeColumnModeSettings& proto) const; + void SerializeTo(Ydb::Table::DateTypeColumnModeSettingsV1& proto) const; const std::string& GetColumnName() const; + // Deprecated. Use TTtlSettings::GetExpireAfter() const TDuration& GetExpireAfter() const; private: std::string ColumnName_; - TDuration ExpireAfter_; + TDuration DeprecatedExpireAfter_; }; class TValueSinceUnixEpochModeSettings { @@ -452,8 +484,9 @@ class TValueSinceUnixEpochModeSettings { }; public: - explicit TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& expireAfter); + explicit TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& deprecatedExpireAfter = TDuration::Max()); void SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettings& proto) const; + void SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettingsV1& proto) const; const std::string& GetColumnName() const; EUnit GetColumnUnit() const; @@ -466,11 +499,17 @@ class TValueSinceUnixEpochModeSettings { private: std::string ColumnName_; EUnit ColumnUnit_; - TDuration ExpireAfter_; + TDuration DeprecatedExpireAfter_; }; //! Represents ttl settings class TTtlSettings { +private: + using TMode = std::variant< + TDateTypeColumnModeSettings, + TValueSinceUnixEpochModeSettings + >; + public: using EUnit = TValueSinceUnixEpochModeSettings::EUnit; @@ -479,25 +518,35 @@ class TTtlSettings { ValueSinceUnixEpoch = 1, }; + explicit TTtlSettings(const std::string& columnName, const std::vector& tiers); explicit TTtlSettings(const std::string& columnName, const TDuration& expireAfter); - explicit TTtlSettings(const Ydb::Table::DateTypeColumnModeSettings& mode, uint32_t runIntervalSeconds); const TDateTypeColumnModeSettings& GetDateTypeColumn() const; + // Deprecated. Use FromProto() + explicit TTtlSettings(const Ydb::Table::DateTypeColumnModeSettings& mode, uint32_t runIntervalSeconds); + explicit TTtlSettings(const std::string& columnName, EUnit columnUnit, const std::vector& tiers); explicit TTtlSettings(const std::string& columnName, EUnit columnUnit, const TDuration& expireAfter); - explicit TTtlSettings(const Ydb::Table::ValueSinceUnixEpochModeSettings& mode, uint32_t runIntervalSeconds); const TValueSinceUnixEpochModeSettings& GetValueSinceUnixEpoch() const; + // Deprecated. Use FromProto() + explicit TTtlSettings(const Ydb::Table::ValueSinceUnixEpochModeSettings& mode, uint32_t runIntervalSeconds); + static std::optional FromProto(const Ydb::Table::TtlSettings& proto); void SerializeTo(Ydb::Table::TtlSettings& proto) const; EMode GetMode() const; TTtlSettings& SetRunInterval(const TDuration& value); const TDuration& GetRunInterval() const; + const std::vector& GetTiers() const; + std::optional GetExpireAfter() const; + private: - std::variant< - TDateTypeColumnModeSettings, - TValueSinceUnixEpochModeSettings - > Mode_; + explicit TTtlSettings(TMode mode, const std::vector& tiers, ui32 runIntervalSeconds); + static std::optional GetExpireAfterFrom(const std::vector& tiers); + +private: + TMode Mode_; + std::vector Tiers_; TDuration RunInterval_ = TDuration::Zero(); }; diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 38988c4a12..df760b4cc3 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -433,8 +433,46 @@ message ColumnMeta { } } +message EvictionToExternalStorageSettings { + // Path to external data source + string storage_name = 1; +} + +message TtlTier { + uint32 apply_after_seconds = 1; + + oneof action { + google.protobuf.Empty delete = 2; + EvictionToExternalStorageSettings evict_to_external_storage = 3; + } +} + +message DateTypeColumnModeSettingsV1 { + // The row will be assigned a tier at the moment of time, when the value + // stored in is less than or equal to the current time (in epoch + // time format), and has passed since that moment; + // i.e. the eviction threshold is the value of plus . + + // The column type must be a date type + string column_name = 1; +} + +message ValueSinceUnixEpochModeSettingsV1 { + // Same as DateTypeColumnModeSettings (above), but useful when type of the + // value stored in is not a date type. + + // The column type must be one of: + // - Uint32 + // - Uint64 + // - DyNumber + string column_name = 1; + + // Interpretation of the value stored in + ValueSinceUnixEpochModeSettings.Unit column_unit = 2; +} + message DateTypeColumnModeSettings { - // The row will be considered as expired at the moment of time, when the value + // The row will be assigned a tier at the moment of time, when the value // stored in is less than or equal to the current time (in epoch // time format), and has passed since that moment; // i.e. the expiration threshold is the value of plus . @@ -473,8 +511,10 @@ message ValueSinceUnixEpochModeSettings { message TtlSettings { oneof mode { - DateTypeColumnModeSettings date_type_column = 1; - ValueSinceUnixEpochModeSettings value_since_unix_epoch = 2; + DateTypeColumnModeSettings date_type_column = 1 [deprecated = true]; + ValueSinceUnixEpochModeSettings value_since_unix_epoch = 2 [deprecated = true]; + DateTypeColumnModeSettingsV1 date_type_column_v1 = 4; + ValueSinceUnixEpochModeSettingsV1 value_since_unix_epoch_v1 = 5; } // There is no guarantee that expired row will be deleted immediately upon @@ -490,6 +530,8 @@ message TtlSettings { // How often to run BRO on the same partition. // BRO will not be started more often, but may be started less often. uint32 run_interval_seconds = 3; + + repeated TtlTier tiers = 6; } message StorageSettings { diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index a01c53f717..96f5213b9b 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -326,23 +327,8 @@ class TTableDescription::TImpl { } // ttl settings - switch (proto.ttl_settings().mode_case()) { - case Ydb::Table::TtlSettings::kDateTypeColumn: - TtlSettings_ = TTtlSettings( - proto.ttl_settings().date_type_column(), - proto.ttl_settings().run_interval_seconds() - ); - break; - - case Ydb::Table::TtlSettings::kValueSinceUnixEpoch: - TtlSettings_ = TTtlSettings( - proto.ttl_settings().value_since_unix_epoch(), - proto.ttl_settings().run_interval_seconds() - ); - break; - - default: - break; + if (auto ttlSettings = TTtlSettings::FromProto(proto.ttl_settings())) { + TtlSettings_ = std::move(*ttlSettings); } // tiering @@ -2942,14 +2928,59 @@ bool operator!=(const TChangefeedDescription& lhs, const TChangefeedDescription& //////////////////////////////////////////////////////////////////////////////// -TDateTypeColumnModeSettings::TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& expireAfter) +TTtlTierSettings::TTtlTierSettings(TDuration applyAfter, const TAction& action) + : ApplyAfter_(applyAfter) + , Action_(action) +{ } + +TTtlTierSettings::TTtlTierSettings(const Ydb::Table::TtlTier& tier) + : ApplyAfter_(TDuration::Seconds(tier.apply_after_seconds())) +{ + switch (tier.action_case()) { + case Ydb::Table::TtlTier::kDelete: + Action_ = TTtlDeleteAction(); + break; + case Ydb::Table::TtlTier::kEvictToExternalStorage: + Action_ = TTtlEvictToExternalStorageAction(tier.evict_to_external_storage().storage_name()); + break; + case Ydb::Table::TtlTier::ACTION_NOT_SET: + break; + } +} + +void TTtlTierSettings::SerializeTo(Ydb::Table::TtlTier& proto) const { + proto.set_apply_after_seconds(ApplyAfter_.Seconds()); + + std::visit(TOverloaded{ + [&proto](const TTtlDeleteAction&) { proto.mutable_delete_(); }, + [&proto](const TTtlEvictToExternalStorageAction& action) { + proto.mutable_evict_to_external_storage()->set_storage_name(action.StorageName); + }, + [](const std::monostate) {}, + }, + Action_); +} + +TDuration TTtlTierSettings::GetApplyAfter() const { + return ApplyAfter_; +} + +const TTtlTierSettings::TAction& TTtlTierSettings::GetAction() const { + return Action_; +} + +TDateTypeColumnModeSettings::TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& deprecatedExpireAfter) : ColumnName_(columnName) - , ExpireAfter_(expireAfter) + , DeprecatedExpireAfter_(deprecatedExpireAfter) {} void TDateTypeColumnModeSettings::SerializeTo(Ydb::Table::DateTypeColumnModeSettings& proto) const { proto.set_column_name(TStringType{ColumnName_}); - proto.set_expire_after_seconds(ExpireAfter_.Seconds()); + proto.set_expire_after_seconds(DeprecatedExpireAfter_.Seconds()); +} + +void TDateTypeColumnModeSettings::SerializeTo(Ydb::Table::DateTypeColumnModeSettingsV1& proto) const { + proto.set_column_name(ColumnName_); } const std::string& TDateTypeColumnModeSettings::GetColumnName() const { @@ -2957,19 +2988,24 @@ const std::string& TDateTypeColumnModeSettings::GetColumnName() const { } const TDuration& TDateTypeColumnModeSettings::GetExpireAfter() const { - return ExpireAfter_; + return DeprecatedExpireAfter_; } -TValueSinceUnixEpochModeSettings::TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& expireAfter) +TValueSinceUnixEpochModeSettings::TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& deprecatedExpireAfter) : ColumnName_(columnName) , ColumnUnit_(columnUnit) - , ExpireAfter_(expireAfter) + , DeprecatedExpireAfter_(deprecatedExpireAfter) {} void TValueSinceUnixEpochModeSettings::SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettings& proto) const { proto.set_column_name(TStringType{ColumnName_}); proto.set_column_unit(TProtoAccessor::GetProto(ColumnUnit_)); - proto.set_expire_after_seconds(ExpireAfter_.Seconds()); + proto.set_expire_after_seconds(DeprecatedExpireAfter_.Seconds()); +} + +void TValueSinceUnixEpochModeSettings::SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettingsV1& proto) const { + proto.set_column_name(ColumnName_); + proto.set_column_unit(TProtoAccessor::GetProto(ColumnUnit_)); } const std::string& TValueSinceUnixEpochModeSettings::GetColumnName() const { @@ -2981,7 +3017,7 @@ TValueSinceUnixEpochModeSettings::EUnit TValueSinceUnixEpochModeSettings::GetCol } const TDuration& TValueSinceUnixEpochModeSettings::GetExpireAfter() const { - return ExpireAfter_; + return DeprecatedExpireAfter_; } void TValueSinceUnixEpochModeSettings::Out(IOutputStream& out, EUnit unit) { @@ -3024,8 +3060,13 @@ TValueSinceUnixEpochModeSettings::EUnit TValueSinceUnixEpochModeSettings::UnitFr return EUnit::Unknown; } +TTtlSettings::TTtlSettings(const std::string& columnName, const std::vector& tiers) + : Mode_(TDateTypeColumnModeSettings(columnName, GetExpireAfterFrom(tiers).value_or(TDuration::Max()))) + , Tiers_(tiers) +{} + TTtlSettings::TTtlSettings(const std::string& columnName, const TDuration& expireAfter) - : Mode_(TDateTypeColumnModeSettings(columnName, expireAfter)) + : TTtlSettings(columnName, {TTtlTierSettings(expireAfter, TTtlDeleteAction())}) {} TTtlSettings::TTtlSettings(const Ydb::Table::DateTypeColumnModeSettings& mode, ui32 runIntervalSeconds) @@ -3038,8 +3079,13 @@ const TDateTypeColumnModeSettings& TTtlSettings::GetDateTypeColumn() const { return std::get(Mode_); } +TTtlSettings::TTtlSettings(const std::string& columnName, EUnit columnUnit, const std::vector& tiers) + : Mode_(TValueSinceUnixEpochModeSettings(columnName, columnUnit, GetExpireAfterFrom(tiers).value_or(TDuration::Max()))) + , Tiers_(tiers) +{} + TTtlSettings::TTtlSettings(const std::string& columnName, EUnit columnUnit, const TDuration& expireAfter) - : Mode_(TValueSinceUnixEpochModeSettings(columnName, columnUnit, expireAfter)) + : TTtlSettings(columnName, columnUnit, {TTtlTierSettings(expireAfter, TTtlDeleteAction())}) {} TTtlSettings::TTtlSettings(const Ydb::Table::ValueSinceUnixEpochModeSettings& mode, ui32 runIntervalSeconds) @@ -3052,14 +3098,54 @@ const TValueSinceUnixEpochModeSettings& TTtlSettings::GetValueSinceUnixEpoch() c return std::get(Mode_); } +std::optional TTtlSettings::FromProto(const Ydb::Table::TtlSettings& proto) { + std::vector tiers; + for (const auto& tier : proto.tiers()) { + tiers.emplace_back(tier); + } + TDuration legacyExpireAfter = GetExpireAfterFrom(tiers).value_or(TDuration::Max()); + + switch(proto.mode_case()) { + case Ydb::Table::TtlSettings::kDateTypeColumn: + return TTtlSettings(proto.date_type_column(), proto.run_interval_seconds()); + case Ydb::Table::TtlSettings::kValueSinceUnixEpoch: + return TTtlSettings(proto.value_since_unix_epoch(), proto.run_interval_seconds()); + case Ydb::Table::TtlSettings::kDateTypeColumnV1: + return TTtlSettings( + TDateTypeColumnModeSettings(proto.date_type_column_v1().column_name(), legacyExpireAfter), tiers, proto.run_interval_seconds()); + case Ydb::Table::TtlSettings::kValueSinceUnixEpochV1: + return TTtlSettings(TValueSinceUnixEpochModeSettings(proto.value_since_unix_epoch_v1().column_name(), + TProtoAccessor::FromProto(proto.value_since_unix_epoch_v1().column_unit()), legacyExpireAfter), + tiers, proto.run_interval_seconds()); + case Ydb::Table::TtlSettings::MODE_NOT_SET: + return std::nullopt; + } +} + void TTtlSettings::SerializeTo(Ydb::Table::TtlSettings& proto) const { - switch (GetMode()) { - case EMode::DateTypeColumn: - GetDateTypeColumn().SerializeTo(*proto.mutable_date_type_column()); - break; - case EMode::ValueSinceUnixEpoch: - GetValueSinceUnixEpoch().SerializeTo(*proto.mutable_value_since_unix_epoch()); - break; + if (Tiers_.size() == 1 && std::holds_alternative(Tiers_.back().GetAction())) { + // serialize DELETE-only TTL to legacy format for backwards-compatibility + switch (GetMode()) { + case EMode::DateTypeColumn: + GetDateTypeColumn().SerializeTo(*proto.mutable_date_type_column()); + break; + case EMode::ValueSinceUnixEpoch: + GetValueSinceUnixEpoch().SerializeTo(*proto.mutable_value_since_unix_epoch()); + break; + } + } else { + switch (GetMode()) { + case EMode::DateTypeColumn: + GetDateTypeColumn().SerializeTo(*proto.mutable_date_type_column_v1()); + break; + case EMode::ValueSinceUnixEpoch: + GetValueSinceUnixEpoch().SerializeTo(*proto.mutable_value_since_unix_epoch_v1()); + break; + } + + for (const auto& tier : Tiers_) { + tier.SerializeTo(*proto.add_tiers()); + } } if (RunInterval_) { @@ -3080,6 +3166,29 @@ const TDuration& TTtlSettings::GetRunInterval() const { return RunInterval_; } +const std::vector& TTtlSettings::GetTiers() const { + return Tiers_; +} + +std::optional TTtlSettings::GetExpireAfter() const { + return GetExpireAfterFrom(Tiers_); +} + +std::optional TTtlSettings::GetExpireAfterFrom(const std::vector& tiers) { + for (const auto& tier : tiers) { + if (std::holds_alternative(tier.GetAction())) { + return tier.GetApplyAfter(); + } + } + return std::nullopt; +} + +TTtlSettings::TTtlSettings(TMode mode, const std::vector& tiers, uint32_t runIntervalSeconds) + : Mode_(std::move(mode)) + , Tiers_(tiers) + , RunInterval_(TDuration::Seconds(runIntervalSeconds)) +{ } + TAlterTtlSettings::EAction TAlterTtlSettings::GetAction() const { return static_cast(Action_.index()); } From f9d67cffdd1aacea1b161b0562b5b3ec8afbd6b6 Mon Sep 17 00:00:00 2001 From: ijon Date: Wed, 4 Dec 2024 14:31:39 +0000 Subject: [PATCH 2/8] Moved commit "ydb(d) cli: add commands for setting interrupt-inheritance flag in acl" from ydb repo --- include/ydb-cpp-sdk/client/scheme/scheme.h | 12 ++++++++++++ src/client/scheme/scheme.cpp | 4 +++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/include/ydb-cpp-sdk/client/scheme/scheme.h b/include/ydb-cpp-sdk/client/scheme/scheme.h index e87b4a941c..cec51105ae 100644 --- a/include/ydb-cpp-sdk/client/scheme/scheme.h +++ b/include/ydb-cpp-sdk/client/scheme/scheme.h @@ -119,25 +119,37 @@ struct TModifyPermissionsSettings : public TOperationRequestSettings> Actions_; bool ClearAcl_ = false; + bool SetInterruptInheritance_ = false; + bool InterruptInheritanceValue_ = false; void AddAction(EModifyPermissionsAction action, const TPermissions& permissions) { Actions_.emplace_back(std::pair{action, permissions}); } diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index 78c2786a46..08d9fb5e0e 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -245,7 +245,6 @@ class TSchemeClient::TImpl : public TClientImplCommon { TRpcRequestSettings::Make(settings)); return promise.GetFuture(); - } void PermissionsToRequest(const TPermissions& permissions, Permissions* to) { @@ -261,6 +260,9 @@ class TSchemeClient::TImpl : public TClientImplCommon { if (settings.ClearAcl_) { request.set_clear_permissions(true); } + if (settings.SetInterruptInheritance_) { + request.set_interrupt_inheritance(settings.InterruptInheritanceValue_); + } for (const auto& action : settings.Actions_) { auto protoAction = request.add_actions(); From becc1932e1dffcf0359b6f12a658ab47eddff0a9 Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Wed, 4 Dec 2024 14:46:31 +0000 Subject: [PATCH 3/8] Moved commit "Make it possible to change in-memory setting for tables" from ydb repo --- include/ydb-cpp-sdk/client/table/table.h | 11 +++++++++++ src/client/table/table.cpp | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index d040b97c29..6f02b10b8e 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -811,6 +811,7 @@ class TColumnFamilyBuilder { TColumnFamilyBuilder& SetData(const std::string& media); TColumnFamilyBuilder& SetCompression(EColumnFamilyCompression compression); + TColumnFamilyBuilder& SetKeepInMemory(bool enabled); TColumnFamilyDescription Build() const; @@ -873,6 +874,11 @@ class TTableColumnFamilyBuilder { return *this; } + TTableColumnFamilyBuilder& SetKeepInMemory(bool enabled) { + Builder_.SetKeepInMemory(enabled); + return *this; + } + TTableBuilder& EndColumnFamily(); private: @@ -1491,6 +1497,11 @@ class TAlterColumnFamilyBuilder { return *this; } + TAlterColumnFamilyBuilder& SetKeepInMemory(bool enabled) { + Builder_.SetKeepInMemory(enabled); + return *this; + } + TAlterTableSettings& EndAddColumnFamily(); TAlterTableSettings& EndAlterColumnFamily(); diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 96f5213b9b..c5d2f08937 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -1116,6 +1116,11 @@ TColumnFamilyBuilder& TColumnFamilyBuilder::SetCompression(EColumnFamilyCompress return *this; } +TColumnFamilyBuilder& TColumnFamilyBuilder::SetKeepInMemory(bool enabled) { + Impl_->Proto.set_keep_in_memory(enabled ? Ydb::FeatureFlag::ENABLED : Ydb::FeatureFlag::DISABLED); + return *this; +} + TColumnFamilyDescription TColumnFamilyBuilder::Build() const { return TColumnFamilyDescription(Impl_->Proto); } From ca0fb03ebb7593dc200b24979654551c398be362 Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Wed, 4 Dec 2024 15:32:38 +0000 Subject: [PATCH 4/8] Moved commit "Ping service" from ydb repo --- include/ydb-cpp-sdk/client/debug/client.h | 83 +++++++++++++++++++++++ src/api/grpc/ydb_debug_v1.proto | 14 ++++ src/api/protos/ydb_debug.proto | 59 ++++++++++++++++ src/client/CMakeLists.txt | 1 + src/client/debug/CMakeLists.txt | 17 +++++ src/client/debug/client.cpp | 76 +++++++++++++++++++++ 6 files changed, 250 insertions(+) create mode 100644 include/ydb-cpp-sdk/client/debug/client.h create mode 100644 src/api/grpc/ydb_debug_v1.proto create mode 100644 src/api/protos/ydb_debug.proto create mode 100644 src/client/debug/CMakeLists.txt create mode 100644 src/client/debug/client.cpp diff --git a/include/ydb-cpp-sdk/client/debug/client.h b/include/ydb-cpp-sdk/client/debug/client.h new file mode 100644 index 0000000000..bfd8a94679 --- /dev/null +++ b/include/ydb-cpp-sdk/client/debug/client.h @@ -0,0 +1,83 @@ +#pragma once + +#include + +namespace NYdb::NDebug { + +//////////////////////////////////////////////////////////////////////////////// + +class TPlainGrpcPingResult: public TStatus { +public: + TPlainGrpcPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +class TGrpcProxyPingResult: public TStatus { +public: + TGrpcProxyPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +class TKqpProxyPingResult: public TStatus { +public: + TKqpProxyPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +class TSchemeCachePingResult: public TStatus { +public: + TSchemeCachePingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +class TTxProxyPingResult: public TStatus { +public: + TTxProxyPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +//////////////////////////////////////////////////////////////////////////////// + +using TAsyncPlainGrpcPingResult = NThreading::TFuture; +using TAsyncGrpcProxyPingResult = NThreading::TFuture; +using TAsyncKqpProxyPingResult = NThreading::TFuture; +using TAsyncSchemeCachePingResult = NThreading::TFuture; +using TAsyncTxProxyPingResult = NThreading::TFuture; + +//////////////////////////////////////////////////////////////////////////////// + +struct TPlainGrpcPingSettings : public TOperationRequestSettings {}; +struct TGrpcProxyPingSettings : public TOperationRequestSettings {}; +struct TKqpProxyPingSettings : public TOperationRequestSettings {}; +struct TSchemeCachePingSettings : public TOperationRequestSettings {}; +struct TTxProxyPingSettings : public TOperationRequestSettings {}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TClientSettings : public TCommonClientSettingsBase { +}; + +class TDebugClient { +public: + using TAsyncPlainGrpcPingResult = TAsyncPlainGrpcPingResult; +public: + TDebugClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); + + TAsyncPlainGrpcPingResult PingPlainGrpc(const TPlainGrpcPingSettings& settings); + TAsyncGrpcProxyPingResult PingGrpcProxy(const TGrpcProxyPingSettings& settings); + TAsyncKqpProxyPingResult PingKqpProxy(const TKqpProxyPingSettings& settings); + + TAsyncSchemeCachePingResult PingSchemeCache(const TSchemeCachePingSettings& settings); + TAsyncTxProxyPingResult PingTxProxy(const TTxProxyPingSettings& settings); + +private: + class TImpl; + std::shared_ptr Impl_; +}; + +} // namespace NYdb::NDebug diff --git a/src/api/grpc/ydb_debug_v1.proto b/src/api/grpc/ydb_debug_v1.proto new file mode 100644 index 0000000000..0905b8412d --- /dev/null +++ b/src/api/grpc/ydb_debug_v1.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package Ydb.Debug.V1; +option java_package = "com.yandex.ydb.debug.v1"; + +import "src/api/protos/ydb_debug.proto"; + +service DebugService { + rpc PingPlainGrpc(Debug.PlainGrpcRequest) returns (Debug.PlainGrpcResponse); + rpc PingGrpcProxy(Debug.GrpcProxyRequest) returns (Debug.GrpcProxyResponse); + rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse); + rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse); + rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse); +} diff --git a/src/api/protos/ydb_debug.proto b/src/api/protos/ydb_debug.proto new file mode 100644 index 0000000000..64d01c52dc --- /dev/null +++ b/src/api/protos/ydb_debug.proto @@ -0,0 +1,59 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Ydb.Debug; +option java_package = "com.yandex.ydb.debug"; +option java_outer_classname = "DebugProtos"; + +import "src/api/protos/ydb_issue_message.proto"; +import "src/api/protos/ydb_status_codes.proto"; + +// just go to GRPC without the rest of YDB + +message PlainGrpcRequest { +} + +message PlainGrpcResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} + +// Go until GrpcProxy + +message GrpcProxyRequest { +} + +message GrpcProxyResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} + +// Ping KQP proxy without executing anything + +message KqpProxyRequest { +} + +message KqpProxyResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} + +// Ping SchemeCache + +message SchemeCacheRequest { +} + +message SchemeCacheResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} + +// Ping TxProxy + +message TxProxyRequest { +} + +message TxProxyResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 6e469a5884..736310d223 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(resources) add_subdirectory(common_client) add_subdirectory(coordination) add_subdirectory(datastreams) +add_subdirectory(debug) add_subdirectory(discovery) add_subdirectory(driver) add_subdirectory(export) diff --git a/src/client/debug/CMakeLists.txt b/src/client/debug/CMakeLists.txt new file mode 100644 index 0000000000..3095290c04 --- /dev/null +++ b/src/client/debug/CMakeLists.txt @@ -0,0 +1,17 @@ +_ydb_sdk_add_library(client-debug) + +target_link_libraries(client-debug + PUBLIC + client-ydb_driver + PRIVATE + api-grpc + api-protos + client-ydb_common_client-impl +) + +target_sources(client-debug + PRIVATE + client.cpp +) + +_ydb_sdk_make_client_component(Debug client-debug) diff --git a/src/client/debug/client.cpp b/src/client/debug/client.cpp new file mode 100644 index 0000000000..383d6e7737 --- /dev/null +++ b/src/client/debug/client.cpp @@ -0,0 +1,76 @@ +#include + +#include +#include +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include + +namespace NYdb::NDebug { + +using namespace Ydb; + +using namespace NThreading; + +class TDebugClient::TImpl: public TClientImplCommon { +public: + TImpl(std::shared_ptr&& connections, const TClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + {} + + template + auto Ping(const TSettings& settings, auto serviceMethod) { + auto pingPromise = NewPromise(); + auto responseCb = [pingPromise] (TResponse*, TPlainStatus status) mutable { + TResult val(TStatus(std::move(status))); + pingPromise.SetValue(std::move(val)); + }; + + Connections_->Run( + TRequest(), + responseCb, + serviceMethod, + DbDriverState_, + TRpcRequestSettings::Make(settings)); + + return pingPromise; + } + + ~TImpl() = default; +}; + +TDebugClient::TDebugClient(const TDriver& driver, const TClientSettings& settings) + : Impl_(new TImpl(CreateInternalInterface(driver), settings)) +{ +} + +TAsyncPlainGrpcPingResult TDebugClient::PingPlainGrpc(const TPlainGrpcPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingPlainGrpc); +} + +TAsyncGrpcProxyPingResult TDebugClient::PingGrpcProxy(const TGrpcProxyPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingGrpcProxy); +} + +TAsyncKqpProxyPingResult TDebugClient::PingKqpProxy(const TKqpProxyPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingKqpProxy); +} + +TAsyncSchemeCachePingResult TDebugClient::PingSchemeCache(const TSchemeCachePingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingSchemeCache); +} + +TAsyncTxProxyPingResult TDebugClient::PingTxProxy(const TTxProxyPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingTxProxy); +} + +} // namespace NYdb::NDebug From 1e4094bec3f428b2114fed226dc1fe47ffc6c11a Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Wed, 4 Dec 2024 15:39:12 +0000 Subject: [PATCH 5/8] Moved commit "Enhanced parallelism of data restoring in `ydb tools restore`" from ydb repo --- src/client/import/CMakeLists.txt | 6 ++++-- src/client/import/out.cpp | 5 +++++ 2 files changed, 9 insertions(+), 2 deletions(-) create mode 100644 src/client/import/out.cpp diff --git a/src/client/import/CMakeLists.txt b/src/client/import/CMakeLists.txt index 602dd45329..237fe5f7f0 100644 --- a/src/client/import/CMakeLists.txt +++ b/src/client/import/CMakeLists.txt @@ -11,8 +11,10 @@ target_link_libraries(client-ydb_import PUBLIC client-ydb_types-operation ) -target_sources(client-ydb_import PRIVATE - import.cpp +target_sources(client-ydb_import + PRIVATE + import.cpp + out.cpp ) generate_enum_serilization(client-ydb_import diff --git a/src/client/import/out.cpp b/src/client/import/out.cpp new file mode 100644 index 0000000000..af523528dd --- /dev/null +++ b/src/client/import/out.cpp @@ -0,0 +1,5 @@ +#include + +Y_DECLARE_OUT_SPEC(, NYdb::NImport::TImportDataResult, o, x) { + return x.Out(o); +} From e05d4be69777079db2e64383b3c4c1d591751059 Mon Sep 17 00:00:00 2001 From: Semyon Date: Wed, 4 Dec 2024 17:19:58 +0000 Subject: [PATCH 6/8] Moved commit "configure tiering on CS via ttl" from ydb repo --- include/ydb-cpp-sdk/client/table/table.h | 1 + src/client/table/table.cpp | 20 ++------------------ 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 6f02b10b8e..69503ff1ad 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -665,6 +665,7 @@ class TTableDescription { std::vector GetIndexDescriptions() const; std::vector GetChangefeedDescriptions() const; std::optional GetTtlSettings() const; + // Deprecated. Use GetTtlSettings() instead std::optional GetTiering() const; EStoreType GetStoreType() const; diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index c5d2f08937..0e1f3dc3e3 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -331,11 +331,6 @@ class TTableDescription::TImpl { TtlSettings_ = std::move(*ttlSettings); } - // tiering - if (proto.tiering().size()) { - Tiering_ = proto.tiering(); - } - if (proto.store_type()) { StoreType_ = (proto.store_type() == Ydb::Table::STORE_TYPE_COLUMN) ? EStoreType::Column : EStoreType::Row; } @@ -407,9 +402,7 @@ class TTableDescription::TImpl { } for (const auto& shardStats : Proto_.table_stats().partition_stats()) { - PartitionStats_.emplace_back( - TPartitionStats{shardStats.rows_estimate(), shardStats.store_size(), shardStats.leader_node_id()} - ); + PartitionStats_.emplace_back(TPartitionStats{ shardStats.rows_estimate(), shardStats.store_size(), shardStats.leader_node_id() }); } TableStats.Rows = Proto_.table_stats().rows_estimate(); @@ -566,10 +559,6 @@ class TTableDescription::TImpl { return TtlSettings_; } - const std::optional& GetTiering() const { - return Tiering_; - } - EStoreType GetStoreType() const { return StoreType_; } @@ -650,7 +639,6 @@ class TTableDescription::TImpl { std::vector Indexes_; std::vector Changefeeds_; std::optional TtlSettings_; - std::optional Tiering_; std::string Owner_; std::vector Permissions_; std::vector EffectivePermissions_; @@ -718,7 +706,7 @@ std::optional TTableDescription::GetTtlSettings() const { } std::optional TTableDescription::GetTiering() const { - return Impl_->GetTiering(); + return std::nullopt; } EStoreType TTableDescription::GetStoreType() const { @@ -941,10 +929,6 @@ void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) con ttl->SerializeTo(*request.mutable_ttl_settings()); } - if (const auto& tiering = Impl_->GetTiering()) { - request.set_tiering(TStringType{tiering.value()}); - } - if (Impl_->GetStoreType() == EStoreType::Column) { request.set_store_type(Ydb::Table::StoreType::STORE_TYPE_COLUMN); } From 67c963ce1ad67ed24818e025de005142632720c1 Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Thu, 5 Dec 2024 23:19:12 +0000 Subject: [PATCH 7/8] Sync api protos --- src/api/protos/draft/ydb_logstore.proto | 10 +++------- src/api/protos/ydb_table.proto | 9 ++------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/api/protos/draft/ydb_logstore.proto b/src/api/protos/draft/ydb_logstore.proto index 1680424c2e..c2dbae4767 100644 --- a/src/api/protos/draft/ydb_logstore.proto +++ b/src/api/protos/draft/ydb_logstore.proto @@ -60,10 +60,6 @@ message Tier { Ydb.Table.TtlSettings eviction = 2; // When to evict data to the next tier (or remove if none) } -message TieringSettings { - optional string tiering_id = 2; -} - message CreateLogStoreRequest { Ydb.Operations.OperationParams operation_params = 1; @@ -135,8 +131,8 @@ message CreateLogTableRequest { }; oneof ttl_specification { Ydb.Table.TtlSettings ttl_settings = 5; - TieringSettings tiering_settings = 6; }; + reserved 6; // Specifies the desired number of ColumnShards for this table uint32 shards_count = 7; @@ -160,9 +156,9 @@ message DescribeLogTableResult { string schema_preset_name = 2; Schema schema = 3; + reserved 4; oneof ttl_specification { Ydb.Table.TtlSettings ttl_settings = 5; - TieringSettings tiering_settings = 4; } // Specifies the desired number of ColumnShards for this table @@ -195,9 +191,9 @@ message AlterLogTableRequest { oneof ttl_action { google.protobuf.Empty drop_ttl_settings = 3; Ydb.Table.TtlSettings set_ttl_settings = 4; - TieringSettings set_tiering_settings = 5; google.protobuf.Empty drop_tiering_settings = 6; } + reserved 5; } message AlterLogTableResponse { diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index df760b4cc3..616963432c 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -667,8 +667,7 @@ message CreateTableRequest { Ydb.FeatureFlag.Status key_bloom_filter = 16; // Read replicas settings for table ReadReplicasSettings read_replicas_settings = 17; - // Tiering rules name. It specifies how data migrates from one tier (logical storage) to another. - string tiering = 18; + reserved 18; // Is temporary table bool temporary = 19; // Is table column or row oriented @@ -747,11 +746,7 @@ message AlterTableRequest { repeated string drop_changefeeds = 20; // Rename existed index repeated RenameIndexItem rename_indexes = 21; - // Setup or remove tiering - oneof tiering_action { - string set_tiering = 22; - google.protobuf.Empty drop_tiering = 23; - } + reserved 22, 23; } message AlterTableResponse { From 90f0213b4baac16f6d1d26acd8c3f7baf7ff5ede Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Thu, 5 Dec 2024 23:19:29 +0000 Subject: [PATCH 8/8] Refactor topic tests --- src/client/topic/ut/topic_to_table_ut.cpp | 258 +++++++++++++++++- .../ut/ut_utils/topic_sdk_test_setup.cpp | 4 +- .../topic/ut/ut_utils/topic_sdk_test_setup.h | 2 +- 3 files changed, 249 insertions(+), 15 deletions(-) diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index b3fcc7368e..fb77fbae99 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -3,21 +3,25 @@ #include #include #include + #include #include #include #include - +#include #include -#include +#include +#include #include namespace NYdb::NTopic::NTests { const auto TEST_MESSAGE_GROUP_ID_1 = TEST_MESSAGE_GROUP_ID + "_1"; const auto TEST_MESSAGE_GROUP_ID_2 = TEST_MESSAGE_GROUP_ID + "_2"; +const auto TEST_MESSAGE_GROUP_ID_3 = TEST_MESSAGE_GROUP_ID + "_3"; +const auto TEST_MESSAGE_GROUP_ID_4 = TEST_MESSAGE_GROUP_ID + "_4"; Y_UNIT_TEST_SUITE(TxUsage) { @@ -79,9 +83,16 @@ class TFixture : public NUnitTest::TBaseFixture { const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, std::optional maxPartitionCount = std::nullopt); - void DescribeTopic(const TString& path); + TTopicDescription DescribeTopic(const TString& path); - void AddConsumer(const TString& topic, const TVector& consumers); + void AddConsumer(const TString& topicPath, const TVector& consumers); + void AlterAutoPartitioning(const TString& topicPath, + ui64 minActivePartitions, + ui64 maxActivePartitions, + EAutoPartitioningStrategy strategy, + TDuration stabilizationWindow, + ui64 downUtilizationPercent, + ui64 upUtilizationPercent); void WriteToTopicWithInvalidTxId(bool invalidTxId); @@ -185,6 +196,24 @@ class TFixture : public NUnitTest::TBaseFixture { const TString& consumerName, size_t count); + struct TAvgWriteBytes { + ui64 PerSec = 0; + ui64 PerMin = 0; + ui64 PerHour = 0; + ui64 PerDay = 0; + }; + + TAvgWriteBytes GetAvgWriteBytes(const TString& topicPath, + ui32 partitionId); + + void CheckAvgWriteBytes(const TString& topicPath, + ui32 partitionId, + size_t minSize, size_t maxSize); + + void SplitPartition(const TString& topicPath, + ui32 partitionId, + const TString& boundary); + private: template E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); @@ -213,6 +242,8 @@ class TFixture : public NUnitTest::TBaseFixture { THashMap, TTopicWriteSessionContext> TopicWriteSessions; THashMap TopicReadSessions; + + ui64 SchemaTxId = 1000; }; TFixture::TTableRecord::TTableRecord(const TString& key, const TString& value) : @@ -225,6 +256,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&) { NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetEnableTopicServiceTx(true); + settings.SetEnableTopicSplitMerge(true); settings.SetEnablePQConfigTransactionsAtSchemeShard(true); Setup = std::make_unique(TEST_CASE_NAME, settings); @@ -405,7 +437,7 @@ void TFixture::CreateTopic(const TString& path, Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount); } -void TFixture::AddConsumer(const TString& path, +void TFixture::AddConsumer(const TString& topicPath, const TVector& consumers) { NTopic::TTopicClient client(GetDriver()); @@ -415,13 +447,41 @@ void TFixture::AddConsumer(const TString& path, settings.BeginAddConsumer(consumer); } - auto result = client.AlterTopic(path, settings).GetValueSync(); + auto result = client.AlterTopic(topicPath, settings).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } -void TFixture::DescribeTopic(const TString& path) +void TFixture::AlterAutoPartitioning(const TString& topicPath, + ui64 minActivePartitions, + ui64 maxActivePartitions, + EAutoPartitioningStrategy strategy, + TDuration stabilizationWindow, + ui64 downUtilizationPercent, + ui64 upUtilizationPercent) { - Setup->DescribeTopic(path); + NTopic::TTopicClient client(GetDriver()); + NTopic::TAlterTopicSettings settings; + + settings + .BeginAlterPartitioningSettings() + .MinActivePartitions(minActivePartitions) + .MaxActivePartitions(maxActivePartitions) + .BeginAlterAutoPartitioningSettings() + .Strategy(strategy) + .StabilizationWindow(stabilizationWindow) + .DownUtilizationPercent(downUtilizationPercent) + .UpUtilizationPercent(upUtilizationPercent) + .EndAlterAutoPartitioningSettings() + .EndAlterTopicPartitioningSettings() + ; + + auto result = client.AlterTopic(topicPath, settings).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +} + +TTopicDescription TFixture::DescribeTopic(const TString& path) +{ + return Setup->DescribeTopic(path); } const TDriver& TFixture::GetDriver() const @@ -545,11 +605,11 @@ Y_UNIT_TEST_F(Offsets_Cannot_Be_Promoted_When_Reading_In_A_Transaction, TFixture UNIT_ASSERT_EXCEPTION(ReadMessage(reader, {.Tx = tx, .CommitOffsets = true}), yexception); } -//Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) -//{ -// WriteToTopicWithInvalidTxId(false); -//} -// +Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) +{ + WriteToTopicWithInvalidTxId(false); +} + //Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture) //{ // WriteToTopicWithInvalidTxId(true); @@ -1021,6 +1081,34 @@ void TFixture::Read_Exactly_N_Messages_From_Topic(const TString& topicPath, UNIT_ASSERT_VALUES_EQUAL(count, limit); } +auto TFixture::GetAvgWriteBytes(const TString& topicName, + ui32 partitionId) -> TAvgWriteBytes +{ + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partitionId); + + runtime.SendToPipe(tabletId, edge, new NKikimr::TEvPersQueue::TEvStatus()); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT_VALUES_EQUAL(tabletId, response->Record.GetTabletId()); + + TAvgWriteBytes result; + + for (size_t i = 0; i < response->Record.PartResultSize(); ++i) { + const auto& partition = response->Record.GetPartResult(i); + if (partition.GetPartition() == static_cast(partitionId)) { + result.PerSec = partition.GetAvgWriteSpeedPerSec(); + result.PerMin = partition.GetAvgWriteSpeedPerMin(); + result.PerHour = partition.GetAvgWriteSpeedPerHour(); + result.PerDay = partition.GetAvgWriteSpeedPerDay(); + break; + } + } + + return result; +} + Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) { CreateTopic("topic_A"); @@ -2265,6 +2353,150 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_44, TFixture) Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 100); } +void TFixture::CheckAvgWriteBytes(const TString& topicPath, + ui32 partitionId, + size_t minSize, size_t maxSize) +{ +#define UNIT_ASSERT_AVGWRITEBYTES(v, minSize, maxSize) \ + UNIT_ASSERT_LE_C(minSize, v, ", actual " << minSize << " > " << v); \ + UNIT_ASSERT_LE_C(v, maxSize, ", actual " << v << " > " << maxSize); + + auto avgWriteBytes = GetAvgWriteBytes(topicPath, partitionId); + + UNIT_ASSERT_AVGWRITEBYTES(avgWriteBytes.PerSec, minSize, maxSize); + UNIT_ASSERT_AVGWRITEBYTES(avgWriteBytes.PerMin, minSize, maxSize); + UNIT_ASSERT_AVGWRITEBYTES(avgWriteBytes.PerHour, minSize, maxSize); + UNIT_ASSERT_AVGWRITEBYTES(avgWriteBytes.PerDay, minSize, maxSize); + +#undef UNIT_ASSERT_AVGWRITEBYTES +} + +void TFixture::SplitPartition(const TString& topicName, + ui32 partitionId, + const TString& boundary) +{ + NKikimr::NPQ::NTest::SplitPartition(Setup->GetRuntime(), + ++SchemaTxId, + topicName, + partitionId, + boundary); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_45, TFixture) +{ + // Writing to a topic in a transaction affects the `AvgWriteBytes` indicator + CreateTopic("topic_A", TEST_CONSUMER, 2); + + auto session = CreateTableSession(); + auto tx = BeginTx(session); + + TString message(1'000, 'x'); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, &tx, 0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, &tx, 0); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx, 1); + + CommitTx(tx, EStatus::SUCCESS); + + size_t minSize = (message.size() + TEST_MESSAGE_GROUP_ID_1.size()) * 2; + size_t maxSize = minSize + 200; + + CheckAvgWriteBytes("topic_A", 0, minSize, maxSize); + + minSize = (message.size() + TEST_MESSAGE_GROUP_ID_2.size()); + maxSize = minSize + 200; + + CheckAvgWriteBytes("topic_A", 1, minSize, maxSize); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_46, TFixture) +{ + // The `split` operation of the topic partition affects the writing in the transaction. + // The transaction commit should fail with an error + CreateTopic("topic_A", TEST_CONSUMER, 2, 10); + + auto session = CreateTableSession(); + auto tx = BeginTx(session); + + TString message(1'000, 'x'); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, &tx, 0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, &tx, 0); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx, 1); + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); + + SplitPartition("topic_A", 1, "\xC0"); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx, 1); + + CommitTx(tx, EStatus::ABORTED); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_47, TFixture) +{ + // The `split` operation of the topic partition does not affect the reading in the transaction. + CreateTopic("topic_A", TEST_CONSUMER, 2, 10); + + TString message(1'000, 'x'); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, nullptr, 0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, nullptr, 0); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, nullptr, 1); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, nullptr, 1); + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2); + + SplitPartition("topic_A", 1, "\xC0"); + + auto session = CreateTableSession(); + auto tx = BeginTx(session); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + + CloseTopicReadSession("topic_A", TEST_CONSUMER); + + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + + CommitTx(tx, EStatus::SUCCESS); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture) +{ + // the commit of a transaction affects the split of the partition + CreateTopic("topic_A", TEST_CONSUMER, 2, 10); + AlterAutoPartitioning("topic_A", 2, 10, EAutoPartitioningStrategy::ScaleUp, TDuration::Seconds(2), 1, 2); + + auto session = CreateTableSession(); + auto tx = BeginTx(session); + + TString message(1_MB, 'x'); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, &tx, 0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message, &tx, 0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_3, message, &tx, 0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_3, message, &tx, 0); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx, 1); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx, 1); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_4, message, &tx, 1); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_4, message, &tx, 1); + + CommitTx(tx, EStatus::SUCCESS); + + Sleep(TDuration::Seconds(5)); + + auto topicDescription = DescribeTopic("topic_A"); + + UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2); +} + } } diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 680ec178b6..c3253d1ea7 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -52,7 +52,7 @@ void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consume Server.WaitInit(path); } -void TTopicSdkTestSetup::DescribeTopic(const TString& path) +TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path) { TTopicClient client(MakeDriver()); @@ -62,6 +62,8 @@ void TTopicSdkTestSetup::DescribeTopic(const TString& path) auto status = client.DescribeTopic(path, settings).GetValueSync(); UNIT_ASSERT(status.IsSuccess()); + + return status.GetTopicDescription(); } TString TTopicSdkTestSetup::GetEndpoint() const { diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 65050aabe7..7a28780148 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -21,7 +21,7 @@ class TTopicSdkTestSetup { void CreateTopicWithAutoscale(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, size_t maxPartitionCount = 100); - void DescribeTopic(const TString& path = TString{TEST_TOPIC}); + TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC}); TString GetEndpoint() const; TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const;