diff --git a/examples/basic_example/basic_example.cpp b/examples/basic_example/basic_example.cpp index 552a3d2cba..a69c2e1031 100644 --- a/examples/basic_example/basic_example.cpp +++ b/examples/basic_example/basic_example.cpp @@ -508,4 +508,4 @@ bool Run(const TDriver& driver) { } return true; -} \ No newline at end of file +} diff --git a/include/ydb-cpp-sdk/client/draft/ydb_view.h b/include/ydb-cpp-sdk/client/draft/ydb_view.h new file mode 100644 index 0000000000..9151005058 --- /dev/null +++ b/include/ydb-cpp-sdk/client/draft/ydb_view.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include + +namespace Ydb::View { + class DescribeViewResult; +} + +namespace NYdb { + class TProtoAccessor; +} + +namespace NYql { + class TIssues; +} + +namespace NYdb::NView { + +class TDescribeViewResult; +using TAsyncDescribeViewResult = NThreading::TFuture; + +struct TDescribeViewSettings : public TOperationRequestSettings { + using TSelf = TDescribeViewSettings; +}; + +class TViewDescription { +public: + explicit TViewDescription(const Ydb::View::DescribeViewResult& desc); + + const std::string& GetQueryText() const; + +private: + std::string QueryText_; +}; + +class TDescribeViewResult : public NScheme::TDescribePathResult { + friend class NYdb::TProtoAccessor; + const Ydb::View::DescribeViewResult& GetProto() const; + +public: + TDescribeViewResult(TStatus&& status, Ydb::View::DescribeViewResult&& desc); + TViewDescription GetViewDescription() const; + +private: + std::unique_ptr Proto_; +}; + +class TViewClient { + class TImpl; + +public: + TViewClient(const TDriver& driver, const TCommonClientSettings& settings = TCommonClientSettings()); + + TAsyncDescribeViewResult DescribeView(const std::string& path, + const TDescribeViewSettings& settings = TDescribeViewSettings()); + +private: + std::shared_ptr Impl_; +}; + +} // namespace NYdb::NView diff --git a/include/ydb-cpp-sdk/client/proto/accessor.h b/include/ydb-cpp-sdk/client/proto/accessor.h index 4569764ad9..e2ff8c7589 100644 --- a/include/ydb-cpp-sdk/client/proto/accessor.h +++ b/include/ydb-cpp-sdk/client/proto/accessor.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -10,6 +11,7 @@ #include #include +#include #include #include #include @@ -46,6 +48,7 @@ class TProtoAccessor { static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult); static const Ydb::Coordination::DescribeNodeResult& GetProto(const NYdb::NCoordination::TNodeDescription &describeNodeResult); static const Ydb::Replication::DescribeReplicationResult& GetProto(const NYdb::NReplication::TDescribeReplicationResult& desc); + static const Ydb::View::DescribeViewResult& GetProto(const NYdb::NView::TDescribeViewResult& desc); static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats); static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request); diff --git a/include/ydb-cpp-sdk/client/query/client.h b/include/ydb-cpp-sdk/client/query/client.h index df44734b89..08eb754f08 100644 --- a/include/ydb-cpp-sdk/client/query/client.h +++ b/include/ydb-cpp-sdk/client/query/client.h @@ -130,6 +130,7 @@ class TTransaction; class TSession { friend class TQueryClient; friend class TTransaction; + friend class TExecuteQueryIterator; public: const std::string& GetId() const; diff --git a/include/ydb-cpp-sdk/client/query/query.h b/include/ydb-cpp-sdk/client/query/query.h index 7a07eb53fc..959d22e032 100644 --- a/include/ydb-cpp-sdk/client/query/query.h +++ b/include/ydb-cpp-sdk/client/query/query.h @@ -74,7 +74,7 @@ struct TExecuteQuerySettings : public TRequestSettings { FLUENT_SETTING_DEFAULT(EExecMode, ExecMode, EExecMode::Execute); FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None); FLUENT_SETTING_OPTIONAL(bool, ConcurrentResultSets); - FLUENT_SETTING(std::string, PoolId); + FLUENT_SETTING(std::string, ResourcePool); }; struct TBeginTxSettings : public TRequestSettings {}; @@ -98,7 +98,7 @@ struct TExecuteScriptSettings : public TOperationRequestSettings - static TExplicitPartitions FromProto(const TProto& proto); + static TExplicitPartitions FromProto(const Ydb::Table::ExplicitPartitions& proto); void SerializeTo(Ydb::Table::ExplicitPartitions& proto) const; }; @@ -199,47 +199,67 @@ struct TGlobalIndexSettings { TPartitioningSettings PartitioningSettings; TUniformOrExplicitPartitions Partitions; - template - static TGlobalIndexSettings FromProto(const TProto& proto); + static TGlobalIndexSettings FromProto(const Ydb::Table::GlobalIndexSettings& proto); void SerializeTo(Ydb::Table::GlobalIndexSettings& proto) const; }; struct TVectorIndexSettings { public: - enum class EDistance { - Cosine, + enum class EMetric { + Unspecified = 0, + InnerProduct, + CosineSimilarity, + CosineDistance, Manhattan, Euclidean, + }; - Unknown = std::numeric_limits::max() + enum class EVectorType { + Unspecified = 0, + Float, + Uint8, + Int8, + Bit, }; - enum class ESimilarity { - Cosine, - InnerProduct, + EMetric Metric = EMetric::Unspecified; + EVectorType VectorType = EVectorType::Unspecified; + uint32_t VectorDimension = 0; - Unknown = std::numeric_limits::max() + static TVectorIndexSettings FromProto(const Ydb::Table::VectorIndexSettings& proto); + + void SerializeTo(Ydb::Table::VectorIndexSettings& settings) const; + + void Out(IOutputStream &o) const; +}; + +struct TKMeansTreeSettings { +public: + enum class EMetric { + Unspecified = 0, + InnerProduct, + CosineSimilarity, + CosineDistance, + Manhattan, + Euclidean, }; enum class EVectorType { + Unspecified = 0, Float, Uint8, Int8, Bit, - - Unknown = std::numeric_limits::max() }; - using TMetric = std::variant; - TMetric Metric; - EVectorType VectorType; - uint32_t VectorDimension; + TVectorIndexSettings Settings; + uint32_t Clusters = 0; + uint32_t Levels = 0; - template - static TVectorIndexSettings FromProto(const TProto& proto); + static TKMeansTreeSettings FromProto(const Ydb::Table::KMeansTreeSettings& proto); - void SerializeTo(Ydb::Table::VectorIndexSettings& settings) const; + void SerializeTo(Ydb::Table::KMeansTreeSettings& settings) const; void Out(IOutputStream &o) const; }; @@ -255,7 +275,7 @@ class TIndexDescription { const std::vector& indexColumns, const std::vector& dataColumns = {}, const std::vector& globalIndexSettings = {}, - const std::optional& vectorIndexSettings = {} + const std::variant& specializedIndexSettings = {} ); TIndexDescription( @@ -269,7 +289,7 @@ class TIndexDescription { EIndexType GetIndexType() const; const std::vector& GetIndexColumns() const; const std::vector& GetDataColumns() const; - const std::optional& GetVectorIndexSettings() const; + const std::variant& GetVectorIndexSettings() const; uint64_t GetSizeBytes() const; void SerializeTo(Ydb::Table::TableIndex& proto) const; @@ -289,8 +309,8 @@ class TIndexDescription { std::vector IndexColumns_; std::vector DataColumns_; std::vector GlobalIndexSettings_; - std::optional VectorIndexSettings_; - uint64_t SizeBytes = 0; + std::variant SpecializedIndexSettings_; + uint64_t SizeBytes_ = 0; }; struct TRenameIndex { @@ -665,8 +685,8 @@ class TTableDescription { void AddUniqueSecondaryIndex(const std::string& indexName, const std::vector& indexColumns); void AddUniqueSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns); // vector KMeansTree - void AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const TVectorIndexSettings& vectorIndexSettings); - void AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TVectorIndexSettings& vectorIndexSettings); + void AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const TKMeansTreeSettings& indexSettings); + void AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TKMeansTreeSettings& indexSettings); // default void AddSecondaryIndex(const std::string& indexName, const std::vector& indexColumns); @@ -889,8 +909,8 @@ class TTableBuilder { TTableBuilder& AddUniqueSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns); // vector KMeansTree - TTableBuilder& AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const TVectorIndexSettings& vectorIndexSettings); - TTableBuilder& AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TVectorIndexSettings& vectorIndexSettings); + TTableBuilder& AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const TKMeansTreeSettings& indexSettings); + TTableBuilder& AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TKMeansTreeSettings& indexSettings); // default TTableBuilder& AddSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns); @@ -963,6 +983,9 @@ class TCopyItem { TCopyItem& SetOmitIndexes(); bool OmitIndexes() const; + + void Out(IOutputStream& out) const; + private: std::string Source_; std::string Destination_; diff --git a/include/ydb-cpp-sdk/client/topic/read_events.h b/include/ydb-cpp-sdk/client/topic/read_events.h index f656e5923d..23d44dda9b 100644 --- a/include/ydb-cpp-sdk/client/topic/read_events.h +++ b/include/ydb-cpp-sdk/client/topic/read_events.h @@ -200,6 +200,10 @@ struct TReadSessionEvent { return CompressedMessages; } + void SetReadInTransaction() { + ReadInTransaction = true; + } + //! Commits all messages in batch. void Commit(); @@ -218,6 +222,7 @@ struct TReadSessionEvent { std::vector Messages; std::vector CompressedMessages; std::vector> OffsetRanges; + bool ReadInTransaction = false; }; //! Acknowledgement for commit request. diff --git a/include/ydb-cpp-sdk/client/value/value.h b/include/ydb-cpp-sdk/client/value/value.h index 907f15faa4..c9f300adea 100644 --- a/include/ydb-cpp-sdk/client/value/value.h +++ b/include/ydb-cpp-sdk/client/value/value.h @@ -224,7 +224,7 @@ class TTypeBuilder : public TMoveOnly { struct TDecimalValue { std::string ToString() const; TDecimalValue(const Ydb::Value& decimalValueProto, const TDecimalType& decimalType); - TDecimalValue(const std::string& decimalString, uint8_t precision = 22, uint8_t scale = 9); + TDecimalValue(const std::string& decimalString, uint8_t precision, uint8_t scale); TDecimalType DecimalType_; uint64_t Low_; diff --git a/src/api/grpc/draft/ydb_view_v1.proto b/src/api/grpc/draft/ydb_view_v1.proto new file mode 100644 index 0000000000..20fc6fe212 --- /dev/null +++ b/src/api/grpc/draft/ydb_view_v1.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package Ydb.View.V1; +option java_package = "com.yandex.ydb.view.v1"; + +import "src/api/protos/draft/ydb_view.proto"; + +service ViewService { + rpc DescribeView(View.DescribeViewRequest) returns (View.DescribeViewResponse); +} diff --git a/src/api/protos/draft/ydb_view.proto b/src/api/protos/draft/ydb_view.proto new file mode 100644 index 0000000000..f063e7d72c --- /dev/null +++ b/src/api/protos/draft/ydb_view.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Ydb.View; +option java_package = "com.yandex.ydb.view"; + +import "src/api/protos/annotations/validation.proto"; +import "src/api/protos/ydb_operation.proto"; +import "src/api/protos/ydb_scheme.proto"; + +message DescribeViewRequest { + Ydb.Operations.OperationParams operation_params = 1; + // The path to the view. + string path = 2 [(required) = true]; +} + +message DescribeViewResponse { + // The result of the request will be inside the operation proto. + Ydb.Operations.Operation operation = 1; +} + +message DescribeViewResult { + // Description of a generic scheme object. + Ydb.Scheme.Entry self = 1; + + // View-specific fields. + string query_text = 2; +} diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 9f0da934cd..be2ab06292 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -61,19 +61,15 @@ message GlobalIndexSettings { } message VectorIndexSettings { - enum Distance { - DISTANCE_UNSPECIFIED = 0; - DISTANCE_COSINE = 1; - DISTANCE_MANHATTAN = 2; - DISTANCE_EUCLIDEAN = 3; + enum Metric { + METRIC_UNSPECIFIED = 0; + SIMILARITY_INNER_PRODUCT = 1; + SIMILARITY_COSINE = 2; + DISTANCE_COSINE = 3; + DISTANCE_MANHATTAN = 4; + DISTANCE_EUCLIDEAN = 5; } - enum Similarity { - SIMILARITY_UNSPECIFIED = 0; - SIMILARITY_COSINE = 1; - SIMILARITY_INNER_PRODUCT = 2; - } - enum VectorType { VECTOR_TYPE_UNSPECIFIED = 0; VECTOR_TYPE_FLOAT = 1; @@ -82,13 +78,19 @@ message VectorIndexSettings { VECTOR_TYPE_BIT = 4; } - oneof metric { - Distance distance = 1; - Similarity similarity = 2; - } - VectorType vector_type = 3; + Metric metric = 1; + + VectorType vector_type = 2; + + uint32 vector_dimension = 3; +} - uint32 vector_dimension = 4; +message KMeansTreeSettings { + VectorIndexSettings settings = 1; + // average count of clusters on each level of tree, 0 -- means auto + uint32 clusters = 2; + // average count of levels in the tree, 0 -- means auto + uint32 levels = 3; } message GlobalIndex { @@ -106,7 +108,7 @@ message GlobalUniqueIndex { message GlobalVectorKMeansTreeIndex { GlobalIndexSettings level_table_settings = 1; GlobalIndexSettings posting_table_settings = 2; - VectorIndexSettings vector_settings = 3; + KMeansTreeSettings vector_settings = 3; } // Represent secondary index diff --git a/src/client/draft/CMakeLists.txt b/src/client/draft/CMakeLists.txt index 5143ff8fe0..988b7706ec 100644 --- a/src/client/draft/CMakeLists.txt +++ b/src/client/draft/CMakeLists.txt @@ -13,6 +13,7 @@ target_sources(client-draft PRIVATE ydb_dynamic_config.cpp ydb_replication.cpp ydb_scripting.cpp + ydb_view.cpp ) generate_enum_serilization(client-draft diff --git a/src/client/draft/ydb_view.cpp b/src/client/draft/ydb_view.cpp new file mode 100644 index 0000000000..1afd5440bf --- /dev/null +++ b/src/client/draft/ydb_view.cpp @@ -0,0 +1,92 @@ +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include +#include +#include +#include + +namespace NYdb { +namespace NView { + +TViewDescription::TViewDescription(const Ydb::View::DescribeViewResult& desc) + : QueryText_(desc.query_text()) +{ +} + +const std::string& TViewDescription::GetQueryText() const { + return QueryText_; +} + +TDescribeViewResult::TDescribeViewResult(TStatus&& status, Ydb::View::DescribeViewResult&& desc) + : NScheme::TDescribePathResult(std::move(status), desc.self()) + , Proto_(std::make_unique(std::move(desc))) +{ +} + +TViewDescription TDescribeViewResult::GetViewDescription() const { + return TViewDescription(*Proto_); +} + +const Ydb::View::DescribeViewResult& TDescribeViewResult::GetProto() const { + return *Proto_; +} + +class TViewClient::TImpl : public TClientImplCommon { +public: + TImpl(std::shared_ptr&& connections, const TCommonClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + { + } + + TAsyncDescribeViewResult DescribeView(const std::string& path, const TDescribeViewSettings& settings) { + using namespace Ydb::View; + + auto request = MakeOperationRequest(settings); + request.set_path(TStringType{path}); + + auto promise = NThreading::NewPromise(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + DescribeViewResult result; + if (any) { + any->UnpackTo(&result); + } + + TDescribeViewResult val(TStatus(std::move(status)), std::move(result)); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &V1::ViewService::Stub::AsyncDescribeView, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + } + +}; + +TViewClient::TViewClient(const TDriver& driver, const TCommonClientSettings& settings) + : Impl_(std::make_shared(CreateInternalInterface(driver), settings)) +{ +} + +TAsyncDescribeViewResult TViewClient::DescribeView(const std::string& path, const TDescribeViewSettings& settings) { + return Impl_->DescribeView(path, settings); +} + +} // NView + +const Ydb::View::DescribeViewResult& TProtoAccessor::GetProto(const NView::TDescribeViewResult& result) { + return result.GetProto(); +} + +} // NYdb diff --git a/src/client/federated_topic/ut/basic_usage_ut.cpp b/src/client/federated_topic/ut/basic_usage_ut.cpp index 909fd7bd9c..d7dd6e5d49 100644 --- a/src/client/federated_topic/ut/basic_usage_ut.cpp +++ b/src/client/federated_topic/ut/basic_usage_ut.cpp @@ -1,16 +1,16 @@ -#include -#include +#include +#include -#include +#include -#include +#include -#include -#include -#include +#include +#include +#include -#include -#include +#include +#include #include #include @@ -43,7 +43,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); @@ -58,7 +58,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Cerr << "Session was created" << Endl; ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); - TMaybe event = ReadSession->GetEvent(false); + std::optional event = ReadSession->GetEvent(false); Y_ASSERT(!event); auto fdsRequest = fdsMock.GetNextPendingRequest(); @@ -99,7 +99,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (size_t i = 0; i < partitionsCount; ++i) { ReadSession->WaitEvent().Wait(); // Get event - TMaybe event = ReadSession->GetEvent(true/*block - will block if no event received yet*/); + std::optional event = ReadSession->GetEvent(true/*block - will block if no event received yet*/); Cerr << "Got new read session event: " << DebugString(*event) << Endl; auto* startPartitionSessionEvent = std::get_if(&*event); @@ -126,7 +126,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); @@ -170,7 +170,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); auto clientSettings = TFederatedTopicClientSettings() .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( @@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); auto clientSettings = TFederatedTopicClientSettings() .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( @@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); auto clientSettings = TFederatedTopicClientSettings() .RetryPolicy(NTopic::IRetryPolicy::GetNoRetryPolicy()); @@ -343,14 +343,14 @@ Y_UNIT_TEST_SUITE(BasicUsage) { ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); auto event = ReadSession->GetEvent(false); - UNIT_ASSERT(!event.Defined()); + UNIT_ASSERT(!event.has_value()); auto fdsRequest = fdsMock.WaitNextPendingRequest(); fdsRequest.Result.SetValue({{}, grpc::Status(grpc::StatusCode::UNAVAILABLE, "mock 'unavailable'")}); ReadSession->WaitEvent().Wait(); event = ReadSession->GetEvent(false); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); Cerr << ">>> Got event: " << DebugString(*event) << Endl; UNIT_ASSERT(std::holds_alternative(*event)); @@ -359,13 +359,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { ReadSession2->WaitEvent().Wait(TDuration::Seconds(1)); event = ReadSession2->GetEvent(false); - UNIT_ASSERT(!event.Defined()); + UNIT_ASSERT(!event.has_value()); fdsRequest = fdsMock.WaitNextPendingRequest(); fdsRequest.Result.SetValue(fdsMock.ComposeOkResultAvailableDatabases()); event = ReadSession2->GetEvent(true); - UNIT_ASSERT(event.Defined()); + UNIT_ASSERT(event.has_value()); Cerr << ">>> Got event: " << DebugString(*event) << Endl; UNIT_ASSERT(std::holds_alternative(*event)); @@ -393,7 +393,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Cerr << "Session was created" << Endl; ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); - TMaybe event = ReadSession->GetEvent(false); + std::optional event = ReadSession->GetEvent(false); Y_ASSERT(event); Cerr << "Got new read session event: " << DebugString(*event) << Endl; @@ -418,7 +418,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); auto clientSettings = TFederatedTopicClientSettings(); NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); @@ -434,7 +434,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Cerr << "Session was created" << Endl; ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); - TMaybe event = ReadSession->GetEvent(false); + std::optional event = ReadSession->GetEvent(false); Y_ASSERT(!event); { @@ -449,7 +449,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } ReadSession->WaitEvent().Wait(); - TMaybe event2 = ReadSession->GetEvent(true); + std::optional event2 = ReadSession->GetEvent(true); Cerr << "Got new read session event: " << DebugString(*event2) << Endl; auto* sessionEvent = std::get_if(&*event2); @@ -475,7 +475,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); auto clientSettings = TFederatedTopicClientSettings() .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( @@ -568,7 +568,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); auto clientSettings = TFederatedTopicClientSettings() .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( @@ -613,10 +613,10 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto& message = messages[i]; UNIT_ASSERT(message.GetFederatedPartitionSession()->GetReadSourceDatabaseName() == "dc1"); UNIT_ASSERT(message.GetFederatedPartitionSession()->GetTopicPath() == setup->GetTestTopic()); - UNIT_ASSERT(message.GetData().EndsWith(message.GetFederatedPartitionSession()->GetTopicOriginDatabaseName())); + UNIT_ASSERT(message.GetData().ends_with(message.GetFederatedPartitionSession()->GetTopicOriginDatabaseName())); UNIT_ASSERT(!sentSet.empty()); - UNIT_ASSERT_C(sentSet.erase(message.GetData()), "no such element is sentSet: " + message.GetData()); + UNIT_ASSERT_C(sentSet.erase(TString{message.GetData()}), "no such element is sentSet: " + message.GetData()); totalReceived++; } if (totalReceived == 3 * sentMessages.size()) { @@ -719,7 +719,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); @@ -787,7 +787,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); @@ -821,7 +821,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); @@ -878,7 +878,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); TFederatedTopicClientSettings clientSettings; clientSettings.RetryPolicy(NPersQueue::IRetryPolicy::GetNoRetryPolicy()); @@ -934,7 +934,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); @@ -1017,7 +1017,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); TFederatedTopicClientSettings clientSettings; NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); @@ -1036,13 +1036,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { { auto e = WriteSession->GetEvent(true); - UNIT_ASSERT(e.Defined()); + UNIT_ASSERT(e.has_value()); Cerr << ">>> Got event: " << DebugString(*e) << Endl; UNIT_ASSERT(std::holds_alternative(*e)); } { auto e = WriteSession->GetEvent(true); - UNIT_ASSERT(e.Defined()); + UNIT_ASSERT(e.has_value()); Cerr << ">>> Got event: " << DebugString(*e) << Endl; UNIT_ASSERT(std::holds_alternative(*e)); } @@ -1052,7 +1052,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NTopic::TContinuationToken GetToken(std::shared_ptr writer) { auto e = writer->GetEvent(true); - UNIT_ASSERT(e.Defined()); + UNIT_ASSERT(e.has_value()); Cerr << ">>> Got event: " << DebugString(*e) << Endl; auto* readyToAcceptEvent = std::get_if(&*e); UNIT_ASSERT(readyToAcceptEvent); @@ -1076,7 +1076,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto driverConfig = NYdb::TDriverConfig() .SetEndpoint(TStringBuilder() << "localhost:" << newServicePort) .SetDatabase("/Root") - .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + .SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); auto driver = NYdb::TDriver(driverConfig); auto topicClient = NYdb::NFederatedTopic::TFederatedTopicClient(driver); @@ -1169,7 +1169,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { NYdb::TDriverConfig cfg; cfg.SetEndpoint(TStringBuilder() << "localhost:" << setup->GetGrpcPort()); cfg.SetDatabase("/Root"); - cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + cfg.SetLog(std::unique_ptr(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); NYdb::TDriver driver(cfg); NYdb::NFederatedTopic::TFederatedTopicClient client(driver); diff --git a/src/client/impl/ydb_internal/session_pool/session_pool.h b/src/client/impl/ydb_internal/session_pool/session_pool.h index a2a4f9f83d..18a526ba1e 100644 --- a/src/client/impl/ydb_internal/session_pool/session_pool.h +++ b/src/client/impl/ydb_internal/session_pool/session_pool.h @@ -51,7 +51,9 @@ NThreading::TFuture InjectSessionStatusInterception( // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect // since we have guarantee this request wasn't been started to execute. - if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { + if (status.IsTransportError() + && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED && status.GetStatus() != EStatus::CLIENT_OUT_OF_RANGE) + { impl->MarkBroken(); } else if (status.GetStatus() == EStatus::SESSION_BUSY) { impl->MarkBroken(); @@ -70,6 +72,7 @@ NThreading::TFuture InjectSessionStatusInterception( impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TKqpSessionCommon::EState::S_ACTIVE); } } + if (cb) { cb(value, *impl); } diff --git a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp index 216c743428..ee87596df8 100644 --- a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp +++ b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp @@ -1,4 +1,6 @@ #include "data_plane_helpers.h" +#include +#include namespace NKikimr::NPersQueueTests { @@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests { std::optional partitionGroup, std::optional codec, std::optional reconnectOnFailure, - std::unordered_map sessionMeta + std::unordered_map sessionMeta, + const TString& userAgent ) { auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId); if (partitionGroup) settings.PartitionGroupId(*partitionGroup); @@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests { } settings.MaxMemoryUsage(1024*1024*1024*1024ll); settings.Meta_.Fields = sessionMeta; + if (!userAgent.empty()) { + settings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}); + } return CreateSimpleWriter(driver, settings); } @@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests { return TPersQueueClient(driver, clientSettings).CreateReadSession(TReadSessionSettings(settings).DisableClusterDiscovery(true)); } + std::shared_ptr CreateReader( + NYdb::TDriver& driver, + const NYdb::NTopic::TReadSessionSettings& settings, + std::shared_ptr creds, + const TString& userAgent + ) { + NYdb::NTopic::TTopicClientSettings clientSettings; + if (creds) clientSettings.CredentialsProviderFactory(creds); + auto readerSettings = settings; + if (!userAgent.empty()) { + readerSettings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}); + } + return NYdb::NTopic::TTopicClient(driver, clientSettings).CreateReadSession(readerSettings); + } + TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout) { while (true) { auto future = reader->WaitEvent(); @@ -99,4 +120,24 @@ namespace NKikimr::NPersQueueTests { } return {}; } + + TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout) { + while (true) { + auto future = reader->WaitEvent(); + future.Wait(timeout); + std::optional event = reader->GetEvent(false, 1); + if (!event) + return {}; + if (auto e = std::get_if(&*event)) { + return *e; + } else if (auto* e = std::get_if(&*event)) { + e->Confirm(); + } else if (auto* e = std::get_if(&*event)) { + e->Confirm(); + } else if (std::get_if(&*event)) { + return {}; + } + } + return {}; + } } diff --git a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h index 4357cccefb..a62a0de014 100644 --- a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h +++ b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -36,16 +37,24 @@ namespace NKikimr::NPersQueueTests { std::optional partitionGroup = {}, std::optional codec = {}, std::optional reconnectOnFailure = {}, - std::unordered_map sessionMeta = {} + std::unordered_map sessionMeta = {}, + const TString& userAgent = {} ); std::shared_ptr CreateReader( NYdb::TDriver& driver, const NYdb::NPersQueue::TReadSessionSettings& settings, std::shared_ptr creds = nullptr + ); + std::shared_ptr CreateReader( + NYdb::TDriver& driver, + const NYdb::NTopic::TReadSessionSettings& settings, + std::shared_ptr creds = nullptr, + const TString& userAgent = "" ); TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout = TDuration::Max()); + TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout = TDuration::Max()); } diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index fac961ec62..422b40bb4a 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -90,7 +90,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public auto request = MakeOperationRequest(settings); request.set_exec_mode(::Ydb::Query::ExecMode(settings.ExecMode_)); request.set_stats_mode(::Ydb::Query::StatsMode(settings.StatsMode_)); - request.set_pool_id(TStringType{settings.PoolId_}); + request.set_pool_id(TStringType{settings.ResourcePool_}); request.mutable_script_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_)); request.mutable_script_content()->set_text(TStringType{script}); SetDuration(settings.ResultsTtl_, *request.mutable_results_ttl()); diff --git a/src/client/query/impl/exec_query.cpp b/src/client/query/impl/exec_query.cpp index 4f3f5750a8..3dd9a896c0 100644 --- a/src/client/query/impl/exec_query.cpp +++ b/src/client/query/impl/exec_query.cpp @@ -1,9 +1,11 @@ #define INCLUDE_YDB_INTERNAL_H #include "exec_query.h" +#include "client_session.h" #include #include #include +#include #include #undef INCLUDE_YDB_INTERNAL_H @@ -59,7 +61,7 @@ class TExecuteQueryIterator::TReaderImpl { return Finished_; } - TAsyncExecuteQueryPart ReadNext(std::shared_ptr self) { + TAsyncExecuteQueryPart DoReadNext(std::shared_ptr self) { auto promise = NThreading::NewPromise(); // Capture self - guarantee no dtor call during the read auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { @@ -100,6 +102,18 @@ class TExecuteQueryIterator::TReaderImpl { StreamProcessor_->Read(&Response_, readCb); return promise.GetFuture(); } + + TAsyncExecuteQueryPart ReadNext(std::shared_ptr self) { + if (!Session_) + return DoReadNext(std::move(self)); + + return NSessionPool::InjectSessionStatusInterception( + Session_->SessionImpl_, + DoReadNext(std::move(self)), + false, // no need to ping stream session + TDuration::Zero()); + } + private: TStreamProcessorPtr StreamProcessor_; TResponse Response_; @@ -212,7 +226,7 @@ TFuture> StreamExecuteQueryIm auto request = MakeRequest(); request.set_exec_mode(::Ydb::Query::ExecMode(settings.ExecMode_)); request.set_stats_mode(::Ydb::Query::StatsMode(settings.StatsMode_)); - request.set_pool_id(TStringType{settings.PoolId_}); + request.set_pool_id(TStringType{settings.ResourcePool_}); request.mutable_query_content()->set_text(TStringType{query}); request.mutable_query_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_)); if (session.has_value()) { diff --git a/src/client/table/impl/transaction.cpp b/src/client/table/impl/transaction.cpp index e4440fdd6d..8234ae26b1 100644 --- a/src/client/table/impl/transaction.cpp +++ b/src/client/table/impl/transaction.cpp @@ -14,12 +14,19 @@ TAsyncStatus TTransaction::TImpl::Precommit() const auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {})); for (auto& callback : PrecommitCallbacks) { - auto action = [curr = callback()](const TAsyncStatus& prev) { + if (!callback) { + continue; + } + + // If you send multiple requests in parallel, the `KQP` service can respond with `SESSION_BUSY`. + // Therefore, precommit operations are performed sequentially. Here we capture the closure to + // trigger it later. + auto action = [callback = std::move(callback)](const TAsyncStatus& prev) { if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) { return prev; } - return curr; + return callback(); }; result = result.Apply(action); @@ -39,6 +46,8 @@ TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSetting return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), std::nullopt)); } + PrecommitCallbacks.clear(); + return Session_.Client_->CommitTransaction(Session_, TxId_, settings); diff --git a/src/client/table/impl/transaction.h b/src/client/table/impl/transaction.h index 5507606b29..6cf4675641 100644 --- a/src/client/table/impl/transaction.h +++ b/src/client/table/impl/transaction.h @@ -31,7 +31,7 @@ class TTransaction::TImpl { std::string TxId_; bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet - std::vector PrecommitCallbacks; + mutable std::vector PrecommitCallbacks; }; } diff --git a/src/client/table/out.cpp b/src/client/table/out.cpp index 401695350c..9d7a957219 100644 --- a/src/client/table/out.cpp +++ b/src/client/table/out.cpp @@ -1,5 +1,9 @@ #include +Y_DECLARE_OUT_SPEC(, NYdb::NTable::TCopyItem, o, x) { + return x.Out(o); +} + Y_DECLARE_OUT_SPEC(, NYdb::NTable::TIndexDescription, o, x) { return x.Out(o); } @@ -24,68 +28,58 @@ Y_DECLARE_OUT_SPEC(, NYdb::NTable::TDescribeTableResult, o, x) { return x.Out(o); } -Y_DECLARE_OUT_SPEC(, NYdb::NTable::TVectorIndexSettings::EDistance, stream, value) { - auto convertDistance = [] (auto value) -> auto { - switch (value) { - case NYdb::NTable::TVectorIndexSettings::EDistance::Cosine: - return "COSINE"; - case NYdb::NTable::TVectorIndexSettings::EDistance::Manhattan: - return "MANHATTAN"; - case NYdb::NTable::TVectorIndexSettings::EDistance::Euclidean: - return "EUCLIDEAN"; - case NYdb::NTable::TVectorIndexSettings::EDistance::Unknown: - return "UNKNOWN"; - } - }; - - stream << convertDistance(value); -} - -Y_DECLARE_OUT_SPEC(, NYdb::NTable::TVectorIndexSettings::ESimilarity, stream, value) { - auto convertSimilarity = [] (auto value) -> auto { +Y_DECLARE_OUT_SPEC(, NYdb::NTable::TVectorIndexSettings::EMetric, stream, value) { + auto convertDistance = [&] { switch (value) { - case NYdb::NTable::TVectorIndexSettings::ESimilarity::Cosine: - return "COSINE"; - case NYdb::NTable::TVectorIndexSettings::ESimilarity::InnerProduct: - return "INNER_PRODUCT"; - case NYdb::NTable::TVectorIndexSettings::ESimilarity::Unknown: - return "UNKNOWN"; + case NYdb::NTable::TVectorIndexSettings::EMetric::InnerProduct: + return "similarity: inner_product"; + case NYdb::NTable::TVectorIndexSettings::EMetric::CosineSimilarity: + return "similarity: cosine"; + case NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance: + return "distance: cosine"; + case NYdb::NTable::TVectorIndexSettings::EMetric::Manhattan: + return "distance: manhattan"; + case NYdb::NTable::TVectorIndexSettings::EMetric::Euclidean: + return "distance: euclidean"; + case NYdb::NTable::TVectorIndexSettings::EMetric::Unspecified: + return "metric: unspecified"; } }; - stream << convertSimilarity(value); + stream << convertDistance(); } Y_DECLARE_OUT_SPEC(, NYdb::NTable::TVectorIndexSettings::EVectorType, stream, value) { - auto convertVectorType = [] (auto value) -> auto { + auto convertVectorType = [&] { switch (value) { case NYdb::NTable::TVectorIndexSettings::EVectorType::Float: - return "FLOAT"; + return "float"; case NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8: - return "UINT8"; + return "uint8"; case NYdb::NTable::TVectorIndexSettings::EVectorType::Int8: - return "INT8"; + return "int8"; case NYdb::NTable::TVectorIndexSettings::EVectorType::Bit: - return "BIT"; - case NYdb::NTable::TVectorIndexSettings::EVectorType::Unknown: - return "UNKNOWN"; + return "bit"; + case NYdb::NTable::TVectorIndexSettings::EVectorType::Unspecified: + return "unspecified"; } }; - stream << convertVectorType(value); + stream << convertVectorType(); } Y_DECLARE_OUT_SPEC(, NYdb::NTable::TVectorIndexSettings, stream, value) { - stream << "{"; - - if (const auto* distance = std::get_if(&value.Metric)) { - stream << " distance: " << *distance << ""; - } else if (const auto* similarity = std::get_if(&value.Metric)) { - stream << " similarity: " << *similarity << ""; - } - - stream << ", vector_type: " << value.VectorType << ""; - stream << ", vector_dimension: " << value.VectorDimension << ""; + stream << + "{ " << value.Metric << + ", vector_type: " << value.VectorType << + ", vector_dimension: " << value.VectorDimension << + " }"; +} - stream << " }"; +Y_DECLARE_OUT_SPEC(, NYdb::NTable::TKMeansTreeSettings, stream, value) { + stream << + "{ settings: " << value.Settings << + ", clusters: " << value.Clusters << + ", levels: " << value.Levels << + " }"; } diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index f4b45bb9bf..3151b9fac1 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -253,8 +253,7 @@ static void SerializeTo(const TRenameIndex& rename, Ydb::Table::RenameIndexItem& proto.set_replace_destination(rename.ReplaceDestination_); } -template -TExplicitPartitions TExplicitPartitions::FromProto(const TProto& proto) { +TExplicitPartitions TExplicitPartitions::FromProto(const Ydb::Table::ExplicitPartitions& proto) { TExplicitPartitions out; for (const auto& splitPoint : proto.split_points()) { TValue value(TType(splitPoint.type()), splitPoint.value()); @@ -491,12 +490,12 @@ class TTableDescription::TImpl { Indexes_.emplace_back(indexDescription); } - void AddVectorIndex(const std::string& indexName, EIndexType type, const std::vector& indexColumns, const TVectorIndexSettings& vectorIndexSettings) { - Indexes_.emplace_back(TIndexDescription(indexName, type, indexColumns, {}, {}, vectorIndexSettings)); + void AddVectorKMeansTreeIndex(const std::string& indexName, EIndexType type, const std::vector& indexColumns, const TKMeansTreeSettings& indexSettings) { + Indexes_.emplace_back(TIndexDescription(indexName, type, indexColumns, {}, {}, indexSettings)); } - void AddVectorIndex(const std::string& indexName, EIndexType type, const std::vector& indexColumns, const std::vector& dataColumns, const TVectorIndexSettings& vectorIndexSettings) { - Indexes_.emplace_back(TIndexDescription(indexName, type, indexColumns, dataColumns, {}, vectorIndexSettings)); + void AddVectorKMeansTreeIndex(const std::string& indexName, EIndexType type, const std::vector& indexColumns, const std::vector& dataColumns, const TKMeansTreeSettings& indexSettings) { + Indexes_.emplace_back(TIndexDescription(indexName, type, indexColumns, dataColumns, {}, indexSettings)); } void AddChangefeed(const std::string& name, EChangefeedMode mode, EChangefeedFormat format) { @@ -800,12 +799,12 @@ void TTableDescription::AddUniqueSecondaryIndex(const std::string& indexName, co AddSecondaryIndex(indexName, EIndexType::GlobalUnique, indexColumns, dataColumns); } -void TTableDescription::AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const TVectorIndexSettings& vectorIndexSettings) { - Impl_->AddVectorIndex(indexName, EIndexType::GlobalVectorKMeansTree, indexColumns, vectorIndexSettings); +void TTableDescription::AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const TKMeansTreeSettings& indexSettings) { + Impl_->AddVectorKMeansTreeIndex(indexName, EIndexType::GlobalVectorKMeansTree, indexColumns, indexSettings); } -void TTableDescription::AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TVectorIndexSettings& vectorIndexSettings) { - Impl_->AddVectorIndex(indexName, EIndexType::GlobalVectorKMeansTree, indexColumns, dataColumns, vectorIndexSettings); +void TTableDescription::AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TKMeansTreeSettings& indexSettings) { + Impl_->AddVectorKMeansTreeIndex(indexName, EIndexType::GlobalVectorKMeansTree, indexColumns, dataColumns, indexSettings); } void TTableDescription::AddSecondaryIndex(const std::string& indexName, const std::vector& indexColumns) { @@ -1278,13 +1277,13 @@ TTableBuilder& TTableBuilder::AddUniqueSecondaryIndex(const std::string& indexNa return AddSecondaryIndex(indexName, EIndexType::GlobalUnique, indexColumns); } -TTableBuilder& TTableBuilder::AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TVectorIndexSettings& vectorIndexSettings) { - TableDescription_.AddVectorKMeansTreeSecondaryIndex(indexName, indexColumns, dataColumns, vectorIndexSettings); +TTableBuilder& TTableBuilder::AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TKMeansTreeSettings& indexSettings) { + TableDescription_.AddVectorKMeansTreeIndex(indexName, indexColumns, dataColumns, indexSettings); return *this; } -TTableBuilder& TTableBuilder::AddVectorKMeansTreeSecondaryIndex(const std::string& indexName, const std::vector& indexColumns, const TVectorIndexSettings& vectorIndexSettings) { - TableDescription_.AddVectorKMeansTreeSecondaryIndex(indexName, indexColumns, vectorIndexSettings); +TTableBuilder& TTableBuilder::AddVectorKMeansTreeIndex(const std::string& indexName, const std::vector& indexColumns, const TKMeansTreeSettings& indexSettings) { + TableDescription_.AddVectorKMeansTreeIndex(indexName, indexColumns, indexSettings); return *this; } @@ -2284,6 +2283,12 @@ bool TCopyItem::OmitIndexes() const { return OmitIndexes_; } +void TCopyItem::Out(IOutputStream& o) const { + o << "{ src: \"" << Source_ << "\"" + << ", dst: \"" << Destination_ << "\"" + << " }"; +} + //////////////////////////////////////////////////////////////////////////////// TRenameItem::TRenameItem(const std::string& source, const std::string& destination) @@ -2317,13 +2322,13 @@ TIndexDescription::TIndexDescription( const std::vector& indexColumns, const std::vector& dataColumns, const std::vector& globalIndexSettings, - const std::optional& vectorIndexSettings + const std::variant& specializedIndexSettings ) : IndexName_(name) , IndexType_(type) , IndexColumns_(indexColumns) , DataColumns_(dataColumns) , GlobalIndexSettings_(globalIndexSettings) - , VectorIndexSettings_(vectorIndexSettings) + , SpecializedIndexSettings_(specializedIndexSettings) {} TIndexDescription::TIndexDescription( @@ -2358,21 +2363,20 @@ const std::vector& TIndexDescription::GetDataColumns() const { return DataColumns_; } -const std::optional& TIndexDescription::GetVectorIndexSettings() const { - return VectorIndexSettings_; +const std::variant& TIndexDescription::GetVectorIndexSettings() const { + return SpecializedIndexSettings_; } uint64_t TIndexDescription::GetSizeBytes() const { - return SizeBytes; + return SizeBytes_; } -template -TGlobalIndexSettings TGlobalIndexSettings::FromProto(const TProto& proto) { - auto partitionsFromProto = [](const auto& proto) -> TUniformOrExplicitPartitions { +TGlobalIndexSettings TGlobalIndexSettings::FromProto(const Ydb::Table::GlobalIndexSettings& proto) { + auto partitionsFromProto = [](const Ydb::Table::GlobalIndexSettings& proto) -> TUniformOrExplicitPartitions { switch (proto.partitions_case()) { - case TProto::kUniformPartitions: + case Ydb::Table::GlobalIndexSettings::kUniformPartitions: return proto.uniform_partitions(); - case TProto::kPartitionAtKeys: + case Ydb::Table::GlobalIndexSettings::kPartitionAtKeys: return TExplicitPartitions::FromProto(proto.partition_at_keys()); default: return {}; @@ -2399,34 +2403,26 @@ void TGlobalIndexSettings::SerializeTo(Ydb::Table::GlobalIndexSettings& settings std::visit(std::move(variantVisitor), Partitions); } -template -TVectorIndexSettings TVectorIndexSettings::FromProto(const TProto& proto) { - auto convertDistance = [] (auto distance) -> auto { - switch (distance) { +TVectorIndexSettings TVectorIndexSettings::FromProto(const Ydb::Table::VectorIndexSettings& proto) { + auto covertMetric = [&] { + switch (proto.metric()) { + case Ydb::Table::VectorIndexSettings::SIMILARITY_INNER_PRODUCT: + return EMetric::InnerProduct; + case Ydb::Table::VectorIndexSettings::SIMILARITY_COSINE: + return EMetric::CosineSimilarity; case Ydb::Table::VectorIndexSettings::DISTANCE_COSINE: - return EDistance::Cosine; + return EMetric::CosineDistance; case Ydb::Table::VectorIndexSettings::DISTANCE_MANHATTAN: - return EDistance::Manhattan; + return EMetric::Manhattan; case Ydb::Table::VectorIndexSettings::DISTANCE_EUCLIDEAN: - return EDistance::Euclidean; - default: - return EDistance::Unknown; - } - }; - - auto convertSimilarity = [] (auto similarity) -> auto { - switch (similarity) { - case Ydb::Table::VectorIndexSettings::SIMILARITY_COSINE: - return ESimilarity::Cosine; - case Ydb::Table::VectorIndexSettings::SIMILARITY_INNER_PRODUCT: - return ESimilarity::InnerProduct; + return EMetric::Euclidean; default: - return ESimilarity::Unknown; + return EMetric::Unspecified; } }; - auto convertVectorType = [] (auto vectorType) -> auto { - switch (vectorType) { + auto convertVectorType = [&] { + switch (proto.vector_type()) { case Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT: return EVectorType::Float; case Ydb::Table::VectorIndexSettings::VECTOR_TYPE_UINT8: @@ -2436,56 +2432,37 @@ TVectorIndexSettings TVectorIndexSettings::FromProto(const TProto& proto) { case Ydb::Table::VectorIndexSettings::VECTOR_TYPE_BIT: return EVectorType::Bit; default: - return EVectorType::Unknown; - } - }; - - - auto metricFromProto = [&](const auto& proto) -> TVectorIndexSettings::TMetric { - switch (proto.metric_case()) { - case TProto::kDistance: - return convertDistance(proto.distance()); - case TProto::kSimilarity: - return convertSimilarity(proto.similarity()); - default: - return {}; + return EVectorType::Unspecified; } }; return { - .Metric = metricFromProto(proto), - .VectorType = convertVectorType(proto.vector_type()), - .VectorDimension = proto.vector_dimension() + .Metric = covertMetric(), + .VectorType = convertVectorType(), + .VectorDimension = proto.vector_dimension(), }; } void TVectorIndexSettings::SerializeTo(Ydb::Table::VectorIndexSettings& settings) const { - auto convertDistance = [] (auto distance) -> auto { - switch (distance) { - case EDistance::Cosine: + auto convertMetric = [&] { + switch (Metric) { + case EMetric::InnerProduct: + return Ydb::Table::VectorIndexSettings::SIMILARITY_INNER_PRODUCT; + case EMetric::CosineSimilarity: + return Ydb::Table::VectorIndexSettings::SIMILARITY_COSINE; + case EMetric::CosineDistance: return Ydb::Table::VectorIndexSettings::DISTANCE_COSINE; - case EDistance::Manhattan: + case EMetric::Manhattan: return Ydb::Table::VectorIndexSettings::DISTANCE_MANHATTAN; - case EDistance::Euclidean: + case EMetric::Euclidean: return Ydb::Table::VectorIndexSettings::DISTANCE_EUCLIDEAN; - case EDistance::Unknown: - return Ydb::Table::VectorIndexSettings::DISTANCE_UNSPECIFIED; + case EMetric::Unspecified: + return Ydb::Table::VectorIndexSettings::METRIC_UNSPECIFIED; } }; - auto convertSimilarity = [] (auto similarity) -> auto { - switch (similarity) { - case ESimilarity::Cosine: - return Ydb::Table::VectorIndexSettings::SIMILARITY_COSINE; - case ESimilarity::InnerProduct: - return Ydb::Table::VectorIndexSettings::SIMILARITY_INNER_PRODUCT; - case ESimilarity::Unknown: - return Ydb::Table::VectorIndexSettings::SIMILARITY_UNSPECIFIED; - } - }; - - auto convertVectorType = [] (auto vectorType) -> auto { - switch (vectorType) { + auto convertVectorType = [&] { + switch (VectorType) { case EVectorType::Float: return Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT; case EVectorType::Uint8: @@ -2494,19 +2471,13 @@ void TVectorIndexSettings::SerializeTo(Ydb::Table::VectorIndexSettings& settings return Ydb::Table::VectorIndexSettings::VECTOR_TYPE_INT8; case EVectorType::Bit: return Ydb::Table::VectorIndexSettings::VECTOR_TYPE_BIT; - case EVectorType::Unknown: + case EVectorType::Unspecified: return Ydb::Table::VectorIndexSettings::VECTOR_TYPE_UNSPECIFIED; } }; - - if (const auto* distance = std::get_if(&Metric)) { - settings.set_distance(convertDistance(*distance)); - } else if (const auto* similarity = std::get_if(&Metric)) { - settings.set_similarity(convertSimilarity(*similarity)); - } - - settings.set_vector_type(convertVectorType(VectorType)); + settings.set_metric(convertMetric()); + settings.set_vector_type(convertVectorType()); settings.set_vector_dimension(VectorDimension); } @@ -2514,13 +2485,31 @@ void TVectorIndexSettings::Out(IOutputStream& o) const { o << *this; } +TKMeansTreeSettings TKMeansTreeSettings::FromProto(const Ydb::Table::KMeansTreeSettings& proto) { + return { + .Settings = TVectorIndexSettings::FromProto(proto.settings()), + .Clusters = proto.clusters(), + .Levels = proto.levels(), + }; +} + +void TKMeansTreeSettings::SerializeTo(Ydb::Table::KMeansTreeSettings& settings) const { + Settings.SerializeTo(*settings.mutable_settings()); + settings.set_clusters(Clusters); + settings.set_levels(Levels); +} + +void TKMeansTreeSettings::Out(IOutputStream& o) const { + o << *this; +} + template TIndexDescription TIndexDescription::FromProto(const TProto& proto) { EIndexType type; std::vector indexColumns; std::vector dataColumns; std::vector globalIndexSettings; - std::optional vectorIndexSettings; + std::variant specializedIndexSettings; indexColumns.assign(proto.index_columns().begin(), proto.index_columns().end()); dataColumns.assign(proto.data_columns().begin(), proto.data_columns().end()); @@ -2543,7 +2532,7 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { const auto &vectorProto = proto.global_vector_kmeans_tree_index(); globalIndexSettings.emplace_back(TGlobalIndexSettings::FromProto(vectorProto.level_table_settings())); globalIndexSettings.emplace_back(TGlobalIndexSettings::FromProto(vectorProto.posting_table_settings())); - vectorIndexSettings = TVectorIndexSettings::FromProto(vectorProto.vector_settings()); + specializedIndexSettings = TKMeansTreeSettings::FromProto(vectorProto.vector_settings()); break; } default: // fallback to global sync @@ -2552,9 +2541,9 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { break; } - auto result = TIndexDescription(proto.name(), type, indexColumns, dataColumns, globalIndexSettings, vectorIndexSettings); + auto result = TIndexDescription(proto.name(), type, indexColumns, dataColumns, globalIndexSettings, specializedIndexSettings); if constexpr (std::is_same_v) { - result.SizeBytes = proto.size_bytes(); + result.SizeBytes_ = proto.size_bytes(); } return result; @@ -2596,8 +2585,8 @@ void TIndexDescription::SerializeTo(Ydb::Table::TableIndex& proto) const { GlobalIndexSettings_[0].SerializeTo(level_settings); GlobalIndexSettings_[1].SerializeTo(posting_settings); } - if (VectorIndexSettings_) { - VectorIndexSettings_->SerializeTo(vector_settings); + if (const auto* settings = std::get_if(&SpecializedIndexSettings_)) { + settings->SerializeTo(vector_settings); } break; } @@ -2622,9 +2611,12 @@ void TIndexDescription::Out(IOutputStream& o) const { o << ", data_columns: [" << JoinSeq(", ", DataColumns_) << "]"; } - if (VectorIndexSettings_) { - o << ", vector_settings: " << *VectorIndexSettings_ << ""; - } + std::visit([&](const T& settings) { + if constexpr (!std::is_same_v) { + o << ", vector_settings: " << settings; + } + }, SpecializedIndexSettings_); + o << " }"; } diff --git a/src/client/topic/impl/CMakeLists.txt b/src/client/topic/impl/CMakeLists.txt index ed9ccdc316..299c0dbe26 100644 --- a/src/client/topic/impl/CMakeLists.txt +++ b/src/client/topic/impl/CMakeLists.txt @@ -27,12 +27,14 @@ target_sources(client-ydb_topic-impl common.cpp deferred_commit.cpp event_handlers.cpp + offsets_collector.cpp read_session_event.cpp read_session.cpp - write_session.cpp - write_session_impl.cpp topic_impl.cpp topic.cpp + transaction.cpp + write_session_impl.cpp + write_session.cpp ) _ydb_sdk_install_targets(TARGETS client-ydb_topic-impl) diff --git a/src/client/topic/impl/offsets_collector.cpp b/src/client/topic/impl/offsets_collector.cpp new file mode 100644 index 0000000000..ea3ead2776 --- /dev/null +++ b/src/client/topic/impl/offsets_collector.cpp @@ -0,0 +1,72 @@ +#include "offsets_collector.h" + +namespace NYdb::NTopic { + +std::vector TOffsetsCollector::GetOffsets() const +{ + std::vector topics; + + for (auto& [path, partitions] : Ranges) { + TTopicOffsets topic; + topic.Path = path; + + topics.push_back(std::move(topic)); + + for (auto& [id, ranges] : partitions) { + TPartitionOffsets partition; + partition.PartitionId = id; + + TTopicOffsets& t = topics.back(); + t.Partitions.push_back(std::move(partition)); + + for (auto& range : ranges) { + TPartitionOffsets& p = t.Partitions.back(); + + TOffsetsRange r; + r.Start = range.first; + r.End = range.second; + + p.Offsets.push_back(r); + } + } + } + + return topics; +} + +void TOffsetsCollector::CollectOffsets(const std::vector& events) +{ + for (auto& event : events) { + if (auto* e = std::get_if(&event)) { + CollectOffsets(*e); + } + } +} + +void TOffsetsCollector::CollectOffsets(const TReadSessionEvent::TEvent& event) +{ + if (auto* e = std::get_if(&event)) { + CollectOffsets(*e); + } +} + +void TOffsetsCollector::CollectOffsets(const TReadSessionEvent::TDataReceivedEvent& event) +{ + const auto& session = *event.GetPartitionSession(); + const std::string& topicPath = session.GetTopicPath(); + uint32_t partitionId = session.GetPartitionId(); + + if (event.HasCompressedMessages()) { + for (auto& message : event.GetCompressedMessages()) { + uint64_t offset = message.GetOffset(); + Ranges[topicPath][partitionId].InsertInterval(offset, offset + 1); + } + } else { + for (auto& message : event.GetMessages()) { + uint64_t offset = message.GetOffset(); + Ranges[topicPath][partitionId].InsertInterval(offset, offset + 1); + } + } +} + +} \ No newline at end of file diff --git a/src/client/topic/impl/offsets_collector.h b/src/client/topic/impl/offsets_collector.h new file mode 100644 index 0000000000..281d01c1f6 --- /dev/null +++ b/src/client/topic/impl/offsets_collector.h @@ -0,0 +1,35 @@ +#pragma once + +#include "topic_impl.h" +#include "transaction.h" + +#include +#include + +#include + +#include +#include +#include + +#include + +namespace NYdb::NTopic { + +class TOffsetsCollector { +public: + std::vector GetOffsets() const; + + void CollectOffsets(const std::vector& events); + void CollectOffsets(const TReadSessionEvent::TEvent& event); + +private: + // topic -> partition -> (begin, end) + using TOffsetRanges = std::unordered_map>>; + + void CollectOffsets(const TReadSessionEvent::TDataReceivedEvent& event); + + TOffsetRanges Ranges; +}; + +} diff --git a/src/client/topic/impl/read_session.cpp b/src/client/topic/impl/read_session.cpp index 541481b5d0..539d235c13 100644 --- a/src/client/topic/impl/read_session.cpp +++ b/src/client/topic/impl/read_session.cpp @@ -11,6 +11,13 @@ namespace NYdb::NTopic { static const std::string DRIVER_IS_STOPPING_DESCRIPTION = "Driver is stopping"; +void SetReadInTransaction(TReadSessionEvent::TEvent& event) +{ + if (auto* e = std::get_if(&event)) { + e->SetReadInTransaction(); + } +} + TReadSession::TReadSession(const TReadSessionSettings& settings, std::shared_ptr client, std::shared_ptr connections, @@ -136,12 +143,10 @@ std::vector TReadSession::GetEvents(const TReadSessio auto events = GetEvents(settings.Block_, settings.MaxEventsCount_, settings.MaxByteSize_); if (!events.empty() && settings.Tx_) { auto& tx = settings.Tx_->get(); + CbContext->TryGet()->CollectOffsets(tx, events, Client); for (auto& event : events) { - if (auto* dataEvent = std::get_if(&event)) { - CollectOffsets(tx, *dataEvent); - } + SetReadInTransaction(event); } - UpdateOffsets(tx); } return events; } @@ -157,92 +162,14 @@ std::optional TReadSession::GetEvent(bool block, size std::optional TReadSession::GetEvent(const TReadSessionGetEventSettings& settings) { auto event = GetEvent(settings.Block_, settings.MaxByteSize_); - if (event) { + if (event && settings.Tx_) { auto& tx = settings.Tx_->get(); - if (auto* dataEvent = std::get_if(&*event)) { - CollectOffsets(tx, *dataEvent); - } - UpdateOffsets(tx); + CbContext->TryGet()->CollectOffsets(tx, *event, Client); + SetReadInTransaction(*event); } return event; } -void TReadSession::CollectOffsets(NTable::TTransaction& tx, - const TReadSessionEvent::TDataReceivedEvent& event) -{ - const auto& session = *event.GetPartitionSession(); - - if (event.HasCompressedMessages()) { - for (auto& message : event.GetCompressedMessages()) { - CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset()); - } - } else { - for (auto& message : event.GetMessages()) { - CollectOffsets(tx, session.GetTopicPath(), session.GetPartitionId(), message.GetOffset()); - } - } -} - -void TReadSession::CollectOffsets(NTable::TTransaction& tx, - const std::string& topicPath, ui32 partitionId, ui64 offset) -{ - const std::string& sessionId = tx.GetSession().GetId(); - const std::string& txId = tx.GetId(); - TOffsetRanges& ranges = OffsetRanges[std::make_pair(sessionId, txId)]; - ranges[topicPath][partitionId].InsertInterval(offset, offset + 1); -} - -void TReadSession::UpdateOffsets(const NTable::TTransaction& tx) -{ - const std::string& sessionId = tx.GetSession().GetId(); - const std::string& txId = tx.GetId(); - - auto p = OffsetRanges.find(std::make_pair(sessionId, txId)); - if (p == OffsetRanges.end()) { - return; - } - - std::vector topics; - for (auto& [path, partitions] : p->second) { - TTopicOffsets topic; - topic.Path = path; - - topics.push_back(std::move(topic)); - - for (auto& [id, ranges] : partitions) { - TPartitionOffsets partition; - partition.PartitionId = id; - - TTopicOffsets& t = topics.back(); - t.Partitions.push_back(std::move(partition)); - - for (auto& range : ranges) { - TPartitionOffsets& p = t.Partitions.back(); - - TOffsetsRange r; - r.Start = range.first; - r.End = range.second; - - p.Offsets.push_back(r); - } - } - } - - Y_ABORT_UNLESS(!topics.empty()); - - auto result = Client->UpdateOffsetsInTransaction(tx, - topics, - Settings.ConsumerName_, - {}).GetValueSync(); - Y_ABORT_UNLESS(!result.IsTransportError()); - - if (!result.IsSuccess()) { - ythrow yexception() << "error on update offsets: " << result; - } - - OffsetRanges.erase(std::make_pair(sessionId, txId)); -} - bool TReadSession::Close(TDuration timeout) { LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); // Log final counters. diff --git a/src/client/topic/impl/read_session.h b/src/client/topic/impl/read_session.h index fb92825d82..8c314ed085 100644 --- a/src/client/topic/impl/read_session.h +++ b/src/client/topic/impl/read_session.h @@ -63,18 +63,6 @@ class TReadSession : public IReadSession { void AbortImpl(EStatus statusCode, const std::string& message, TDeferredActions& deferred); private: - using TOffsetRanges = std::unordered_map>>; - - void CollectOffsets(NTable::TTransaction& tx, - const TReadSessionEvent::TDataReceivedEvent& event); - void CollectOffsets(NTable::TTransaction& tx, - const std::string& topicPath, ui32 partitionId, ui64 offset); - void UpdateOffsets(const NTable::TTransaction& tx); - - // - // (session, tx) -> topic -> partition -> (begin, end) - // - std::unordered_map, TOffsetRanges, THash>> OffsetRanges; TReadSessionSettings Settings; const std::string SessionId; diff --git a/src/client/topic/impl/read_session_event.cpp b/src/client/topic/impl/read_session_event.cpp index f97960f66a..4895167d41 100644 --- a/src/client/topic/impl/read_session_event.cpp +++ b/src/client/topic/impl/read_session_event.cpp @@ -248,6 +248,10 @@ TDataReceivedEvent::TDataReceivedEvent(std::vector messages, std::vect } void TDataReceivedEvent::Commit() { + if (ReadInTransaction) { + ythrow yexception() << "Offsets cannot be commited explicitly when reading in a transaction"; + } + for (auto [from, to] : OffsetRanges) { static_cast*>(PartitionSession.Get())->Commit(from, to); } diff --git a/src/client/topic/impl/read_session_impl.h b/src/client/topic/impl/read_session_impl.h index e4da1a0530..c9e0a3500a 100644 --- a/src/client/topic/impl/read_session_impl.h +++ b/src/client/topic/impl/read_session_impl.h @@ -6,10 +6,15 @@ #include "common.h" #include "counters_logger.h" +#include "offsets_collector.h" +#include "transaction.h" #include #include #include +#include + +#include #include @@ -1157,6 +1162,13 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContextSetCallbackContext(TEnableSelfContext>::SelfContext); } + void CollectOffsets(NTable::TTransaction& tx, + const std::vector& events, + std::shared_ptr client); + void CollectOffsets(NTable::TTransaction& tx, + const TReadSessionEvent::TEvent& event, + std::shared_ptr client); + private: void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions& deferred); @@ -1301,6 +1313,22 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext; + using TTransactionMap = std::unordered_map>; + + void TrySubscribeOnTransactionCommit(NTable::TTransaction& tx, + std::shared_ptr client); + TTransactionInfoPtr GetOrCreateTxInfo(const TTransactionId& txId); + void DeleteTx(const TTransactionId& txId); + const TAReadSessionSettings Settings; const std::string Database; const std::string SessionId; @@ -1348,6 +1376,8 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext> HierarchyData; std::unordered_set ReadingFinishedData; + + TTransactionMap Txs; }; } // namespace NYdb::NTopic diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 1645142f10..a247cce4ac 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -1909,6 +1909,95 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStream } } +template +void TSingleClusterReadSessionImpl::CollectOffsets(NTable::TTransaction& tx, + const std::vector& events, + std::shared_ptr client) +{ + auto txInfo = GetOrCreateTxInfo(MakeTransactionId(tx)); + TrySubscribeOnTransactionCommit(tx, std::move(client)); + with_lock (txInfo->Lock) { + txInfo->OffsetsCollector.CollectOffsets(events); + } +} + +template +void TSingleClusterReadSessionImpl::CollectOffsets(NTable::TTransaction& tx, + const TReadSessionEvent::TEvent& event, + std::shared_ptr client) +{ + auto txInfo = GetOrCreateTxInfo(MakeTransactionId(tx)); + TrySubscribeOnTransactionCommit(tx, std::move(client)); + with_lock (txInfo->Lock) { + txInfo->OffsetsCollector.CollectOffsets(event); + } +} + +template +void TSingleClusterReadSessionImpl::TrySubscribeOnTransactionCommit(NTable::TTransaction& tx, + std::shared_ptr client) +{ + const TTransactionId txId = MakeTransactionId(tx); + auto txInfo = GetOrCreateTxInfo(txId); + Y_ABORT_UNLESS(txInfo); + + with_lock (txInfo->Lock) { + if (txInfo->Subscribed) { + return; + } + + auto callback = [cbContext = this->SelfContext, txId, txInfo, consumer = Settings.ConsumerName_, client]() { + std::vector offsets; + + with_lock (txInfo->Lock) { + Y_ABORT_UNLESS(!txInfo->CommitCalled); + + txInfo->CommitCalled = true; + + offsets = txInfo->OffsetsCollector.GetOffsets(); + } + + if (auto self = cbContext->LockShared()) { + self->DeleteTx(txId); + } + + return client->UpdateOffsetsInTransaction(txId, + offsets, + consumer, + {}); + }; + + tx.AddPrecommitCallback(std::move(callback)); + + txInfo->IsActive = true; + txInfo->Subscribed = true; + } +} + +template +auto TSingleClusterReadSessionImpl::GetOrCreateTxInfo(const TTransactionId& txId) -> TTransactionInfoPtr +{ + with_lock (Lock) { + auto p = Txs.find(txId); + if (p == Txs.end()) { + TTransactionInfoPtr& txInfo = Txs[txId]; + txInfo = std::make_shared(); + txInfo->Subscribed = false; + txInfo->CommitCalled = false; + p = Txs.find(txId); + } + return p->second; + } +} + +template +void TSingleClusterReadSessionImpl::DeleteTx(const TTransactionId& txId) +{ + with_lock (Lock) { + Txs.erase(txId); + } +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TReadSessionEventInfo @@ -1994,6 +2083,13 @@ bool TReadSessionEventsQueue::PushEvent(TIntrusivePtrDeleteNotReadyTail(deferred); } + if (!HasDataEventCallback() && !std::holds_alternative>(event)) { + // Call non-dataEvent callbacks immediately. + if (TryApplyCallbackToEventImpl(event, deferred, CbContext)) { + return true; + } + } + stream->InsertEvent(std::move(event)); Y_ASSERT(stream->HasEvents()); diff --git a/src/client/topic/impl/topic_impl.h b/src/client/topic/impl/topic_impl.h index b7a83dd686..b820a46fa7 100644 --- a/src/client/topic/impl/topic_impl.h +++ b/src/client/topic/impl/topic_impl.h @@ -1,5 +1,7 @@ #pragma once +#include "transaction.h" + #include #define INCLUDE_YDB_INTERNAL_H @@ -329,15 +331,15 @@ class TTopicClient::TImpl : public TClientImplCommon { TRpcRequestSettings::Make(settings)); } - TAsyncStatus UpdateOffsetsInTransaction(const NTable::TTransaction& tx, + TAsyncStatus UpdateOffsetsInTransaction(const TTransactionId& tx, const std::vector& topics, const std::string& consumerName, const TUpdateOffsetsInTransactionSettings& settings) { auto request = MakeOperationRequest(settings); - request.mutable_tx()->set_id(TStringType{tx.GetId()}); - request.mutable_tx()->set_session(TStringType{tx.GetSession().GetId()}); + request.mutable_tx()->set_id(TStringType{GetTxId(tx)}); + request.mutable_tx()->set_session(TStringType{GetSessionId(tx)}); for (auto& t : topics) { auto* topic = request.mutable_topics()->Add(); diff --git a/src/client/topic/impl/transaction.cpp b/src/client/topic/impl/transaction.cpp new file mode 100644 index 0000000000..8358bb93b5 --- /dev/null +++ b/src/client/topic/impl/transaction.cpp @@ -0,0 +1,26 @@ +#include "transaction.h" +#include + +namespace NYdb::NTopic { + +TTransactionId MakeTransactionId(const NTable::TTransaction& tx) +{ + return {tx.GetSession().GetId(), tx.GetId()}; +} + +TStatus MakeStatus(EStatus code, NYql::TIssues&& issues) +{ + return {code, std::move(issues)}; +} + +TStatus MakeSessionExpiredError() +{ + return MakeStatus(EStatus::SESSION_EXPIRED, {}); +} + +TStatus MakeCommitTransactionSuccess() +{ + return MakeStatus(EStatus::SUCCESS, {}); +} + +} diff --git a/src/client/topic/impl/transaction.h b/src/client/topic/impl/transaction.h new file mode 100644 index 0000000000..27d9672c49 --- /dev/null +++ b/src/client/topic/impl/transaction.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +namespace NYdb::NTable { + +class TTransaction; + +} + +namespace NYdb::NTopic { + +using TTransactionId = std::pair; + +inline +const std::string& GetSessionId(const TTransactionId& x) +{ + return x.first; +} + +inline +const std::string& GetTxId(const TTransactionId& x) +{ + return x.second; +} + +TTransactionId MakeTransactionId(const NTable::TTransaction& tx); + +TStatus MakeSessionExpiredError(); +TStatus MakeCommitTransactionSuccess(); + +} diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 164dfff1a4..cd57cc1d29 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -12,11 +12,10 @@ #include #include - namespace NYdb::NTopic { const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1); -// Error code from file src/api/protos/persqueue_error_codes_v1.proto +// Error code from file ydb/public/api/protos/persqueue_error_codes_v1.proto const uint64_t WRITE_ERROR_PARTITION_INACTIVE = 500029; namespace { @@ -520,30 +519,6 @@ NThreading::TFuture TWriteSessionImpl::WaitEvent() { return EventsQueue->WaitEvent(); } -void TWriteSessionImpl::OnTransactionCommit() -{ -} - -TStatus MakeStatus(EStatus code, NYql::TIssues&& issues) -{ - return {code, std::move(issues)}; -} - -TStatus MakeSessionExpiredError() -{ - return MakeStatus(EStatus::SESSION_EXPIRED, {}); -} - -TStatus MakeCommitTransactionSuccess() -{ - return MakeStatus(EStatus::SUCCESS, {}); -} - -std::pair MakeTransactionId(const TTransaction& tx) -{ - return {tx.GetSession().GetId(), tx.GetId()}; -} - void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransaction* tx) { if (!tx) { @@ -563,21 +538,26 @@ void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransaction* tx) txInfo->AllAcksReceived = NThreading::NewPromise(); } - auto callback = [txInfo]() { + auto callback = [cbContext = this->SelfContext, txId, txInfo]() { with_lock(txInfo->Lock) { + Y_ABORT_UNLESS(!txInfo->CommitCalled); + txInfo->CommitCalled = true; if (txInfo->WriteCount == txInfo->AckCount) { txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); + if (auto self = cbContext->LockShared()) { + self->DeleteTx(txId); + } return txInfo->AllAcksReceived.GetFuture(); } if (txInfo->IsActive) { return txInfo->AllAcksReceived.GetFuture(); } - - return NThreading::MakeFuture(MakeSessionExpiredError()); } + + return NThreading::MakeFuture(MakeSessionExpiredError()); }; tx->AddPrecommitCallback(std::move(callback)); @@ -590,7 +570,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) auto p = WrittenInTx.find(seqNo); if (p == WrittenInTx.end()) { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnAck: seqno=" << seqNo << ", txId=?"); + LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=?"); return; } @@ -601,7 +581,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) ++txInfo->AckCount; LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << txId.second<< ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + LogPrefix() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) { txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); @@ -624,6 +604,13 @@ auto TWriteSessionImpl::GetOrCreateTxInfo(const TTransactionId& txId) -> TTransa return p->second; } +void TWriteSessionImpl::DeleteTx(const TTransactionId& txId) +{ + with_lock (Lock) { + Txs.erase(txId); + } +} + void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) { TInstant createdAtValue = message.CreateTimestamp_.value_or(TInstant::Now()); bool readyToAccept = false; @@ -637,11 +624,13 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess if (message.GetTxPtr()) { const auto& txId = MakeTransactionId(*message.GetTxPtr()); TTransactionInfoPtr txInfo = GetOrCreateTxInfo(txId); - ++txInfo->WriteCount; - WrittenInTx[seqNo] = txId; + with_lock(txInfo->Lock) { + ++txInfo->WriteCount; - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << txId.second << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, + LogPrefix() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount); + } + WrittenInTx[seqNo] = txId; } CurrentBatch.Add( @@ -1705,9 +1694,11 @@ void TWriteSessionImpl::AbortImpl() { void TWriteSessionImpl::CancelTransactions() { for (auto& [_, txInfo] : Txs) { - txInfo->IsActive = false; - if (txInfo->WriteCount != txInfo->AckCount) { - txInfo->AllAcksReceived.SetValue(MakeSessionExpiredError()); + with_lock(txInfo->Lock) { + txInfo->IsActive = false; + if (txInfo->WriteCount != txInfo->AckCount) { + txInfo->AllAcksReceived.SetValue(MakeSessionExpiredError()); + } } } diff --git a/src/client/topic/impl/write_session_impl.h b/src/client/topic/impl/write_session_impl.h index e0260ce95f..0e078b5246 100644 --- a/src/client/topic/impl/write_session_impl.h +++ b/src/client/topic/impl/write_session_impl.h @@ -1,5 +1,7 @@ #pragma once +#include "transaction.h" + #include #include #include @@ -317,7 +319,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, ui64 AckCount = 0; }; - using TTransactionId = std::pair; // SessionId, TxId using TTransactionInfoPtr = std::shared_ptr; THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action @@ -428,8 +429,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, void CancelTransactions(); TTransactionInfoPtr GetOrCreateTxInfo(const TTransactionId& txId); void TrySignalAllAcksReceived(ui64 seqNo); - - void OnTransactionCommit(); + void DeleteTx(const TTransactionId& txId); private: TWriteSessionSettings Settings; diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 177c4be250..f4f1c288c8 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -61,7 +61,14 @@ class TFixture : public NUnitTest::TBaseFixture { void StartPartitionSession(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset); void StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset); + struct TReadMessageSettings { + NTable::TTransaction& Tx; + bool CommitOffsets = false; + std::optional Offset; + }; + void ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset); + void ReadMessage(TTopicReadSessionPtr reader, const TReadMessageSettings& settings); void WriteMessage(const TString& message); void WriteMessages(const TVector& messages, @@ -305,8 +312,23 @@ void TFixture::StartPartitionSession(TTopicReadSessionPtr reader, ui64 offset) void TFixture::ReadMessage(TTopicReadSessionPtr reader, NTable::TTransaction& tx, ui64 offset) { - auto event = ReadEvent(reader, tx); - UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), offset); + TReadMessageSettings settings { + .Tx = tx, + .CommitOffsets = false, + .Offset = offset + }; + ReadMessage(reader, settings); +} + +void TFixture::ReadMessage(TTopicReadSessionPtr reader, const TReadMessageSettings& settings) +{ + auto event = ReadEvent(reader, settings.Tx); + if (settings.Offset.has_value()) { + UNIT_ASSERT_VALUES_EQUAL(event.GetMessages()[0].GetOffset(), *settings.Offset); + } + if (settings.CommitOffsets) { + event.Commit(); + } } template @@ -489,9 +511,7 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) { auto reader = CreateReader(); - StartPartitionSession(reader, tx1, 0); - ReadMessage(reader, tx1, 0); } @@ -500,9 +520,7 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) { auto reader = CreateReader(); - StartPartitionSession(reader, tx2, 0); - ReadMessage(reader, tx2, 0); } @@ -510,6 +528,16 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) CommitTx(tx1, EStatus::ABORTED); } +Y_UNIT_TEST_F(Offsets_Cannot_Be_Promoted_When_Reading_In_A_Transaction, TFixture) +{ + WriteMessage("message"); + auto session = CreateTableSession(); + auto tx = BeginTx(session); + auto reader = CreateReader(); + StartPartitionSession(reader, tx, 0); + UNIT_ASSERT_EXCEPTION(ReadMessage(reader, {.Tx = tx, .CommitOffsets = true}), yexception); +} + //Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture) //{ // WriteToTopicWithInvalidTxId(false); @@ -1948,10 +1976,12 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); + WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); CommitTx(tx, EStatus::SUCCESS); diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 34de5301c8..680ec178b6 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -136,3 +136,9 @@ TTopicClient TTopicSdkTestSetup::MakeClient() const { return TTopicClient(MakeDriver()); } + +NYdb::NTable::TTableClient TTopicSdkTestSetup::MakeTableClient() const +{ + return NYdb::NTable::TTableClient(MakeDriver(), NYdb::NTable::TClientSettings() + .UseQueryCache(false)); +} diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 3b151d09f2..65050aabe7 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -33,6 +33,7 @@ class TTopicSdkTestSetup { TLog& GetLog(); TTopicClient MakeClient() const; + NYdb::NTable::TTableClient MakeTableClient() const; TDriver MakeDriver() const; TDriver MakeDriver(const TDriverConfig& config) const; diff --git a/src/library/json_value/ydb_json_value.cpp b/src/library/json_value/ydb_json_value.cpp index e4f50d1f43..b5315cba3d 100644 --- a/src/library/json_value/ydb_json_value.cpp +++ b/src/library/json_value/ydb_json_value.cpp @@ -676,7 +676,7 @@ namespace { case TTypeParser::ETypeKind::Decimal: EnsureType(jsonValue, NJson::JSON_STRING); - ValueBuilder.Decimal(std::string(jsonValue.GetString())); + ValueBuilder.Decimal(TDecimalValue(std::string{jsonValue.GetString()}, 22, 9)); break; case TTypeParser::ETypeKind::Pg: diff --git a/src/library/uuid/uuid.cpp b/src/library/uuid/uuid.cpp index 6177535dc0..a6a3cda99b 100644 --- a/src/library/uuid/uuid.cpp +++ b/src/library/uuid/uuid.cpp @@ -57,6 +57,16 @@ void UuidBytesToString(const std::string& in, IOutputStream& out) { NUuid::UuidToString(dw, out); } +void UuidHalfsToString(ui64 low, ui64 hi, IOutputStream& out) { + union { + ui16 dw[8]; + ui64 half[2]; + } buf; + buf.half[0] = low; + buf.half[1] = hi; + NUuid::UuidToString(buf.dw, out); +} + void UuidHalfsToByteString(ui64 low, ui64 hi, IOutputStream& out) { union { char bytes[16]; diff --git a/src/library/uuid/uuid.h b/src/library/uuid/uuid.h index 8a8d1faa8a..c8eb53205f 100644 --- a/src/library/uuid/uuid.h +++ b/src/library/uuid/uuid.h @@ -15,6 +15,7 @@ static constexpr ui32 UUID_LEN = 16; std::string UuidBytesToString(const std::string& in); void UuidBytesToString(const std::string& in, IOutputStream& out); +void UuidHalfsToString(ui64 low, ui64 hi, IOutputStream& out); void UuidToString(ui16 dw[8], IOutputStream& out); void UuidHalfsToByteString(ui64 low, ui64 hi, IOutputStream& out); diff --git a/src/library/yql_common/decimal/yql_decimal.cpp b/src/library/yql_common/decimal/yql_decimal.cpp index 34360e60d3..b2829e09ac 100644 --- a/src/library/yql_common/decimal/yql_decimal.cpp +++ b/src/library/yql_common/decimal/yql_decimal.cpp @@ -17,11 +17,11 @@ TUint128 GetDivider(ui8 scale) { } bool IsError(TInt128 v) { - return v > Nan() || v < -Nan(); + return v > Nan() || v < -Inf(); } bool IsNan(TInt128 v) { - return v == Nan() || v == -Nan(); + return v == Nan(); } bool IsInf(TInt128 v) { @@ -47,8 +47,6 @@ const char* ToString(TInt128 val, ui8 precision, ui8 scale) { return "-inf"; if (val == Nan()) return "nan"; - if (val == -Nan()) - return "-nan"; if (!IsNormal(val)) { return nullptr; diff --git a/tests/unit/client/CMakeLists.txt b/tests/unit/client/CMakeLists.txt index db7c5c7f7f..7633d3f609 100644 --- a/tests/unit/client/CMakeLists.txt +++ b/tests/unit/client/CMakeLists.txt @@ -1,3 +1,5 @@ +add_subdirectory(draft/helpers) + add_ydb_test(NAME client-ydb_coordination_ut SOURCES coordination/coordination_ut.cpp @@ -22,6 +24,22 @@ add_ydb_test(NAME client-extensions-discovery_mutator_ut unit ) +add_ydb_test(NAME client-draft_ut + SOURCES + draft/ydb_scripting_response_headers_ut.cpp + draft/ydb_view_ut.cpp + LINK_LIBRARIES + yutil + cpp-testing-unittest_main + YDB-CPP-SDK::Draft + LABELS + unit +) +target_link_libraries(client-draft_ut + PRIVATE + client-draft_ut_helpers +) + add_ydb_test(NAME client-ydb_driver_ut SOURCES driver/driver_ut.cpp @@ -99,14 +117,3 @@ add_ydb_test(NAME client-ydb_value_ut LABELS unit ) - -add_ydb_test(NAME client-draft_ut - SOURCES - scripting/ydb_scripting_response_headers_ut.cpp - LINK_LIBRARIES - yutil - cpp-testing-unittest_main - YDB-CPP-SDK::Draft - LABELS - unit -) diff --git a/tests/unit/client/draft/helpers/CMakeLists.txt b/tests/unit/client/draft/helpers/CMakeLists.txt new file mode 100644 index 0000000000..28133bff0e --- /dev/null +++ b/tests/unit/client/draft/helpers/CMakeLists.txt @@ -0,0 +1,13 @@ +_ydb_sdk_add_library(client-draft_ut_helpers) + +target_link_libraries(client-draft_ut_helpers + PRIVATE + api-grpc + api-grpc-draft +) + +target_sources(client-draft_ut_helpers + PRIVATE + grpc_services/scripting.cpp + grpc_services/view.cpp +) diff --git a/tests/unit/client/draft/helpers/grpc_server.h b/tests/unit/client/draft/helpers/grpc_server.h new file mode 100644 index 0000000000..c788452cf9 --- /dev/null +++ b/tests/unit/client/draft/helpers/grpc_server.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +#include + +namespace NYdb { + +template +std::unique_ptr StartGrpcServer(const std::string& address, TService& service) { + grpc::ServerBuilder builder; + builder.AddListeningPort(TStringType{address}, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + return builder.BuildAndStart(); +} + +} diff --git a/tests/unit/client/draft/helpers/grpc_services/scripting.cpp b/tests/unit/client/draft/helpers/grpc_services/scripting.cpp new file mode 100644 index 0000000000..1b172e179a --- /dev/null +++ b/tests/unit/client/draft/helpers/grpc_services/scripting.cpp @@ -0,0 +1,21 @@ +#include "scripting.h" + +namespace NYdb::NScripting { + +grpc::Status TMockSlyDbProxy::ExecuteYql( + grpc::ServerContext* context, + [[maybe_unused]] const Ydb::Scripting::ExecuteYqlRequest* request, + Ydb::Scripting::ExecuteYqlResponse* response +) { + context->AddInitialMetadata("key", "value"); + + // Just to make sdk core happy + auto* op = response->mutable_operation(); + op->set_ready(true); + op->set_status(Ydb::StatusIds::SUCCESS); + op->mutable_result(); + + return grpc::Status::OK; +} + +} diff --git a/tests/unit/client/draft/helpers/grpc_services/scripting.h b/tests/unit/client/draft/helpers/grpc_services/scripting.h new file mode 100644 index 0000000000..3e46a186f1 --- /dev/null +++ b/tests/unit/client/draft/helpers/grpc_services/scripting.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +namespace NYdb::NScripting { + +class TMockSlyDbProxy : public Ydb::Scripting::V1::ScriptingService::Service +{ +public: + grpc::Status ExecuteYql( + grpc::ServerContext* context, + const Ydb::Scripting::ExecuteYqlRequest* request, + Ydb::Scripting::ExecuteYqlResponse* response) override; +}; + +} diff --git a/tests/unit/client/draft/helpers/grpc_services/view.cpp b/tests/unit/client/draft/helpers/grpc_services/view.cpp new file mode 100644 index 0000000000..3614fb5e4a --- /dev/null +++ b/tests/unit/client/draft/helpers/grpc_services/view.cpp @@ -0,0 +1,19 @@ +#include "view.h" + +namespace NYdb::NView { + +grpc::Status TViewDummyService::DescribeView( + [[maybe_unused]] grpc::ServerContext* context, + [[maybe_unused]] const Ydb::View::DescribeViewRequest* request, + Ydb::View::DescribeViewResponse* response +) { + auto* op = response->mutable_operation(); + op->set_ready(true); + op->set_status(Ydb::StatusIds::SUCCESS); + Ydb::View::DescribeViewResult describeResult; + describeResult.set_query_text(DummyQueryText); + op->mutable_result()->PackFrom(describeResult); + return grpc::Status::OK; +} + +} diff --git a/tests/unit/client/draft/helpers/grpc_services/view.h b/tests/unit/client/draft/helpers/grpc_services/view.h new file mode 100644 index 0000000000..20a66e597f --- /dev/null +++ b/tests/unit/client/draft/helpers/grpc_services/view.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace NYdb::NView { + +constexpr const char* DummyQueryText = "select 42"; + +class TViewDummyService : public Ydb::View::V1::ViewService::Service +{ +public: + grpc::Status DescribeView( + grpc::ServerContext* context, + const Ydb::View::DescribeViewRequest* request, + Ydb::View::DescribeViewResponse* response) override; +}; + +} diff --git a/tests/unit/client/draft/ydb_scripting_response_headers_ut.cpp b/tests/unit/client/draft/ydb_scripting_response_headers_ut.cpp new file mode 100644 index 0000000000..9e350d1984 --- /dev/null +++ b/tests/unit/client/draft/ydb_scripting_response_headers_ut.cpp @@ -0,0 +1,30 @@ +#include "helpers/grpc_server.h" +#include "helpers/grpc_services/scripting.h" + +#include + +#include + +using namespace NYdb; +using namespace NYdb::NScripting; + +Y_UNIT_TEST_SUITE(ResponseHeaders) { + Y_UNIT_TEST(PassHeader) { + TMockSlyDbProxy slyDbProxy; + + std::string addr = "localhost:10000"; + + auto server = StartGrpcServer(addr, slyDbProxy); + + auto config = TDriverConfig() + .SetEndpoint(addr); + TDriver driver(config); + TScriptingClient client(driver); + + auto result = client.ExecuteYqlScript("SMTH").GetValueSync(); + auto metadata = result.GetResponseMetadata(); + + UNIT_ASSERT(metadata.find("key") != metadata.end()); + UNIT_ASSERT_VALUES_EQUAL(metadata.find("key")->second, "value"); + } +} diff --git a/tests/unit/client/draft/ydb_view_ut.cpp b/tests/unit/client/draft/ydb_view_ut.cpp new file mode 100644 index 0000000000..af7f058b36 --- /dev/null +++ b/tests/unit/client/draft/ydb_view_ut.cpp @@ -0,0 +1,28 @@ +#include "helpers/grpc_server.h" +#include "helpers/grpc_services/view.h" + +#include + +#include + +using namespace NYdb; +using namespace NYdb::NView; + +Y_UNIT_TEST_SUITE(ViewClient) { + Y_UNIT_TEST(Basic) { + TString addr = "localhost:2135"; + TViewDummyService viewService; + + auto server = StartGrpcServer(addr, viewService); + + auto config = TDriverConfig().SetEndpoint(addr); + TDriver driver(config); + TViewClient client(driver); + + auto result = client.DescribeView("any").ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + auto queryText = result.GetViewDescription().GetQueryText(); + UNIT_ASSERT_STRINGS_EQUAL(queryText, DummyQueryText); + } +} diff --git a/tests/unit/client/scripting/ydb_scripting_response_headers_ut.cpp b/tests/unit/client/scripting/ydb_scripting_response_headers_ut.cpp deleted file mode 100644 index ea614c7cf5..0000000000 --- a/tests/unit/client/scripting/ydb_scripting_response_headers_ut.cpp +++ /dev/null @@ -1,68 +0,0 @@ -#include -#include - -#include -#include - -#include -#include -#include - -#include -#include - -using namespace NYdb; -using namespace NYdb::NScripting; - -namespace { - -template -std::unique_ptr StartGrpcServer(const std::string& address, TService& service) { - grpc::ServerBuilder builder; - builder.AddListeningPort(TStringType{address}, grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - return builder.BuildAndStart(); -} - -class TMockSlyDbProxy : public Ydb::Scripting::V1::ScriptingService::Service -{ -public: - grpc::Status ExecuteYql( - grpc::ServerContext* context, - const Ydb::Scripting::ExecuteYqlRequest* request, - Ydb::Scripting::ExecuteYqlResponse* response) override { - context->AddInitialMetadata("key", "value"); - Y_UNUSED(request); - - // Just to make sdk core happy - auto* op = response->mutable_operation(); - op->set_ready(true); - op->set_status(Ydb::StatusIds::SUCCESS); - op->mutable_result(); - - return grpc::Status::OK; - } -}; - -} - -Y_UNIT_TEST_SUITE(ResponseHeaders) { - Y_UNIT_TEST(PassHeader) { - TMockSlyDbProxy slyDbProxy; - - std::string addr = "localhost:10000"; - - auto server = StartGrpcServer(addr, slyDbProxy); - - auto config = TDriverConfig() - .SetEndpoint(addr); - TDriver driver(config); - TScriptingClient client(driver); - - auto result = client.ExecuteYqlScript("SMTH").GetValueSync(); - auto metadata = result.GetResponseMetadata(); - - UNIT_ASSERT(metadata.find("key") != metadata.end()); - UNIT_ASSERT_VALUES_EQUAL(metadata.find("key")->second, "value"); - } -} diff --git a/tests/unit/library/yql_common/decimal/yql_decimal_ut.cpp b/tests/unit/library/yql_common/decimal/yql_decimal_ut.cpp index ec08e5d055..2fec102f61 100644 --- a/tests/unit/library/yql_common/decimal/yql_decimal_ut.cpp +++ b/tests/unit/library/yql_common/decimal/yql_decimal_ut.cpp @@ -230,19 +230,16 @@ Y_UNIT_TEST_SUITE(TYqlDecimalTest) { } Y_UNIT_TEST(TestSpecialAsString) { - UNIT_ASSERT(IsValid("+Nan")); - UNIT_ASSERT(IsValid("-nAn")); + UNIT_ASSERT(IsValid("Nan")); UNIT_ASSERT(IsValid("INF")); UNIT_ASSERT(IsValid("-inf")); - UNIT_ASSERT_VALUES_EQUAL(ToString(+Nan(), 10, 2), "nan"); - UNIT_ASSERT_VALUES_EQUAL(ToString(-Nan(), 10, 2), "-nan"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Nan(), 10, 2), "nan"); UNIT_ASSERT_VALUES_EQUAL(ToString(+Inf(), 10, 2), "inf"); UNIT_ASSERT_VALUES_EQUAL(ToString(-Inf(), 10, 2), "-inf"); UNIT_ASSERT(IsNan(FromString("nan", 10, 2))); - UNIT_ASSERT(IsNan(FromString("-nAN", 12, 7))); UNIT_ASSERT(IsInf(FromString("+INf", MaxPrecision, 6))); UNIT_ASSERT(IsInf(FromString("-inF", 4, 2))); }