diff --git a/include/ydb-cpp-sdk/client/bsconfig/storage_config.h b/include/ydb-cpp-sdk/client/bsconfig/storage_config.h index 4e6613df02..151a81e943 100644 --- a/include/ydb-cpp-sdk/client/bsconfig/storage_config.h +++ b/include/ydb-cpp-sdk/client/bsconfig/storage_config.h @@ -44,6 +44,9 @@ class TStorageConfigClient { // Fetch current cluster storage config TAsyncFetchStorageConfigResult FetchStorageConfig(const TStorageConfigSettings& settings = {}); + // Bootstrap cluster with automatic configuration + TAsyncStatus BootstrapCluster(const std::string& selfAssemblyUUID); + private: class TImpl; diff --git a/include/ydb-cpp-sdk/client/draft/ydb_replication.h b/include/ydb-cpp-sdk/client/draft/ydb_replication.h index cfd9e9402f..2448d3b01c 100644 --- a/include/ydb-cpp-sdk/client/draft/ydb_replication.h +++ b/include/ydb-cpp-sdk/client/draft/ydb_replication.h @@ -9,6 +9,7 @@ namespace Ydb::Replication { class ConnectionParams; + class ConsistencyLevelGlobal; class DescribeReplicationResult; class DescribeReplicationResult_Stats; } @@ -60,6 +61,19 @@ class TConnectionParams: private TCommonClientSettings { > Credentials_; }; +struct TRowConsistency { +}; + +class TGlobalConsistency { +public: + explicit TGlobalConsistency(const Ydb::Replication::ConsistencyLevelGlobal& proto); + + const TDuration& GetCommitInterval() const; + +private: + TDuration CommitInterval_; +}; + class TStats { public: TStats() = default; @@ -108,6 +122,11 @@ class TReplicationDescription { std::optional SrcChangefeedName; }; + enum class EConsistencyLevel { + Row, + Global, + }; + enum class EState { Running, Error, @@ -119,6 +138,9 @@ class TReplicationDescription { const TConnectionParams& GetConnectionParams() const; const std::vector GetItems() const; + EConsistencyLevel GetConsistencyLevel() const; + const TGlobalConsistency& GetGlobalConsistency() const; + EState GetState() const; const TRunningState& GetRunningState() const; const TErrorState& GetErrorState() const; @@ -127,6 +149,12 @@ class TReplicationDescription { private: TConnectionParams ConnectionParams_; std::vector Items_; + + std::variant< + TRowConsistency, + TGlobalConsistency + > ConsistencyLevel_; + std::variant< TRunningState, TErrorState, diff --git a/include/ydb-cpp-sdk/client/import/import.h b/include/ydb-cpp-sdk/client/import/import.h index f80b9fd8bd..3e6b191cbf 100644 --- a/include/ydb-cpp-sdk/client/import/import.h +++ b/include/ydb-cpp-sdk/client/import/import.h @@ -42,6 +42,7 @@ struct TImportFromS3Settings : public TOperationRequestSettings; - -public: - explicit TTtlTierSettings(TDuration applyAfter, const TAction& action); - explicit TTtlTierSettings(const Ydb::Table::TtlTier& tier); - void SerializeTo(Ydb::Table::TtlTier& proto) const; - - TDuration GetApplyAfter() const; - const TAction& GetAction() const; - -private: - TDuration ApplyAfter_; - TAction Action_; -}; - class TDateTypeColumnModeSettings { public: - explicit TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& deprecatedExpireAfter = TDuration::Max()); + explicit TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& applyAfter); void SerializeTo(Ydb::Table::DateTypeColumnModeSettings& proto) const; - void SerializeTo(Ydb::Table::DateTypeColumnModeSettingsV1& proto) const; const std::string& GetColumnName() const; - // Deprecated. Use TTtlSettings::GetExpireAfter() const TDuration& GetExpireAfter() const; private: std::string ColumnName_; - TDuration DeprecatedExpireAfter_; + TDuration ApplyAfter_; }; class TValueSinceUnixEpochModeSettings { @@ -488,9 +458,8 @@ class TValueSinceUnixEpochModeSettings { }; public: - explicit TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& deprecatedExpireAfter = TDuration::Max()); + explicit TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& applyAfter); void SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettings& proto) const; - void SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettingsV1& proto) const; const std::string& GetColumnName() const; EUnit GetColumnUnit() const; @@ -503,7 +472,45 @@ class TValueSinceUnixEpochModeSettings { private: std::string ColumnName_; EUnit ColumnUnit_; - TDuration DeprecatedExpireAfter_; + TDuration ApplyAfter_; +}; + +class TTtlDeleteAction {}; +class TTtlEvictToExternalStorageAction { +public: + TTtlEvictToExternalStorageAction(const std::string& storageName); + void SerializeTo(Ydb::Table::EvictionToExternalStorageSettings& proto) const; + + std::string GetStorage() const; + +private: + std::string Storage_; +}; + +class TTtlTierSettings { +public: + using TExpression = std::variant< + TDateTypeColumnModeSettings, + TValueSinceUnixEpochModeSettings + >; + + using TAction = std::variant< + TTtlDeleteAction, + TTtlEvictToExternalStorageAction + >; + +public: + explicit TTtlTierSettings(const TExpression& expression, const TAction& action); + + static std::optional FromProto(const Ydb::Table::TtlTier& tier); + void SerializeTo(Ydb::Table::TtlTier& proto) const; + + const TExpression& GetExpression() const; + const TAction& GetAction() const; + +private: + TExpression Expression_; + TAction Action_; }; //! Represents ttl settings @@ -522,16 +529,14 @@ class TTtlSettings { ValueSinceUnixEpoch = 1, }; - explicit TTtlSettings(const std::string& columnName, const std::vector& tiers); + explicit TTtlSettings(const std::vector& tiers); + explicit TTtlSettings(const std::string& columnName, const TDuration& expireAfter); const TDateTypeColumnModeSettings& GetDateTypeColumn() const; - // Deprecated. Use FromProto() explicit TTtlSettings(const Ydb::Table::DateTypeColumnModeSettings& mode, uint32_t runIntervalSeconds); - explicit TTtlSettings(const std::string& columnName, EUnit columnUnit, const std::vector& tiers); explicit TTtlSettings(const std::string& columnName, EUnit columnUnit, const TDuration& expireAfter); const TValueSinceUnixEpochModeSettings& GetValueSinceUnixEpoch() const; - // Deprecated. Use FromProto() explicit TTtlSettings(const Ydb::Table::ValueSinceUnixEpochModeSettings& mode, uint32_t runIntervalSeconds); static std::optional FromProto(const Ydb::Table::TtlSettings& proto); @@ -542,14 +547,8 @@ class TTtlSettings { const TDuration& GetRunInterval() const; const std::vector& GetTiers() const; - std::optional GetExpireAfter() const; - -private: - explicit TTtlSettings(TMode mode, const std::vector& tiers, ui32 runIntervalSeconds); - static std::optional GetExpireAfterFrom(const std::vector& tiers); private: - TMode Mode_; std::vector Tiers_; TDuration RunInterval_ = TDuration::Zero(); }; @@ -1323,6 +1322,10 @@ class TTxSettings { return TTxSettings(TS_SNAPSHOT_RO); } + static TTxSettings SnapshotRW() { + return TTxSettings(TS_SNAPSHOT_RW); + } + void Out(IOutputStream& out) const { switch (Mode_) { case TS_SERIALIZABLE_RW: @@ -1337,6 +1340,9 @@ class TTxSettings { case TS_SNAPSHOT_RO: out << "SnapshotRO"; break; + case TS_SNAPSHOT_RW: + out << "SnapshotRW"; + break; default: out << "Unknown"; break; @@ -1348,7 +1354,8 @@ class TTxSettings { TS_SERIALIZABLE_RW, TS_ONLINE_RO, TS_STALE_RO, - TS_SNAPSHOT_RO + TS_SNAPSHOT_RO, + TS_SNAPSHOT_RW, }; FLUENT_SETTING(TTxOnlineSettings, OnlineSettings); diff --git a/src/api/grpc/ydb_bsconfig_v1.proto b/src/api/grpc/ydb_bsconfig_v1.proto index 78ee9f0203..137f7a6d2e 100644 --- a/src/api/grpc/ydb_bsconfig_v1.proto +++ b/src/api/grpc/ydb_bsconfig_v1.proto @@ -10,10 +10,13 @@ import "src/api/protos/ydb_bsconfig.proto"; service BSConfigService { - // Initialize Blobstorage host configs and box + // Initialize Blobstorage/single config rpc ReplaceStorageConfig(BSConfig.ReplaceStorageConfigRequest) returns (BSConfig.ReplaceStorageConfigResponse); - // Fetch Blobstorage host configs and box + // Fetch Blobstorage/single config rpc FetchStorageConfig(BSConfig.FetchStorageConfigRequest) returns (BSConfig.FetchStorageConfigResponse); -} \ No newline at end of file + // Bootstrap automatically configured cluster + rpc BootstrapCluster(BSConfig.BootstrapClusterRequest) returns (BSConfig.BootstrapClusterResponse); + +} diff --git a/src/api/protos/draft/ydb_replication.proto b/src/api/protos/draft/ydb_replication.proto index cb2f80c449..36ff7119a1 100644 --- a/src/api/protos/draft/ydb_replication.proto +++ b/src/api/protos/draft/ydb_replication.proto @@ -45,6 +45,13 @@ message ConnectionParams { } } +message ConsistencyLevelRow { +} + +message ConsistencyLevelGlobal { + google.protobuf.Duration commit_interval = 1; +} + message DescribeReplicationResult { message Stats { optional google.protobuf.Duration lag = 1; @@ -72,7 +79,12 @@ message DescribeReplicationResult { // Description of scheme object. Ydb.Scheme.Entry self = 1; + ConnectionParams connection_params = 2; + oneof consistency_level { + ConsistencyLevelRow row_consistency = 7; + ConsistencyLevelGlobal global_consistency = 8; + } repeated Item items = 3; oneof state { RunningState running = 4; diff --git a/src/api/protos/ydb_bsconfig.proto b/src/api/protos/ydb_bsconfig.proto index 2819b9e300..a75d947502 100644 --- a/src/api/protos/ydb_bsconfig.proto +++ b/src/api/protos/ydb_bsconfig.proto @@ -35,4 +35,13 @@ message FetchStorageConfigResponse { message FetchStorageConfigResult { string yaml_config = 1; -} \ No newline at end of file +} + +message BootstrapClusterRequest { + Ydb.Operations.OperationParams operation_params = 1; + string self_assembly_uuid = 2; +} + +message BootstrapClusterResponse { + Ydb.Operations.Operation operation = 1; +} diff --git a/src/api/protos/ydb_import.proto b/src/api/protos/ydb_import.proto index e7294af87b..82b661962c 100644 --- a/src/api/protos/ydb_import.proto +++ b/src/api/protos/ydb_import.proto @@ -72,6 +72,9 @@ message ImportFromS3Settings { // Prevent importing of ACL and owner. If true, objects are created with empty ACL // and their owner will be the user who started the import. bool no_acl = 11; + + // Skip checksum validation during import + bool skip_checksum_validation = 12; } message ImportFromS3Result { diff --git a/src/api/protos/ydb_query.proto b/src/api/protos/ydb_query.proto index 10db905670..ab9f3cabb0 100644 --- a/src/api/protos/ydb_query.proto +++ b/src/api/protos/ydb_query.proto @@ -62,6 +62,8 @@ message StaleModeSettings { message SnapshotModeSettings { } +message SnapshotRWModeSettings { +} message TransactionSettings { oneof tx_mode { @@ -69,6 +71,7 @@ message TransactionSettings { OnlineModeSettings online_read_only = 2; StaleModeSettings stale_read_only = 3; SnapshotModeSettings snapshot_read_only = 4; + SnapshotRWModeSettings snapshot_read_write = 5; } } diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 616963432c..415ba2b0f1 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -435,44 +435,11 @@ message ColumnMeta { message EvictionToExternalStorageSettings { // Path to external data source - string storage_name = 1; -} - -message TtlTier { - uint32 apply_after_seconds = 1; - - oneof action { - google.protobuf.Empty delete = 2; - EvictionToExternalStorageSettings evict_to_external_storage = 3; - } -} - -message DateTypeColumnModeSettingsV1 { - // The row will be assigned a tier at the moment of time, when the value - // stored in is less than or equal to the current time (in epoch - // time format), and has passed since that moment; - // i.e. the eviction threshold is the value of plus . - - // The column type must be a date type - string column_name = 1; -} - -message ValueSinceUnixEpochModeSettingsV1 { - // Same as DateTypeColumnModeSettings (above), but useful when type of the - // value stored in is not a date type. - - // The column type must be one of: - // - Uint32 - // - Uint64 - // - DyNumber - string column_name = 1; - - // Interpretation of the value stored in - ValueSinceUnixEpochModeSettings.Unit column_unit = 2; + string storage = 1; } message DateTypeColumnModeSettings { - // The row will be assigned a tier at the moment of time, when the value + // The row will be considered as expired or assigned a tier at the moment of time, when the value // stored in is less than or equal to the current time (in epoch // time format), and has passed since that moment; // i.e. the expiration threshold is the value of plus . @@ -509,12 +476,27 @@ message ValueSinceUnixEpochModeSettings { uint32 expire_after_seconds = 3; } +message TtlTier { + oneof expression { + DateTypeColumnModeSettings date_type_column = 1; + ValueSinceUnixEpochModeSettings value_since_unix_epoch = 2; + } + + oneof action { + google.protobuf.Empty delete = 3; + EvictionToExternalStorageSettings evict_to_external_storage = 4; + } +} + +message TieredModeSettings { + repeated TtlTier tiers = 1; +} + message TtlSettings { oneof mode { - DateTypeColumnModeSettings date_type_column = 1 [deprecated = true]; - ValueSinceUnixEpochModeSettings value_since_unix_epoch = 2 [deprecated = true]; - DateTypeColumnModeSettingsV1 date_type_column_v1 = 4; - ValueSinceUnixEpochModeSettingsV1 value_since_unix_epoch_v1 = 5; + DateTypeColumnModeSettings date_type_column = 1; + ValueSinceUnixEpochModeSettings value_since_unix_epoch = 2; + TieredModeSettings tiered_ttl = 4; } // There is no guarantee that expired row will be deleted immediately upon @@ -530,8 +512,6 @@ message TtlSettings { // How often to run BRO on the same partition. // BRO will not be started more often, but may be started less often. uint32 run_interval_seconds = 3; - - repeated TtlTier tiers = 6; } message StorageSettings { @@ -899,12 +879,16 @@ message StaleModeSettings { message SnapshotModeSettings { } +message SnapshotRWModeSettings { +} + message TransactionSettings { oneof tx_mode { SerializableModeSettings serializable_read_write = 1; OnlineModeSettings online_read_only = 2; StaleModeSettings stale_read_only = 3; SnapshotModeSettings snapshot_read_only = 4; + SnapshotRWModeSettings snapshot_read_write = 5; } } diff --git a/src/client/bsconfig/storage_config.cpp b/src/client/bsconfig/storage_config.cpp index 3f729eaa4d..8a6307f474 100644 --- a/src/client/bsconfig/storage_config.cpp +++ b/src/client/bsconfig/storage_config.cpp @@ -47,6 +47,13 @@ class TStorageConfigClient::TImpl : public TClientImplCommon(); + request.set_self_assembly_uuid(selfAssemblyUUID); + return RunSimple(std::move(request), + &Ydb::BSConfig::V1::BSConfigService::Stub::AsyncBootstrapCluster); + } }; TStorageConfigClient::TStorageConfigClient(const TDriver& driver, const TCommonClientSettings& settings) @@ -63,5 +70,9 @@ TAsyncFetchStorageConfigResult TStorageConfigClient::FetchStorageConfig(const TS return Impl_->FetchStorageConfig(settings); } +TAsyncStatus TStorageConfigClient::BootstrapCluster(const std::string& selfAssemblyUUID) { + return Impl_->BootstrapCluster(selfAssemblyUUID); +} + } diff --git a/src/client/draft/ydb_replication.cpp b/src/client/draft/ydb_replication.cpp index 66e5a00107..c6b2a61adf 100644 --- a/src/client/draft/ydb_replication.cpp +++ b/src/client/draft/ydb_replication.cpp @@ -68,6 +68,15 @@ static TDuration DurationToDuration(const google::protobuf::Duration& value) { return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value)); } +TGlobalConsistency::TGlobalConsistency(const Ydb::Replication::ConsistencyLevelGlobal& proto) + : CommitInterval_(DurationToDuration(proto.commit_interval())) +{ +} + +const TDuration& TGlobalConsistency::GetCommitInterval() const { + return CommitInterval_; +} + TStats::TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats) : Lag_(stats.has_lag() ? std::make_optional(DurationToDuration(stats.lag())) : std::nullopt) , InitialScanProgress_(stats.has_initial_scan_progress() ? std::make_optional(stats.initial_scan_progress()) : std::nullopt) @@ -131,6 +140,15 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ }); } + switch (desc.consistency_level_case()) { + case Ydb::Replication::DescribeReplicationResult::kGlobalConsistency: + ConsistencyLevel_ = TGlobalConsistency(desc.global_consistency()); + break; + + default: + break; + } + switch (desc.state_case()) { case Ydb::Replication::DescribeReplicationResult::kRunning: State_ = TRunningState(desc.running().stats()); @@ -157,6 +175,15 @@ const std::vector TReplicationDescription::GetIt return Items_; } +TReplicationDescription::EConsistencyLevel TReplicationDescription::GetConsistencyLevel() const { + return static_cast(ConsistencyLevel_.index()); +} + +const TGlobalConsistency& TReplicationDescription::GetGlobalConsistency() const { + return std::get(ConsistencyLevel_); +} + + TReplicationDescription::EState TReplicationDescription::GetState() const { return static_cast(State_.index()); } diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 44ee0cdcb5..b7f21e59b2 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -51,6 +51,9 @@ static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::Transaction case TTxSettings::TS_SNAPSHOT_RO: proto->mutable_snapshot_read_only(); break; + case TTxSettings::TS_SNAPSHOT_RW: + proto->mutable_snapshot_read_write(); + break; default: throw TContractViolation("Unexpected transaction mode."); } diff --git a/src/client/query/impl/exec_query.cpp b/src/client/query/impl/exec_query.cpp index a26daebabc..0e8899abe4 100644 --- a/src/client/query/impl/exec_query.cpp +++ b/src/client/query/impl/exec_query.cpp @@ -32,6 +32,9 @@ static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::Transaction case TTxSettings::TS_SNAPSHOT_RO: proto->mutable_snapshot_read_only(); break; + case TTxSettings::TS_SNAPSHOT_RW: + proto->mutable_snapshot_read_write(); + break; default: throw TContractViolation("Unexpected transaction mode."); } diff --git a/src/client/resources/CMakeLists.txt b/src/client/resources/CMakeLists.txt index a873199a2a..3074324f62 100644 --- a/src/client/resources/CMakeLists.txt +++ b/src/client/resources/CMakeLists.txt @@ -22,8 +22,8 @@ resources(client-resources.global ${YDB_SDK_SOURCE_DIR}/src/client/resources/ydb_sdk_version.txt ${YDB_SDK_SOURCE_DIR}/src/client/resources/ydb_root_ca.pem KEYS - ydb_sdk_version.txt - ydb_root_ca.pem + ydb_sdk_version_v3.txt + ydb_root_ca_v3.pem ) _ydb_sdk_make_client_component(Resources client-resources client-resources.global) diff --git a/src/client/resources/ydb_ca.cpp b/src/client/resources/ydb_ca.cpp index 6c33ac213f..96ff0618bb 100644 --- a/src/client/resources/ydb_ca.cpp +++ b/src/client/resources/ydb_ca.cpp @@ -5,7 +5,7 @@ namespace NYdb::inline V3 { std::string GetRootCertificate() { - return NResource::Find("ydb_root_ca.pem"); + return NResource::Find("ydb_root_ca_v3.pem"); } } // namespace NYdb \ No newline at end of file diff --git a/src/client/resources/ydb_resources.cpp b/src/client/resources/ydb_resources.cpp index daf614fb33..22a7bdd39a 100644 --- a/src/client/resources/ydb_resources.cpp +++ b/src/client/resources/ydb_resources.cpp @@ -29,7 +29,7 @@ const char* YDB_CLIENT_CAPABILITY_SESSION_BALANCER = "session-balancer"; std::string GetSdkSemver() { - return NResource::Find("ydb_sdk_version.txt"); + return NResource::Find("ydb_sdk_version_v3.txt"); } } // namespace NYdb diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 335c46d555..25fdbcf54e 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -1134,6 +1134,9 @@ void TTableClient::TImpl::SetTxSettings(const TTxSettings& txSettings, Ydb::Tabl case TTxSettings::TS_SNAPSHOT_RO: proto->mutable_snapshot_read_only(); break; + case TTxSettings::TS_SNAPSHOT_RW: + proto->mutable_snapshot_read_write(); + break; default: throw TContractViolation("Unexpected transaction mode."); } diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 8b2256200a..b02471c9c4 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -2940,59 +2940,73 @@ bool operator!=(const TChangefeedDescription& lhs, const TChangefeedDescription& //////////////////////////////////////////////////////////////////////////////// -TTtlTierSettings::TTtlTierSettings(TDuration applyAfter, const TAction& action) - : ApplyAfter_(applyAfter) +TTtlTierSettings::TTtlTierSettings(const TExpression& expression, const TAction& action) + : Expression_(expression) , Action_(action) { } -TTtlTierSettings::TTtlTierSettings(const Ydb::Table::TtlTier& tier) - : ApplyAfter_(TDuration::Seconds(tier.apply_after_seconds())) -{ +std::optional TTtlTierSettings::FromProto(const Ydb::Table::TtlTier& tier) { + std::optional expression; + switch (tier.expression_case()) { + case Ydb::Table::TtlTier::kDateTypeColumn: + expression = TDateTypeColumnModeSettings( + tier.date_type_column().column_name(), TDuration::Seconds(tier.date_type_column().expire_after_seconds())); + break; + case Ydb::Table::TtlTier::kValueSinceUnixEpoch: + expression = TValueSinceUnixEpochModeSettings(tier.value_since_unix_epoch().column_name(), + TProtoAccessor::FromProto(tier.value_since_unix_epoch().column_unit()), + TDuration::Seconds(tier.value_since_unix_epoch().expire_after_seconds())); + break; + case Ydb::Table::TtlTier::EXPRESSION_NOT_SET: + return std::nullopt; + } + + TAction action; + switch (tier.action_case()) { - case Ydb::Table::TtlTier::kDelete: - Action_ = TTtlDeleteAction(); - break; - case Ydb::Table::TtlTier::kEvictToExternalStorage: - Action_ = TTtlEvictToExternalStorageAction(tier.evict_to_external_storage().storage_name()); - break; - case Ydb::Table::TtlTier::ACTION_NOT_SET: - break; + case Ydb::Table::TtlTier::kDelete: + action = TTtlDeleteAction(); + break; + case Ydb::Table::TtlTier::kEvictToExternalStorage: + action = TTtlEvictToExternalStorageAction(tier.evict_to_external_storage().storage()); + break; + case Ydb::Table::TtlTier::ACTION_NOT_SET: + return std::nullopt; } + + return TTtlTierSettings(std::move(*expression), std::move(action)); } void TTtlTierSettings::SerializeTo(Ydb::Table::TtlTier& proto) const { - proto.set_apply_after_seconds(ApplyAfter_.Seconds()); + std::visit(TOverloaded{ + [&proto](const TDateTypeColumnModeSettings& expr) { expr.SerializeTo(*proto.mutable_date_type_column()); }, + [&proto](const TValueSinceUnixEpochModeSettings& expr) { expr.SerializeTo(*proto.mutable_value_since_unix_epoch()); }, + }, + Expression_); std::visit(TOverloaded{ [&proto](const TTtlDeleteAction&) { proto.mutable_delete_(); }, - [&proto](const TTtlEvictToExternalStorageAction& action) { - proto.mutable_evict_to_external_storage()->set_storage_name(action.StorageName); - }, - [](const std::monostate) {}, + [&proto](const TTtlEvictToExternalStorageAction& action) { action.SerializeTo(*proto.mutable_evict_to_external_storage()); }, }, Action_); } -TDuration TTtlTierSettings::GetApplyAfter() const { - return ApplyAfter_; +const TTtlTierSettings::TExpression& TTtlTierSettings::GetExpression() const { + return Expression_; } const TTtlTierSettings::TAction& TTtlTierSettings::GetAction() const { return Action_; } -TDateTypeColumnModeSettings::TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& deprecatedExpireAfter) +TDateTypeColumnModeSettings::TDateTypeColumnModeSettings(const std::string& columnName, const TDuration& applyAfter) : ColumnName_(columnName) - , DeprecatedExpireAfter_(deprecatedExpireAfter) + , ApplyAfter_(applyAfter) {} void TDateTypeColumnModeSettings::SerializeTo(Ydb::Table::DateTypeColumnModeSettings& proto) const { proto.set_column_name(TStringType{ColumnName_}); - proto.set_expire_after_seconds(DeprecatedExpireAfter_.Seconds()); -} - -void TDateTypeColumnModeSettings::SerializeTo(Ydb::Table::DateTypeColumnModeSettingsV1& proto) const { - proto.set_column_name(ColumnName_); + proto.set_expire_after_seconds(ApplyAfter_.Seconds()); } const std::string& TDateTypeColumnModeSettings::GetColumnName() const { @@ -3000,24 +3014,19 @@ const std::string& TDateTypeColumnModeSettings::GetColumnName() const { } const TDuration& TDateTypeColumnModeSettings::GetExpireAfter() const { - return DeprecatedExpireAfter_; + return ApplyAfter_; } -TValueSinceUnixEpochModeSettings::TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& deprecatedExpireAfter) +TValueSinceUnixEpochModeSettings::TValueSinceUnixEpochModeSettings(const std::string& columnName, EUnit columnUnit, const TDuration& applyAfter) : ColumnName_(columnName) , ColumnUnit_(columnUnit) - , DeprecatedExpireAfter_(deprecatedExpireAfter) + , ApplyAfter_(applyAfter) {} void TValueSinceUnixEpochModeSettings::SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettings& proto) const { proto.set_column_name(TStringType{ColumnName_}); proto.set_column_unit(TProtoAccessor::GetProto(ColumnUnit_)); - proto.set_expire_after_seconds(DeprecatedExpireAfter_.Seconds()); -} - -void TValueSinceUnixEpochModeSettings::SerializeTo(Ydb::Table::ValueSinceUnixEpochModeSettingsV1& proto) const { - proto.set_column_name(ColumnName_); - proto.set_column_unit(TProtoAccessor::GetProto(ColumnUnit_)); + proto.set_expire_after_seconds(ApplyAfter_.Seconds()); } const std::string& TValueSinceUnixEpochModeSettings::GetColumnName() const { @@ -3029,7 +3038,7 @@ TValueSinceUnixEpochModeSettings::EUnit TValueSinceUnixEpochModeSettings::GetCol } const TDuration& TValueSinceUnixEpochModeSettings::GetExpireAfter() const { - return DeprecatedExpireAfter_; + return ApplyAfter_; } void TValueSinceUnixEpochModeSettings::Out(IOutputStream& out, EUnit unit) { @@ -3072,13 +3081,24 @@ TValueSinceUnixEpochModeSettings::EUnit TValueSinceUnixEpochModeSettings::UnitFr return EUnit::Unknown; } -TTtlSettings::TTtlSettings(const std::string& columnName, const std::vector& tiers) - : Mode_(TDateTypeColumnModeSettings(columnName, GetExpireAfterFrom(tiers).value_or(TDuration::Max()))) - , Tiers_(tiers) +TTtlEvictToExternalStorageAction::TTtlEvictToExternalStorageAction(const std::string& storageName) + : Storage_(storageName) +{} + +void TTtlEvictToExternalStorageAction::SerializeTo(Ydb::Table::EvictionToExternalStorageSettings& proto) const { + proto.set_storage(Storage_); +} + +std::string TTtlEvictToExternalStorageAction::GetStorage() const { + return Storage_; +} + +TTtlSettings::TTtlSettings(const std::vector& tiers) + : Tiers_(tiers) {} TTtlSettings::TTtlSettings(const std::string& columnName, const TDuration& expireAfter) - : TTtlSettings(columnName, {TTtlTierSettings(expireAfter, TTtlDeleteAction())}) + : TTtlSettings({TTtlTierSettings(TDateTypeColumnModeSettings(columnName, expireAfter), TTtlDeleteAction())}) {} TTtlSettings::TTtlSettings(const Ydb::Table::DateTypeColumnModeSettings& mode, ui32 runIntervalSeconds) @@ -3088,16 +3108,11 @@ TTtlSettings::TTtlSettings(const Ydb::Table::DateTypeColumnModeSettings& mode, u } const TDateTypeColumnModeSettings& TTtlSettings::GetDateTypeColumn() const { - return std::get(Mode_); + return std::get(Tiers_.front().GetExpression()); } -TTtlSettings::TTtlSettings(const std::string& columnName, EUnit columnUnit, const std::vector& tiers) - : Mode_(TValueSinceUnixEpochModeSettings(columnName, columnUnit, GetExpireAfterFrom(tiers).value_or(TDuration::Max()))) - , Tiers_(tiers) -{} - TTtlSettings::TTtlSettings(const std::string& columnName, EUnit columnUnit, const TDuration& expireAfter) - : TTtlSettings(columnName, columnUnit, {TTtlTierSettings(expireAfter, TTtlDeleteAction())}) + : TTtlSettings({TTtlTierSettings(TValueSinceUnixEpochModeSettings(columnName, columnUnit, expireAfter), TTtlDeleteAction())}) {} TTtlSettings::TTtlSettings(const Ydb::Table::ValueSinceUnixEpochModeSettings& mode, ui32 runIntervalSeconds) @@ -3107,28 +3122,28 @@ TTtlSettings::TTtlSettings(const Ydb::Table::ValueSinceUnixEpochModeSettings& mo } const TValueSinceUnixEpochModeSettings& TTtlSettings::GetValueSinceUnixEpoch() const { - return std::get(Mode_); + return std::get(Tiers_.front().GetExpression()); } std::optional TTtlSettings::FromProto(const Ydb::Table::TtlSettings& proto) { - std::vector tiers; - for (const auto& tier : proto.tiers()) { - tiers.emplace_back(tier); - } - TDuration legacyExpireAfter = GetExpireAfterFrom(tiers).value_or(TDuration::Max()); - switch(proto.mode_case()) { case Ydb::Table::TtlSettings::kDateTypeColumn: return TTtlSettings(proto.date_type_column(), proto.run_interval_seconds()); case Ydb::Table::TtlSettings::kValueSinceUnixEpoch: return TTtlSettings(proto.value_since_unix_epoch(), proto.run_interval_seconds()); - case Ydb::Table::TtlSettings::kDateTypeColumnV1: - return TTtlSettings( - TDateTypeColumnModeSettings(proto.date_type_column_v1().column_name(), legacyExpireAfter), tiers, proto.run_interval_seconds()); - case Ydb::Table::TtlSettings::kValueSinceUnixEpochV1: - return TTtlSettings(TValueSinceUnixEpochModeSettings(proto.value_since_unix_epoch_v1().column_name(), - TProtoAccessor::FromProto(proto.value_since_unix_epoch_v1().column_unit()), legacyExpireAfter), - tiers, proto.run_interval_seconds()); + case Ydb::Table::TtlSettings::kTieredTtl: { + std::vector tiers; + for (const auto& tier : proto.tiered_ttl().tiers()) { + if (auto deserialized = TTtlTierSettings::FromProto(tier)) { + tiers.emplace_back(std::move(*deserialized)); + } else { + return std::nullopt; + } + } + auto settings = TTtlSettings(std::move(tiers)); + settings.SetRunInterval(TDuration::Seconds(proto.run_interval_seconds())); + return settings; + } case Ydb::Table::TtlSettings::MODE_NOT_SET: return std::nullopt; } @@ -3137,26 +3152,14 @@ std::optional TTtlSettings::FromProto(const Ydb::Table::TtlSetting void TTtlSettings::SerializeTo(Ydb::Table::TtlSettings& proto) const { if (Tiers_.size() == 1 && std::holds_alternative(Tiers_.back().GetAction())) { // serialize DELETE-only TTL to legacy format for backwards-compatibility - switch (GetMode()) { - case EMode::DateTypeColumn: - GetDateTypeColumn().SerializeTo(*proto.mutable_date_type_column()); - break; - case EMode::ValueSinceUnixEpoch: - GetValueSinceUnixEpoch().SerializeTo(*proto.mutable_value_since_unix_epoch()); - break; - } + std::visit(TOverloaded{ + [&proto](const TDateTypeColumnModeSettings& expr) { expr.SerializeTo(*proto.mutable_date_type_column()); }, + [&proto](const TValueSinceUnixEpochModeSettings& expr) { expr.SerializeTo(*proto.mutable_value_since_unix_epoch()); }, + }, + Tiers_.front().GetExpression()); } else { - switch (GetMode()) { - case EMode::DateTypeColumn: - GetDateTypeColumn().SerializeTo(*proto.mutable_date_type_column_v1()); - break; - case EMode::ValueSinceUnixEpoch: - GetValueSinceUnixEpoch().SerializeTo(*proto.mutable_value_since_unix_epoch_v1()); - break; - } - for (const auto& tier : Tiers_) { - tier.SerializeTo(*proto.add_tiers()); + tier.SerializeTo(*proto.mutable_tiered_ttl()->add_tiers()); } } @@ -3166,7 +3169,7 @@ void TTtlSettings::SerializeTo(Ydb::Table::TtlSettings& proto) const { } TTtlSettings::EMode TTtlSettings::GetMode() const { - return static_cast(Mode_.index()); + return static_cast(Tiers_.front().GetExpression().index()); } TTtlSettings& TTtlSettings::SetRunInterval(const TDuration& value) { @@ -3182,25 +3185,6 @@ const std::vector& TTtlSettings::GetTiers() const { return Tiers_; } -std::optional TTtlSettings::GetExpireAfter() const { - return GetExpireAfterFrom(Tiers_); -} - -std::optional TTtlSettings::GetExpireAfterFrom(const std::vector& tiers) { - for (const auto& tier : tiers) { - if (std::holds_alternative(tier.GetAction())) { - return tier.GetApplyAfter(); - } - } - return std::nullopt; -} - -TTtlSettings::TTtlSettings(TMode mode, const std::vector& tiers, uint32_t runIntervalSeconds) - : Mode_(std::move(mode)) - , Tiers_(tiers) - , RunInterval_(TDuration::Seconds(runIntervalSeconds)) -{ } - TAlterTtlSettings::EAction TAlterTtlSettings::GetAction() const { return static_cast(Action_.index()); } diff --git a/src/client/topic/ut/describe_topic_ut.cpp b/src/client/topic/ut/describe_topic_ut.cpp index 39ae21ecbe..5b995d3763 100644 --- a/src/client/topic/ut/describe_topic_ut.cpp +++ b/src/client/topic/ut/describe_topic_ut.cpp @@ -366,6 +366,8 @@ namespace NYdb::NTopic::NTests { Cerr << std::format("=== existingTopic={} allowUpdateRow={} allowDescribeSchema={} authToken={}\n", existingTopic, allowUpdateRow, allowDescribeSchema, std::string(authToken)); + setup.GetServer().AnnoyingClient->GrantConnect(authToken); + auto driverConfig = setup.MakeDriverConfig().SetAuthToken(authToken); auto client = TTopicClient(TDriver(driverConfig)); auto settings = TDescribePartitionSettings().IncludeLocation(true); @@ -380,7 +382,14 @@ namespace NYdb::NTopic::NTests { } setup.GetServer().AnnoyingClient->ModifyACL("/Root", TString{TEST_TOPIC}, acl.SerializeAsString()); - return client.DescribePartition(existingTopic ? TEST_TOPIC : "bad-topic", testPartitionId, settings).GetValueSync(); + while (true) { + TDescribePartitionResult result = client.DescribePartition(existingTopic ? TEST_TOPIC : "bad-topic", testPartitionId, settings).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS || result.GetStatus() == EStatus::SCHEME_ERROR || result.GetStatus() == EStatus::UNAUTHORIZED, result.GetIssues()); + // Connect access may appear later + if (result.GetStatus() != EStatus::UNAUTHORIZED) + return result; + Sleep(TDuration::Seconds(1)); + } } Y_UNIT_TEST(DescribePartitionPermissions) { diff --git a/src/client/topic/ut/local_partition_ut.cpp b/src/client/topic/ut/local_partition_ut.cpp index 4ff64776ba..6c61ef2eff 100644 --- a/src/client/topic/ut/local_partition_ut.cpp +++ b/src/client/topic/ut/local_partition_ut.cpp @@ -621,6 +621,8 @@ namespace NYdb::NTopic::NTests { auto setup = CreateSetup(TEST_CASE_NAME); auto authToken = "x-user-x@builtin"; + setup->GetServer().AnnoyingClient->GrantConnect(authToken); + { // Allow UpdateRow only, no DescribeSchema permission. NACLib::TDiffACL acl; diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 6b65097513..a6e1ba7168 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -196,6 +196,24 @@ class TFixture : public NUnitTest::TBaseFixture { const TString& consumerName, size_t count); + void TestWriteToTopic1(); + + void TestWriteToTopic4(); + + void TestWriteToTopic7(); + + void TestWriteToTopic9(); + + void TestWriteToTopic10(); + + void TestWriteToTopic11(); + + void TestWriteToTopic24(); + + void TestWriteToTopic26(); + + void TestWriteToTopic27(); + struct TAvgWriteBytes { ui64 PerSec = 0; ui64 PerMin = 0; @@ -213,6 +231,11 @@ class TFixture : public NUnitTest::TBaseFixture { void SplitPartition(const TString& topicPath, ui32 partitionId, const TString& boundary); + + virtual bool GetEnableOltpSink() const; + virtual bool GetEnableOlapSink() const; + virtual bool GetEnableHtapTx() const; + virtual bool GetAllowOlapDataQuery() const; private: template @@ -258,6 +281,10 @@ void TFixture::SetUp(NUnitTest::TTestContext&) settings.SetEnableTopicServiceTx(true); settings.SetEnableTopicSplitMerge(true); settings.SetEnablePQConfigTransactionsAtSchemeShard(true); + settings.SetEnableOltpSink(GetEnableOltpSink()); + settings.SetEnableOlapSink(GetEnableOlapSink()); + settings.SetEnableHtapTx(GetEnableHtapTx()); + settings.SetAllowOlapDataQuery(GetAllowOlapDataQuery()); Setup = std::make_unique(TEST_CASE_NAME, settings); @@ -1087,35 +1114,7 @@ TVector TFixture::Read_Exactly_N_Messages_From_Topic(const TString& top return result; } -auto TFixture::GetAvgWriteBytes(const TString& topicName, - ui32 partitionId) -> TAvgWriteBytes -{ - auto& runtime = Setup->GetRuntime(); - TActorId edge = runtime.AllocateEdgeActor(); - ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partitionId); - - runtime.SendToPipe(tabletId, edge, new NKikimr::TEvPersQueue::TEvStatus()); - auto response = runtime.GrabEdgeEvent(); - - UNIT_ASSERT_VALUES_EQUAL(tabletId, response->Record.GetTabletId()); - - TAvgWriteBytes result; - - for (size_t i = 0; i < response->Record.PartResultSize(); ++i) { - const auto& partition = response->Record.GetPartResult(i); - if (partition.GetPartition() == static_cast(partitionId)) { - result.PerSec = partition.GetAvgWriteSpeedPerSec(); - result.PerMin = partition.GetAvgWriteSpeedPerMin(); - result.PerHour = partition.GetAvgWriteSpeedPerHour(); - result.PerDay = partition.GetAvgWriteSpeedPerDay(); - break; - } - } - - return result; -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) +void TFixture::TestWriteToTopic1() { CreateTopic("topic_A"); CreateTopic("topic_B"); @@ -1159,89 +1158,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) } } -Y_UNIT_TEST_F(WriteToTopic_Demo_2, TFixture) -{ - CreateTopic("topic_A"); - CreateTopic("topic_B"); - - NTable::TSession tableSession = CreateTableSession(); - NTable::TTransaction tx = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #1", &tx); - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #2", &tx); - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #3", &tx); - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #4", &tx); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, "message #5"); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_2, "message #6"); - - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #7", &tx); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #8", &tx); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #9", &tx); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #5"); - } - - { - auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #6"); - } - - CommitTx(tx, EStatus::SUCCESS); - - { - auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 4); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); - UNIT_ASSERT_VALUES_EQUAL(messages[3], "message #4"); - } - - { - auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #7"); - UNIT_ASSERT_VALUES_EQUAL(messages[2], "message #9"); - } -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture) -{ - CreateTopic("topic_A"); - CreateTopic("topic_B"); - - NTable::TSession tableSession = CreateTableSession(); - NTable::TTransaction tx = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3"); - - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); - - CommitTx(tx, EStatus::ABORTED); - - tx = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - - CommitTx(tx, EStatus::SUCCESS); - - messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); - - messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture) +void TFixture::TestWriteToTopic4() { CreateTopic("topic_A"); CreateTopic("topic_B"); @@ -1275,71 +1192,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture) UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #4"); } -Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) -{ - CreateTopic("topic_A"); - CreateTopic("topic_B"); - - NTable::TSession tableSession = CreateTableSession(); - - { - NTable::TTransaction tx_1 = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1); - - CommitTx(tx_1, EStatus::SUCCESS); - } - - { - NTable::TTransaction tx_2 = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2); - - CommitTx(tx_2, EStatus::SUCCESS); - } - - { - auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 2); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); - UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #3"); - } - - { - auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 2); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); - UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #4"); - } -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture) -{ - CreateTopic("topic_A"); - - NTable::TSession tableSession = CreateTableSession(); - NTable::TTransaction tx = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); - } - - CommitTx(tx, EStatus::SUCCESS); - - { - auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 2); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); - UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); - } - - DescribeTopic("topic_A"); -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture) +void TFixture::TestWriteToTopic7() { CreateTopic("topic_A"); @@ -1370,39 +1223,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture) } } -Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture) -{ - CreateTopic("topic_A"); - - NTable::TSession tableSession = CreateTableSession(); - NTable::TTransaction tx = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2"); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); - } - - CommitTx(tx, EStatus::ABORTED); - - tx = BeginTx(tableSession); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); - - CommitTx(tx, EStatus::SUCCESS); - - { - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); - } -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_9, TFixture) +void TFixture::TestWriteToTopic9() { CreateTopic("topic_A"); @@ -1430,7 +1251,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_9, TFixture) } } -Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) +void TFixture::TestWriteToTopic10() { CreateTopic("topic_A"); @@ -1459,28 +1280,378 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) } } -NPQ::TWriteId TFixture::GetTransactionWriteId(const TActorId& actorId, - ui64 tabletId) +void TFixture::TestWriteToTopic11() { - using TEvKeyValue = NKikimr::TEvKeyValue; + for (auto endOfTransaction : {Commit, Rollback, CloseTableSession}) { + TestTheCompletionOfATransaction({.Topics={"topic_A"}, .EndOfTransaction = endOfTransaction}); + TestTheCompletionOfATransaction({.Topics={"topic_A", "topic_B"}, .EndOfTransaction = endOfTransaction}); + } +} - auto request = std::make_unique(); - request->Record.SetCookie(12345); - request->Record.AddCmdRead()->SetKey("_txinfo"); +void TFixture::TestWriteToTopic24() +{ + // + // the test verifies a transaction in which data is written to a topic and to a table + // + CreateTopic("topic_A"); + CreateTable("/Root/table_A"); - auto& runtime = Setup->GetRuntime(); + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); - runtime.SendToPipe(tabletId, actorId, request.release()); - auto response = runtime.GrabEdgeEvent(); + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); - UNIT_ASSERT(response->Record.HasCookie()); - UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); - UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1); + CommitTx(tx, EStatus::SUCCESS); - auto& read = response->Record.GetReadResult(0); + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], MakeJsonDoc(records)); - NKikimrPQ::TTabletTxInfo info; - UNIT_ASSERT(info.ParseFromString(read.GetValue())); + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + + CheckTabletKeys("topic_A"); +} + +void TFixture::TestWriteToTopic26() +{ + // + // the test verifies a transaction in which data is read from a partition of one topic and written to + // another partition of this topic + // + const ui32 PARTITION_0 = 0; + const ui32 PARTITION_1 = 1; + + CreateTopic("topic_A", TEST_CONSUMER, 2); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, PARTITION_0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, PARTITION_0); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", nullptr, PARTITION_0); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, PARTITION_0); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); + + for (const auto& m : messages) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, m, &tx, PARTITION_1); + } + + CommitTx(tx, EStatus::SUCCESS); + + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, PARTITION_1); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); +} + +void TFixture::TestWriteToTopic27() +{ + CreateTopic("topic_A", TEST_CONSUMER); + CreateTopic("topic_B", TEST_CONSUMER); + CreateTopic("topic_C", TEST_CONSUMER); + + for (size_t i = 0; i < 2; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + + WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); + + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + + WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); + + CommitTx(tx, EStatus::SUCCESS); + + messages = ReadFromTopic("topic_C", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + + DumpPQTabletKeys("topic_A"); + DumpPQTabletKeys("topic_B"); + DumpPQTabletKeys("topic_C"); + } +} + +auto TFixture::GetAvgWriteBytes(const TString& topicName, + ui32 partitionId) -> TAvgWriteBytes +{ + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partitionId); + + runtime.SendToPipe(tabletId, edge, new NKikimr::TEvPersQueue::TEvStatus()); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT_VALUES_EQUAL(tabletId, response->Record.GetTabletId()); + + TAvgWriteBytes result; + + for (size_t i = 0; i < response->Record.PartResultSize(); ++i) { + const auto& partition = response->Record.GetPartResult(i); + if (partition.GetPartition() == static_cast(partitionId)) { + result.PerSec = partition.GetAvgWriteSpeedPerSec(); + result.PerMin = partition.GetAvgWriteSpeedPerMin(); + result.PerHour = partition.GetAvgWriteSpeedPerHour(); + result.PerDay = partition.GetAvgWriteSpeedPerDay(); + break; + } + } + + return result; +} + +bool TFixture::GetEnableOltpSink() const +{ + return false; +} + +bool TFixture::GetEnableOlapSink() const +{ + return false; +} + +bool TFixture::GetEnableHtapTx() const +{ + return false; +} + +bool TFixture::GetAllowOlapDataQuery() const +{ + return false; +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture) +{ + TestWriteToTopic1(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_2, TFixture) +{ + CreateTopic("topic_A"); + CreateTopic("topic_B"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #1", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #2", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #3", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, "message #4", &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, "message #5"); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_2, "message #6"); + + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #7", &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #8", &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_1, "message #9", &tx); + + { + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #5"); + } + + { + auto messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #6"); + } + + CommitTx(tx, EStatus::SUCCESS); + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 4); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages[3], "message #4"); + } + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #7"); + UNIT_ASSERT_VALUES_EQUAL(messages[2], "message #9"); + } +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_3, TFixture) +{ + CreateTopic("topic_A"); + CreateTopic("topic_B"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3"); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #3"); + + CommitTx(tx, EStatus::ABORTED); + + tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + + CommitTx(tx, EStatus::SUCCESS); + + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + + messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_4, TFixture) +{ + TestWriteToTopic4(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture) +{ + CreateTopic("topic_A"); + CreateTopic("topic_B"); + + NTable::TSession tableSession = CreateTableSession(); + + { + NTable::TTransaction tx_1 = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx_1); + + CommitTx(tx_1, EStatus::SUCCESS); + } + + { + NTable::TTransaction tx_2 = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", &tx_2); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #4", &tx_2); + + CommitTx(tx_2, EStatus::SUCCESS); + } + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 2); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #3"); + } + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 2); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #4"); + } +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + + { + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); + } + + CommitTx(tx, EStatus::SUCCESS); + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 2); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); + } + + DescribeTopic("topic_A"); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture) +{ + TestWriteToTopic7(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_8, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2"); + + { + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #2"); + } + + CommitTx(tx, EStatus::ABORTED); + + tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + + CommitTx(tx, EStatus::SUCCESS); + + { + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + } +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_9, TFixture) +{ + TestWriteToTopic9(); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) +{ + TestWriteToTopic10(); +} + +NPQ::TWriteId TFixture::GetTransactionWriteId(const TActorId& actorId, + ui64 tabletId) +{ + using TEvKeyValue = NKikimr::TEvKeyValue; + + auto request = std::make_unique(); + request->Record.SetCookie(12345); + request->Record.AddCmdRead()->SetKey("_txinfo"); + + auto& runtime = Setup->GetRuntime(); + + runtime.SendToPipe(tabletId, actorId, request.release()); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT(response->Record.HasCookie()); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); + UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1); + + auto& read = response->Record.GetReadResult(0); + + NKikimrPQ::TTabletTxInfo info; + UNIT_ASSERT(info.ParseFromString(read.GetValue())); UNIT_ASSERT_VALUES_EQUAL(info.TxWritesSize(), 1); @@ -1677,10 +1848,7 @@ NTable::TDataQueryResult TFixture::ExecuteDataQuery(NTable::TSession session, co Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture) { - for (auto endOfTransaction : {Commit, Rollback, CloseTableSession}) { - TestTheCompletionOfATransaction({.Topics={"topic_A"}, .EndOfTransaction = endOfTransaction}); - TestTheCompletionOfATransaction({.Topics={"topic_A", "topic_B"}, .EndOfTransaction = endOfTransaction}); - } + TestWriteToTopic11(); } Y_UNIT_TEST_F(WriteToTopic_Demo_12, TFixture) @@ -1978,28 +2146,7 @@ size_t TFixture::GetTableRecordsCount(const TString& tablePath) Y_UNIT_TEST_F(WriteToTopic_Demo_24, TFixture) { - // - // the test verifies a transaction in which data is written to a topic and to a table - // - CreateTopic("topic_A"); - CreateTable("/Root/table_A"); - - NTable::TSession tableSession = CreateTableSession(); - NTable::TTransaction tx = BeginTx(tableSession); - - auto records = MakeTableRecords(); - WriteToTable("table_A", records, &tx); - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); - - CommitTx(tx, EStatus::SUCCESS); - - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(messages[0], MakeJsonDoc(records)); - - UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); - - CheckTabletKeys("topic_A"); + TestWriteToTopic24(); } Y_UNIT_TEST_F(WriteToTopic_Demo_25, TFixture) @@ -2017,81 +2164,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_25, TFixture) NTable::TSession tableSession = CreateTableSession(); NTable::TTransaction tx = BeginTx(tableSession); - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); - - for (const auto& m : messages) { - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, m, &tx); - } - - CommitTx(tx, EStatus::SUCCESS); - - Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3); -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_26, TFixture) -{ - // - // the test verifies a transaction in which data is read from a partition of one topic and written to - // another partition of this topic - // - const ui32 PARTITION_0 = 0; - const ui32 PARTITION_1 = 1; - - CreateTopic("topic_A", TEST_CONSUMER, 2); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, PARTITION_0); - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, PARTITION_0); - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #3", nullptr, PARTITION_0); - - NTable::TSession tableSession = CreateTableSession(); - NTable::TTransaction tx = BeginTx(tableSession); - - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, PARTITION_0); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); - - for (const auto& m : messages) { - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, m, &tx, PARTITION_1); - } - - CommitTx(tx, EStatus::SUCCESS); - - messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, PARTITION_1); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) -{ - CreateTopic("topic_A", TEST_CONSUMER); - CreateTopic("topic_B", TEST_CONSUMER); - CreateTopic("topic_C", TEST_CONSUMER); - - for (size_t i = 0; i < 2; ++i) { - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0); - - NTable::TSession tableSession = CreateTableSession(); - NTable::TTransaction tx = BeginTx(tableSession); - - auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); - - WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); - messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + for (const auto& m : messages) { + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, m, &tx); + } - WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); + CommitTx(tx, EStatus::SUCCESS); - CommitTx(tx, EStatus::SUCCESS); + Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3); +} - messages = ReadFromTopic("topic_C", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0); - UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); +Y_UNIT_TEST_F(WriteToTopic_Demo_26, TFixture) +{ + TestWriteToTopic26(); +} - DumpPQTabletKeys("topic_A"); - DumpPQTabletKeys("topic_B"); - DumpPQTabletKeys("topic_C"); - } +Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) +{ + TestWriteToTopic27(); } Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture) @@ -2486,6 +2578,338 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture) UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2); } +class TFixtureSinks : public TFixture { +protected: + void CreateRowTable(const TString& path); + void CreateColumnTable(const TString& tablePath); + + bool GetEnableOltpSink() const override; + bool GetEnableOlapSink() const override; + bool GetEnableHtapTx() const override; + bool GetAllowOlapDataQuery() const override; +}; + +void TFixtureSinks::CreateRowTable(const TString& path) +{ + CreateTable(path); +} + +void TFixtureSinks::CreateColumnTable(const TString& tablePath) +{ + UNIT_ASSERT(!tablePath.empty()); + + TString path = (tablePath[0] != '/') ? ("/Root/" + tablePath) : tablePath; + + NTable::TSession session = CreateTableSession(); + auto desc = NTable::TTableBuilder() + .SetStoreType(NTable::EStoreType::Column) + .AddNonNullableColumn("key", EPrimitiveType::Utf8) + .AddNonNullableColumn("value", EPrimitiveType::Utf8) + .SetPrimaryKeyColumn("key") + .Build(); + auto result = session.CreateTable(path, std::move(desc)).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +} + +bool TFixtureSinks::GetEnableOltpSink() const +{ + return true; +} + +bool TFixtureSinks::GetEnableOlapSink() const +{ + return true; +} + +bool TFixtureSinks::GetEnableHtapTx() const +{ + return true; +} + +bool TFixtureSinks::GetAllowOlapDataQuery() const +{ + return true; +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_1, TFixtureSinks) +{ + TestWriteToTopic7(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_2, TFixtureSinks) +{ + TestWriteToTopic10(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_3, TFixtureSinks) +{ + TestWriteToTopic26(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_4, TFixtureSinks) +{ + TestWriteToTopic9(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopic_5, TFixtureSinks) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); + + RollbackTx(tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_1, TFixtureSinks) +{ + TestWriteToTopic1(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_2, TFixtureSinks) +{ + TestWriteToTopic27(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_3, TFixtureSinks) +{ + TestWriteToTopic11(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopics_4, TFixtureSinks) +{ + TestWriteToTopic4(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_1, TFixtureSinks) +{ + TestWriteToTopic24(); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_2, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateTopic("topic_B"); + CreateRowTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #3", &tx); + + CommitTx(tx, EStatus::SUCCESS); + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); + } + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, 3); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #3"); + } + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + + CheckTabletKeys("topic_A"); + CheckTabletKeys("topic_B"); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_3, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateTopic("topic_B"); + + CreateRowTable("/Root/table_A"); + CreateRowTable("/Root/table_B"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + WriteToTable("table_B", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + + const size_t topicMsgCnt = 10; + for (size_t i = 1; i <= topicMsgCnt; ++i) { + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), &tx); + } + + CommitTx(tx, EStatus::SUCCESS); + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); + } + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, topicMsgCnt); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #" + std::to_string(topicMsgCnt)); + } + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_B"), records.size()); + + CheckTabletKeys("topic_A"); + CheckTabletKeys("topic_B"); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_4, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateRowTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx1 = BeginTx(tableSession); + NTable::TTransaction tx2 = BeginTx(tableSession); + + ExecuteDataQuery(tableSession, R"(SELECT COUNT(*) FROM `table_A`)", NTable::TTxControl::Tx(tx1)); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx2); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx1); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + CommitTx(tx2, EStatus::SUCCESS); + CommitTx(tx1, EStatus::ABORTED); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + + CheckTabletKeys("topic_A"); +} + +Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateRowTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + RollbackTx(tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0); + + CheckTabletKeys("topic_A"); +} + +Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_1, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateColumnTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + + CheckTabletKeys("topic_A"); +} + +Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_2, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateTopic("topic_B"); + + CreateRowTable("/Root/table_A"); + CreateColumnTable("/Root/table_B"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + + WriteToTable("table_A", records, &tx); + WriteToTable("table_B", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + + const size_t topicMsgCnt = 10; + for (size_t i = 1; i <= topicMsgCnt; ++i) { + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #" + std::to_string(i), &tx); + } + + CommitTx(tx, EStatus::SUCCESS); + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 1); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), MakeJsonDoc(records)); + } + + { + auto messages = Read_Exactly_N_Messages_From_Topic("topic_B", TEST_CONSUMER, topicMsgCnt); + UNIT_ASSERT_VALUES_EQUAL(messages.front(), "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages.back(), "message #" + std::to_string(topicMsgCnt)); + } + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), records.size()); + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_B"), records.size()); + + CheckTabletKeys("topic_A"); + CheckTabletKeys("topic_B"); +} + +Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks) +{ + CreateTopic("topic_A"); + CreateColumnTable("/Root/table_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + auto records = MakeTableRecords(); + WriteToTable("table_A", records, &tx); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, MakeJsonDoc(records), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + RollbackTx(tx, EStatus::SUCCESS); + + Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, 0); + + UNIT_ASSERT_VALUES_EQUAL(GetTableRecordsCount("table_A"), 0); + + CheckTabletKeys("topic_A"); +} } }