diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index 8d9342aac6..a2fa24ca62 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include #include #include @@ -13,11 +15,8 @@ namespace NYdb::inline V3 { -class TDriver; class TGRpcConnectionsImpl; -//////////////////////////////////////////////////////////////////////////////// - //! Represents configuration of YDB driver class TDriverConfig { friend class TDriver; diff --git a/include/ydb-cpp-sdk/client/driver/fwd.h b/include/ydb-cpp-sdk/client/driver/fwd.h new file mode 100644 index 0000000000..6d0a1c888d --- /dev/null +++ b/include/ydb-cpp-sdk/client/driver/fwd.h @@ -0,0 +1,8 @@ +#pragma once + +namespace NYdb::inline V3 { + +class TDriver; +class TDriverConfig; + +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/params/fwd.h b/include/ydb-cpp-sdk/client/params/fwd.h new file mode 100644 index 0000000000..4d6af5b082 --- /dev/null +++ b/include/ydb-cpp-sdk/client/params/fwd.h @@ -0,0 +1,9 @@ +#pragma once + +namespace NYdb::inline V3 { + +class TParams; +class TParamValueBuilder; +class TParamsBuilder; + +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/params/params.h b/include/ydb-cpp-sdk/client/params/params.h index 1092c2bb88..89a30d0ce6 100644 --- a/include/ydb-cpp-sdk/client/params/params.h +++ b/include/ydb-cpp-sdk/client/params/params.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include #include @@ -32,8 +34,6 @@ namespace NQuery { class TQueryClient; } -class TParamsBuilder; - class TParams { friend class TParamsBuilder; friend class NTable::TTableClient; diff --git a/include/ydb-cpp-sdk/client/query/client.h b/include/ydb-cpp-sdk/client/query/client.h index d608b18f67..980bf2b079 100644 --- a/include/ydb-cpp-sdk/client/query/client.h +++ b/include/ydb-cpp-sdk/client/query/client.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include "query.h" #include "tx.h" @@ -27,7 +29,6 @@ struct TCreateSessionSettings : public TSimpleRequestSettings; using TRetryOperationSettings = NYdb::NRetry::TRetryOperationSettings; @@ -55,7 +56,6 @@ struct TClientSettings : public TCommonClientSettingsBase { // ! This API is currently in experimental state and is a subject for changes. // ! No backward and/or forward compatibility guarantees are provided. // ! DO NOT USE for production workloads. -class TSession; class TQueryClient { friend class TSession; friend class NRetry::Async::TRetryContext; @@ -126,7 +126,6 @@ class TQueryClient { std::shared_ptr Impl_; }; -class TTransaction; class TSession { friend class TQueryClient; friend class TTransaction; diff --git a/include/ydb-cpp-sdk/client/query/fwd.h b/include/ydb-cpp-sdk/client/query/fwd.h new file mode 100644 index 0000000000..08b7f4e675 --- /dev/null +++ b/include/ydb-cpp-sdk/client/query/fwd.h @@ -0,0 +1,37 @@ +#pragma once + +namespace NYdb::inline V3::NQuery { + +struct TClientSettings; +struct TSessionPoolSettings; +struct TCreateSessionSettings; +struct TExecuteQuerySettings; +struct TBeginTxSettings; +struct TCommitTxSettings; +struct TRollbackTxSettings; +struct TExecuteScriptSettings; +struct TFetchScriptResultsSettings; +struct TTxOnlineSettings; +struct TTxSettings; + +class TQueryClient; +class TSession; + +class TCreateSessionResult; +class TBeginTransactionResult; +class TExecuteQueryResult; +class TCommitTransactionResult; +class TFetchScriptResultsResult; + +class TExecuteQueryPart; +class TExecuteQueryIterator; + +class TTransaction; +struct TTxControl; + +class TQueryContent; +class TResultSetMeta; +class TScriptExecutionOperation; +class TExecStats; + +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/query/query.h b/include/ydb-cpp-sdk/client/query/query.h index 6ecc64393d..4e3616a436 100644 --- a/include/ydb-cpp-sdk/client/query/query.h +++ b/include/ydb-cpp-sdk/client/query/query.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include "stats.h" #include @@ -46,7 +48,6 @@ enum class EExecStatus { Failed = 50, }; -class TExecuteQueryPart; using TAsyncExecuteQueryPart = NThreading::TFuture; class TExecuteQueryIterator : public TStatus { @@ -88,8 +89,6 @@ class TCommitTransactionResult : public TStatus { TCommitTransactionResult(TStatus&& status); }; -class TBeginTransactionResult; - using TAsyncBeginTransactionResult = NThreading::TFuture; using TAsyncCommitTransactionResult = NThreading::TFuture; @@ -182,7 +181,6 @@ class TFetchScriptResultsResult : public TStatus { std::string NextFetchToken_; }; -class TExecuteQueryResult; using TAsyncFetchScriptResultsResult = NThreading::TFuture; using TAsyncExecuteQueryResult = NThreading::TFuture; diff --git a/include/ydb-cpp-sdk/client/query/stats.h b/include/ydb-cpp-sdk/client/query/stats.h index 92856ba1eb..902478b446 100644 --- a/include/ydb-cpp-sdk/client/query/stats.h +++ b/include/ydb-cpp-sdk/client/query/stats.h @@ -6,8 +6,6 @@ #include #include -class TDuration; - namespace Ydb::TableStats { class QueryStats; } diff --git a/include/ydb-cpp-sdk/client/result/fwd.h b/include/ydb-cpp-sdk/client/result/fwd.h new file mode 100644 index 0000000000..98f7ba8cba --- /dev/null +++ b/include/ydb-cpp-sdk/client/result/fwd.h @@ -0,0 +1,8 @@ +#pragma once + +namespace NYdb::inline V3 { + +class TResultSet; +class TResultSetParser; + +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/result/result.h b/include/ydb-cpp-sdk/client/result/result.h index d7caa0249e..4fe057b663 100644 --- a/include/ydb-cpp-sdk/client/result/result.h +++ b/include/ydb-cpp-sdk/client/result/result.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include #include diff --git a/include/ydb-cpp-sdk/client/table/fwd.h b/include/ydb-cpp-sdk/client/table/fwd.h new file mode 100644 index 0000000000..04c947ef6b --- /dev/null +++ b/include/ydb-cpp-sdk/client/table/fwd.h @@ -0,0 +1,116 @@ +#pragma once + +namespace NYdb::inline V3::NTable { + +class TKeyBound; +class TKeyRange; + +struct TTableColumn; +struct TAlterTableColumn; + +struct TPartitionStats; + +struct TSequenceDescription; +class TChangefeedDescription; +class TIndexDescription; +class TColumnFamilyDescription; +class TTableDescription; + +struct TExplicitPartitions; + +struct TRenameIndex; + +struct TGlobalIndexSettings; +struct TVectorIndexSettings; +struct TKMeansTreeSettings; +struct TCreateSessionSettings; +struct TSessionPoolSettings; +struct TClientSettings; +struct TBulkUpsertSettings; +struct TReadRowsSettings; +struct TStreamExecScanQuerySettings; +struct TTxOnlineSettings; +struct TCreateTableSettings; +struct TDropTableSettings; +struct TAlterTableSettings; +struct TCopyTableSettings; +struct TCopyTablesSettings; +struct TRenameTablesSettings; +struct TDescribeTableSettings; +struct TExplainDataQuerySettings; +struct TPrepareDataQuerySettings; +struct TExecDataQuerySettings; +struct TExecSchemeQuerySettings; +struct TBeginTxSettings; +struct TCommitTxSettings; +struct TRollbackTxSettings; +struct TCloseSessionSettings; +struct TKeepAliveSettings; +struct TReadTableSettings; + +class TPartitioningSettings; +class TDateTypeColumnModeSettings; +class TValueSinceUnixEpochModeSettings; +class TTtlTierSettings; +class TTtlSettings; +class TAlterTtlSettings; +class TStorageSettings; +class TReadReplicasSettings; +class TTxSettings; + +struct TColumnFamilyPolicy; +struct TStoragePolicy; +struct TPartitioningPolicy; +struct TReplicationPolicy; + +class TBuildIndexOperation; + +class TTtlDeleteAction; +class TTtlEvictToExternalStorageAction; + +class TStorageSettingsBuilder; +class TPartitioningSettingsBuilder; +class TColumnFamilyBuilder; +class TTableStorageSettingsBuilder; +class TTableColumnFamilyBuilder; +class TTablePartitioningSettingsBuilder; +class TTableBuilder; +class TAlterStorageSettingsBuilder; +class TAlterColumnFamilyBuilder; +class TAlterTtlSettingsBuilder; +class TAlterAttributesBuilder; +class TAlterPartitioningSettingsBuilder; + +class TPrepareQueryResult; +class TExplainQueryResult; +class TDescribeTableResult; +class TDataQueryResult; +class TBeginTransactionResult; +class TCommitTransactionResult; +class TCreateSessionResult; +class TKeepAliveResult; +class TBulkUpsertResult; +class TReadRowsResult; + +template +class TSimpleStreamPart; + +class TScanQueryPart; + +class TTablePartIterator; +class TScanQueryPartIterator; + +class TReadTableSnapshot; + +class TCopyItem; +class TRenameItem; + +class TDataQuery; + +class TTransaction; +class TTxControl; + +class TSession; +class TTableClient; + +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 3df814b213..98a15f9284 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include "table_enum.h" #include @@ -643,9 +645,6 @@ class TReadReplicasSettings { uint64_t ReadReplicasCount_; }; -struct TExplicitPartitions; -struct TDescribeTableSettings; - enum class EStoreType { Row = 0, Column = 1 @@ -826,8 +825,6 @@ class TColumnFamilyBuilder { //////////////////////////////////////////////////////////////////////////////// -class TTableBuilder; - class TTableStorageSettingsBuilder { public: explicit TTableStorageSettingsBuilder(TTableBuilder& parent) @@ -1071,19 +1068,6 @@ class TRenameItem { //////////////////////////////////////////////////////////////////////////////// -class TCreateSessionResult; -class TDataQueryResult; -class TTablePartIterator; -class TPrepareQueryResult; -class TExplainQueryResult; -class TDescribeTableResult; -class TBeginTransactionResult; -class TCommitTransactionResult; -class TKeepAliveResult; -class TBulkUpsertResult; -class TReadRowsResult; -class TScanQueryPartIterator; - using TAsyncCreateSessionResult = NThreading::TFuture; using TAsyncDataQueryResult = NThreading::TFuture; using TAsyncPrepareQueryResult = NThreading::TFuture; @@ -1177,9 +1161,6 @@ struct TStreamExecScanQuerySettings : public TRequestSettings //////////////////////////////////////////////////////////////////////////////// -struct TAlterTableSettings; - class TAlterStorageSettingsBuilder { public: explicit TAlterStorageSettingsBuilder(TAlterTableSettings& parent) diff --git a/include/ydb-cpp-sdk/client/types/credentials/credentials.h b/include/ydb-cpp-sdk/client/types/credentials/credentials.h index a70dbeeb0b..75d51f4654 100644 --- a/include/ydb-cpp-sdk/client/types/credentials/credentials.h +++ b/include/ydb-cpp-sdk/client/types/credentials/credentials.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include diff --git a/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/credentials.h b/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/credentials.h index e0593ab06e..78be18a802 100644 --- a/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/credentials.h +++ b/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/credentials.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include diff --git a/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/from_file.h b/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/from_file.h index 621425d47e..242da20a88 100644 --- a/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/from_file.h +++ b/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/from_file.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include diff --git a/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/jwt_token_source.h b/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/jwt_token_source.h index 0660708e72..99a485acb5 100644 --- a/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/jwt_token_source.h +++ b/include/ydb-cpp-sdk/client/types/credentials/oauth2_token_exchange/jwt_token_source.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "credentials.h" #include diff --git a/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h b/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h index b296db2429..bd96f8b530 100644 --- a/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h +++ b/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NYdb::inline V3 { diff --git a/include/ydb-cpp-sdk/client/types/fwd.h b/include/ydb-cpp-sdk/client/types/fwd.h new file mode 100644 index 0000000000..6168920659 --- /dev/null +++ b/include/ydb-cpp-sdk/client/types/fwd.h @@ -0,0 +1,34 @@ +#pragma once + +namespace NYdb::inline V3 { + +template +struct TRequestSettings; + +template +struct TSimpleRequestSettings; + +template +struct TOperationRequestSettings; + +template +struct TS3Settings; + +class TStatus; +class TStreamPartStatus; + +class TOperation; + +class TYdbException; +class TContractViolation; + +class ICredentialsProvider; +class ICredentialsProviderFactory; + +class ITokenSource; + +namespace NStatusHelpers { +class TYdbErrorException; +} + +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/types/operation/operation.h b/include/ydb-cpp-sdk/client/types/operation/operation.h index 893a5f4ebc..2909ffbaca 100644 --- a/include/ydb-cpp-sdk/client/types/operation/operation.h +++ b/include/ydb-cpp-sdk/client/types/operation/operation.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include @@ -18,8 +20,6 @@ class Operation; namespace NYdb::inline V3 { -class TStatus; - class TOperation { public: using TOperationId = NKikimr::NOperationId::TOperationId; diff --git a/include/ydb-cpp-sdk/client/types/request_settings.h b/include/ydb-cpp-sdk/client/types/request_settings.h index d62730bcb9..2f2d4dca8a 100644 --- a/include/ydb-cpp-sdk/client/types/request_settings.h +++ b/include/ydb-cpp-sdk/client/types/request_settings.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include "fluent_settings_helpers.h" #include diff --git a/include/ydb-cpp-sdk/client/types/s3_settings.h b/include/ydb-cpp-sdk/client/types/s3_settings.h index 6f5430a5fe..8079a7eee9 100644 --- a/include/ydb-cpp-sdk/client/types/s3_settings.h +++ b/include/ydb-cpp-sdk/client/types/s3_settings.h @@ -1,5 +1,6 @@ #pragma once +#include "fwd.h" #include "fluent_settings_helpers.h" #include diff --git a/include/ydb-cpp-sdk/client/types/status/status.h b/include/ydb-cpp-sdk/client/types/status/status.h index 41af8ffb66..09ab1a6699 100644 --- a/include/ydb-cpp-sdk/client/types/status/status.h +++ b/include/ydb-cpp-sdk/client/types/status/status.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include diff --git a/include/ydb-cpp-sdk/client/types/status_codes.h b/include/ydb-cpp-sdk/client/types/status_codes.h index 15aedc57b7..e6047b7db6 100644 --- a/include/ydb-cpp-sdk/client/types/status_codes.h +++ b/include/ydb-cpp-sdk/client/types/status_codes.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include namespace NYdb::inline V3 { diff --git a/include/ydb-cpp-sdk/client/types/ydb.h b/include/ydb-cpp-sdk/client/types/ydb.h index e0c85f8b65..3957f1fedc 100644 --- a/include/ydb-cpp-sdk/client/types/ydb.h +++ b/include/ydb-cpp-sdk/client/types/ydb.h @@ -1,5 +1,6 @@ #pragma once +#include "fwd.h" #include "status_codes.h" namespace NYdb::inline V3 { diff --git a/include/ydb-cpp-sdk/client/value/fwd.h b/include/ydb-cpp-sdk/client/value/fwd.h new file mode 100644 index 0000000000..41fcc5635f --- /dev/null +++ b/include/ydb-cpp-sdk/client/value/fwd.h @@ -0,0 +1,15 @@ +#pragma once + +namespace NYdb::inline V3 { + +class TType; +class TTypeParser; +class TTypeBuilder; +class TValue; +class TValueParser; +class TValueBuilder; + +template +class TValueBuilderBase; + +} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/value/value.h b/include/ydb-cpp-sdk/client/value/value.h index f198d54f68..3c2787090c 100644 --- a/include/ydb-cpp-sdk/client/value/value.h +++ b/include/ydb-cpp-sdk/client/value/value.h @@ -1,5 +1,7 @@ #pragma once +#include "fwd.h" + #include #include diff --git a/src/api/grpc/draft/ydb_ymq_v1.proto b/src/api/grpc/draft/ydb_ymq_v1.proto index 3080f38db7..76652907fd 100644 --- a/src/api/grpc/draft/ydb_ymq_v1.proto +++ b/src/api/grpc/draft/ydb_ymq_v1.proto @@ -25,4 +25,7 @@ service YmqService { rpc YmqDeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse); rpc YmqChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse); rpc YmqListDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest) returns (ListDeadLetterSourceQueuesResponse); + rpc YmqListQueueTags(ListQueueTagsRequest) returns (ListQueueTagsResponse); + rpc YmqTagQueue(TagQueueRequest) returns (TagQueueResponse); + rpc YmqUntagQueue(UntagQueueRequest) returns (UntagQueueResponse); } diff --git a/src/api/protos/draft/ymq.proto b/src/api/protos/draft/ymq.proto index 975f6c68f1..429440121d 100644 --- a/src/api/protos/draft/ymq.proto +++ b/src/api/protos/draft/ymq.proto @@ -298,3 +298,47 @@ message ListDeadLetterSourceQueuesResult { string next_token = 1; repeated string queue_urls = 2; } + + +message ListQueueTagsRequest { + Ydb.Operations.OperationParams operation_params = 1; + string queue_url = 2; +} + +message ListQueueTagsResponse { + Ydb.Operations.Operation operation = 1; +} + +message ListQueueTagsResult { + map tags = 1; +} + +message TagQueueRequest { + Ydb.Operations.OperationParams operation_params = 1; + map tags = 2; + string queue_url = 3; +} + +message TagQueueResponse { + Ydb.Operations.Operation operation = 1; +} + +message TagQueueResult { +} + +message UntagQueueRequest { + Ydb.Operations.OperationParams operation_params = 1; + repeated string tag_keys = 2; + string queue_url = 3; +} + +message UntagQueueResponse { + Ydb.Operations.Operation operation = 1; +} + +message UntagQueueResult { +} + +message QueueTags { + map Tags = 1; +} diff --git a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp index 299f3f0245..c3f673e53e 100644 --- a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +++ b/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp @@ -38,6 +38,7 @@ TKqpSessionCommon::TKqpSessionCommon( , State_(S_STANDALONE) , TimeToTouch_(TInstant::Now()) , TimeInPast_(TInstant::Now()) + , CloseHandler_(nullptr) , NeedUpdateActiveCounter_(false) {} @@ -114,7 +115,7 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval, if (updateTimeInPast) { TimeInPast_ = now; } - TimeToTouch_ = now + interval; + TimeToTouch_.store(now + interval, std::memory_order_relaxed); } void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval, @@ -124,11 +125,11 @@ void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval, if (updateTimeInPast) { TimeInPast_ = now; } - TimeToTouch_ = now + interval; + TimeToTouch_.store(now + interval, std::memory_order_relaxed); } TInstant TKqpSessionCommon::GetTimeToTouchFast() const { - return TimeToTouch_; + return TimeToTouch_.load(std::memory_order_relaxed); } TInstant TKqpSessionCommon::GetTimeInPastFast() const { @@ -144,6 +145,24 @@ TDuration TKqpSessionCommon::GetTimeInterval() const { return TimeInterval_; } +void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) { + CloseHandler_.store(handler); +} + +void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) noexcept { + auto strong = client.lock(); + if (!strong) { + // Session closed on the server after stopping client - do nothing + // moreover pool maybe destoyed now + return; + } + + IServerCloseHandler* h = CloseHandler_.load(); + if (h) { + h->OnCloseSession(this, strong); + } +} + //////////////////////////////////////////////////////////////////////////////// std::function TKqpSessionCommon::GetSmartDeleter( diff --git a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h index 6256c23e7e..dac45a56bc 100644 --- a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h +++ b/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h @@ -13,6 +13,15 @@ namespace NYdb::inline V3 { //////////////////////////////////////////////////////////////////////////////// ui64 GetNodeIdFromSession(const std::string& sessionId); +class TKqpSessionCommon; + +class IServerCloseHandler { +public: + virtual ~IServerCloseHandler() = default; + // called when session should be closed by server signal + virtual void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr) = 0; +}; + class TKqpSessionCommon : public TEndpointObj { public: TKqpSessionCommon(const std::string& sessionId, const std::string& endpoint, @@ -54,6 +63,12 @@ class TKqpSessionCommon : public TEndpointObj { static std::function GetSmartDeleter(std::shared_ptr client); + // Shoult be called under session pool lock + void UpdateServerCloseHandler(IServerCloseHandler*); + + // Called asynchronously from grpc thread. + void CloseFromServer(std::weak_ptr client) noexcept; + protected: TAdaptiveLock Lock_; @@ -63,10 +78,14 @@ class TKqpSessionCommon : public TEndpointObj { const bool IsOwnedBySessionPool_; EState State_; - TInstant TimeToTouch_; + // This time is used during async close session handling which does not lock the session + // so we need to be able to read this value atomicaly + std::atomic TimeToTouch_; TInstant TimeInPast_; // Is used to implement progressive timeout for settler keep alive call TDuration TimeInterval_; + + std::atomic CloseHandler_; // Indicate session was in active state, but state was changed // (need to decrement active session counter) // TODO: suboptimal because need lock for atomic change from interceptor diff --git a/src/client/impl/ydb_internal/session_pool/session_pool.cpp b/src/client/impl/ydb_internal/session_pool/session_pool.cpp index 9484becb53..ff9a6db5ad 100644 --- a/src/client/impl/ydb_internal/session_pool/session_pool.cpp +++ b/src/client/impl/ydb_internal/session_pool/session_pool.cpp @@ -141,6 +141,7 @@ void TSessionPool::GetSession(std::unique_ptr ctx) } if (!Sessions_.empty()) { auto it = std::prev(Sessions_.end()); + it->second->UpdateServerCloseHandler(nullptr); sessionImpl = std::move(it->second); Sessions_.erase(it); } @@ -206,6 +207,7 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) { if (!active) IncrementActiveCounterUnsafe(); } else { + impl->UpdateServerCloseHandler(this); Sessions_.emplace(std::make_pair( impl->GetTimeToTouchFast(), impl)); @@ -242,6 +244,7 @@ void TSessionPool::Drain(std::function&& std::lock_guard guard(Mtx_); Closed_ = close; for (auto it = Sessions_.begin(); it != Sessions_.end();) { + it->second->UpdateServerCloseHandler(nullptr); const bool cont = cb(std::move(it->second)); it = Sessions_.erase(it); if (!cont) @@ -283,9 +286,11 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr weakC break; if (deletePredicate(it->second.get(), sessions.size())) { + it->second->UpdateServerCloseHandler(nullptr); sessionsToDelete.emplace_back(std::move(it->second)); sessions.erase(it++); } else if (cmd) { + it->second->UpdateServerCloseHandler(nullptr); sessionsToTouch.emplace_back(std::move(it->second)); sessions.erase(it++); } else { @@ -338,6 +343,32 @@ i64 TSessionPool::GetCurrentPoolSize() const { return Sessions_.size(); } +void TSessionPool::OnCloseSession(const TKqpSessionCommon* s, std::shared_ptr client) { + std::unique_ptr session; + { + std::lock_guard guard(Mtx_); + const auto timeToTouch = s->GetTimeToTouchFast(); + const auto id = s->GetId(); + auto it = Sessions_.find(timeToTouch); + // Sessions_ is multimap of sessions sorted by scheduled time to run periodic task + // Scan sessions with same scheduled time to find needed one. In most cases only one session here + while (it != Sessions_.end() && it->first == timeToTouch) { + if (id != it->second->GetId()) { + it++; + continue; + } + session = std::move(it->second); + Sessions_.erase(it); + break; + } + } + + if (session) { + Y_ABORT_UNLESS(session->GetState() == TKqpSessionCommon::S_IDLE); + CloseAndDeleteSession(std::move(session), client); + } +} + void TSessionPool::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) { ActiveSessionsCounter_.Set(statCollector.ActiveSessions); InPoolSessionsCounter_.Set(statCollector.InPoolSessions); diff --git a/src/client/impl/ydb_internal/session_pool/session_pool.h b/src/client/impl/ydb_internal/session_pool/session_pool.h index 4a317b9704..78394bbcc1 100644 --- a/src/client/impl/ydb_internal/session_pool/session_pool.h +++ b/src/client/impl/ydb_internal/session_pool/session_pool.h @@ -82,7 +82,7 @@ NThreading::TFuture InjectSessionStatusInterception( return promise.GetFuture(); } -class TSessionPool { +class TSessionPool : public IServerCloseHandler { private: class TWaitersQueue { public: @@ -125,6 +125,8 @@ class TSessionPool { void Drain(std::function&&)> cb, bool close); void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); + void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr client) override; + private: void UpdateStats(); static void ReplySessionToUser(TKqpSessionCommon* session, std::unique_ptr ctx); diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index b7f21e59b2..8182d9d30b 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -344,7 +344,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public const auto sessionId = resp->session_id(); request.set_session_id(sessionId); - auto args = std::make_shared(promise, sessionId, endpoint, client); + auto args = std::make_shared(promise, sessionId, endpoint, client, client); // Do not pass client timeout here. Session must be alive TRpcRequestSettings rpcSettings; diff --git a/src/client/query/impl/client_session.cpp b/src/client/query/impl/client_session.cpp index eb8281f1dc..8b1c89a8fa 100644 --- a/src/client/query/impl/client_session.cpp +++ b/src/client/query/impl/client_session.cpp @@ -5,20 +5,106 @@ #undef INCLUDE_YDB_INTERNAL_H #include +#include namespace NYdb::inline V3::NQuery { -TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, const std::string& endpoint) +// Custom lock primitive to protect session from destroying +// during async read execution. +// The problem is TSession::TImpl holds grpc stream processor by IntrusivePtr +// and this processor alredy refcounted by internal code. +// That mean during TSession::TImpl dtor no gurantee to grpc procerrot will be destroyed. +// StreamProcessor_->Cancel() doesn't help it just start async cancelation but we have no way +// to wait cancelation has done. +// So we need some way to protect access to row session impl pointer +// from async reader (processor callback). We can't use shared/weak ptr here because TSessionImpl +// stores as uniq ptr inside session pool and as shared ptr in the TSession +// when user got session (see GetSmartDeleter related code). + +// Why just not std::mutex? - Requirement do not destroy a mutex while it is locked +// makes it difficult to use here. Moreover we need to allow recursive lock. + +// Why recursive lock? - In happy path we destroy session from CloseFromServer call, +// so the session dtor called from thread which already got the lock. + +// TODO: Proably we can add sync version of Cancel method in to grpc stream procesor to make sure +// no more callback will be called. + +class TSafeTSessionImplHolder { + TSession::TImpl* Ptr; + std::atomic_uint32_t Semaphore; + std::atomic OwnerThread; +public: + TSafeTSessionImplHolder(TSession::TImpl* p) + : Ptr(p) + , Semaphore(0) + {} + + TSession::TImpl* TrySharedOwning() noexcept { + auto old = Semaphore.fetch_add(1); + if (old == 0) { + OwnerThread.store(std::this_thread::get_id()); + return Ptr; + } else { + return nullptr; + } + } + + void Release() noexcept { + OwnerThread.store(std::thread::id()); + Semaphore.store(0); + } + + void WaitAndLock() noexcept { + if (OwnerThread.load() == std::this_thread::get_id()) { + return; + } + + uint32_t cur = 0; + uint32_t newVal = 1; + while (!Semaphore.compare_exchange_weak(cur, newVal, + std::memory_order_release, std::memory_order_relaxed)) { + std::this_thread::yield(); + cur = 0; + } + } +}; + +void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client, + std::shared_ptr holder) +{ + auto resp = std::make_shared(); + ptr->Read(resp.get(), [resp, ptr, client, holder](NYdbGrpc::TGrpcStatus grpcStatus) mutable { + switch (grpcStatus.GRpcStatusCode) { + case grpc::StatusCode::OK: + StartAsyncRead(ptr, client, holder); + break; + case grpc::StatusCode::OUT_OF_RANGE: { + auto impl = holder->TrySharedOwning(); + if (impl) { + impl->CloseFromServer(client); + holder->Release(); + } + break; + } + } + }); +} + +TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, const std::string& endpoint, std::weak_ptr client) : TKqpSessionCommon(sessionId, endpoint, true) , StreamProcessor_(ptr) + , SessionHolder(std::make_shared(this)) { MarkActive(); SetNeedUpdateActiveCounter(true); + StartAsyncRead(StreamProcessor_, client, SessionHolder); } TSession::TImpl::~TImpl() { StreamProcessor_->Cancel(); + SessionHolder->WaitAndLock(); } void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, @@ -53,7 +139,7 @@ void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr, std::move(st), TSession( args->Client, - new TSession::TImpl(ptr, args->SessionId, args->Endpoint) + new TSession::TImpl(ptr, args->SessionId, args->Endpoint, args->SessionClient) ) ) ); diff --git a/src/client/query/impl/client_session.h b/src/client/query/impl/client_session.h index 2a9b8ac52c..2d2d65973f 100644 --- a/src/client/query/impl/client_session.h +++ b/src/client/query/impl/client_session.h @@ -7,27 +7,32 @@ namespace NYdb::inline V3::NQuery { +class TSafeTSessionImplHolder; + class TSession::TImpl : public TKqpSessionCommon { public: struct TAttachSessionArgs { TAttachSessionArgs(NThreading::TPromise promise, std::string sessionId, std::string endpoint, - std::shared_ptr client) + std::shared_ptr client, + std::weak_ptr sessionClient) : Promise(promise) , SessionId(sessionId) , Endpoint(endpoint) , Client(client) + , SessionClient(sessionClient) { } NThreading::TPromise Promise; std::string SessionId; std::string Endpoint; std::shared_ptr Client; + std::weak_ptr SessionClient; }; using TResponse = Ydb::Query::SessionState; using TStreamProcessorPtr = NYdbGrpc::IStreamRequestReadProcessor::TPtr; - TImpl(TStreamProcessorPtr ptr, const std::string& id, const std::string& endpoint); + TImpl(TStreamProcessorPtr ptr, const std::string& id, const std::string& endpoint, std::weak_ptr client); ~TImpl(); static void MakeImplAsync(TStreamProcessorPtr processor, std::shared_ptr args); @@ -35,8 +40,11 @@ class TSession::TImpl : public TKqpSessionCommon { private: static void NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr args, NYdb::TStatus status); + static void StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr client, std::shared_ptr session); + private: TStreamProcessorPtr StreamProcessor_; + std::shared_ptr SessionHolder; }; } diff --git a/src/library/grpc/client/CMakeLists.txt b/src/library/grpc/client/CMakeLists.txt index f648178a9e..aef5379490 100644 --- a/src/library/grpc/client/CMakeLists.txt +++ b/src/library/grpc/client/CMakeLists.txt @@ -1,12 +1,15 @@ _ydb_sdk_add_library(grpc-client) -target_link_libraries(grpc-client PUBLIC - yutil - gRPC::grpc++ +target_link_libraries(grpc-client + PUBLIC + yutil + gRPC::grpc++ ) target_sources(grpc-client PRIVATE grpc_client_low.cpp ) +target_compile_definitions(grpc-client PRIVATE YDB_DISABLE_GRPC_SOCKET_MUTATOR) + _ydb_sdk_install_targets(TARGETS grpc-client) diff --git a/src/library/grpc/client/grpc_client_low.cpp b/src/library/grpc/client/grpc_client_low.cpp index 452bd0b7df..a2b959c311 100644 --- a/src/library/grpc/client/grpc_client_low.cpp +++ b/src/library/grpc/client/grpc_client_low.cpp @@ -14,6 +14,12 @@ #include #endif +#if !defined(YDB_DISABLE_GRPC_SOCKET_MUTATOR) +#include +#endif + +#include + namespace NYdbGrpc { void EnableGRpcTracing() { @@ -30,77 +36,79 @@ void EnableGRpcTracing() { gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); } -// class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator { -// public: -// TGRpcKeepAliveSocketMutator(int idle, int count, int interval) -// : Idle_(idle) -// , Count_(count) -// , Interval_(interval) -// { -// grpc_socket_mutator_init(this, &VTable); -// } -// private: -// static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) { -// return static_cast(mutator); -// } - -// template -// bool SetOption(int fd, int level, int optname, const TVal& value) { -// return setsockopt(fd, level, optname, reinterpret_cast(&value), sizeof(value)) == 0; -// } -// bool SetOption(int fd) { -// if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) { -// std::cerr << std::format("Failed to set SO_KEEPALIVE option: {}", strerror(errno)) << std::endl; -// return false; -// } -// #ifdef _linux_ -// if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) { -// std::cerr << std::format("Failed to set TCP_KEEPIDLE option: {}", strerror(errno)) << std::endl; -// return false; -// } -// if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) { -// std::cerr << std::format("Failed to set TCP_KEEPCNT option: {}", strerror(errno)) << std::endl; -// return false; -// } -// if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) { -// std::cerr << std::format("Failed to set TCP_KEEPINTVL option: {}", strerror(errno)) << std::endl; -// return false; -// } -// #endif -// return true; -// } -// static bool Mutate(int fd, grpc_socket_mutator* mutator) { -// auto self = Cast(mutator); -// return self->SetOption(fd); -// } -// static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) { -// const auto* selfA = Cast(a); -// const auto* selfB = Cast(b); -// auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_); -// auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_); -// return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0; -// } -// static void Destroy(grpc_socket_mutator* mutator) { -// delete Cast(mutator); -// } -// static bool Mutate2(const grpc_mutate_socket_info* info, grpc_socket_mutator* mutator) { -// auto self = Cast(mutator); -// return self->SetOption(info->fd); -// } - -// static grpc_socket_mutator_vtable VTable; -// const int Idle_; -// const int Count_; -// const int Interval_; -// }; - -// grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = -// { -// &TGRpcKeepAliveSocketMutator::Mutate, -// &TGRpcKeepAliveSocketMutator::Compare, -// &TGRpcKeepAliveSocketMutator::Destroy, -// &TGRpcKeepAliveSocketMutator::Mutate2 -// }; +#if !defined(YDB_DISABLE_GRPC_SOCKET_MUTATOR) +class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator { +public: + TGRpcKeepAliveSocketMutator(int idle, int count, int interval) + : Idle_(idle) + , Count_(count) + , Interval_(interval) + { + grpc_socket_mutator_init(this, &VTable); + } +private: + static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) { + return static_cast(mutator); + } + + template + bool SetOption(int fd, int level, int optname, const TVal& value) { + return setsockopt(fd, level, optname, reinterpret_cast(&value), sizeof(value)) == 0; + } + bool SetOption(int fd) { + if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) { + std::cerr << std::format("Failed to set SO_KEEPALIVE option: {}", strerror(errno)) << std::endl; + return false; + } +#ifdef _linux_ + if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) { + std::cerr << std::format("Failed to set TCP_KEEPIDLE option: {}", strerror(errno)) << std::endl; + return false; + } + if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) { + std::cerr << std::format("Failed to set TCP_KEEPCNT option: {}", strerror(errno)) << std::endl; + return false; + } + if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) { + std::cerr << std::format("Failed to set TCP_KEEPINTVL option: {}", strerror(errno)) << std::endl; + return false; + } +#endif + return true; + } + static bool Mutate(int fd, grpc_socket_mutator* mutator) { + auto self = Cast(mutator); + return self->SetOption(fd); + } + static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) { + const auto* selfA = Cast(a); + const auto* selfB = Cast(b); + auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_); + auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_); + return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0; + } + static void Destroy(grpc_socket_mutator* mutator) { + delete Cast(mutator); + } + static bool Mutate2(const grpc_mutate_socket_info* info, grpc_socket_mutator* mutator) { + auto self = Cast(mutator); + return self->SetOption(info->fd); + } + + static grpc_socket_mutator_vtable VTable; + const int Idle_; + const int Count_; + const int Interval_; +}; + +grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = + { + &TGRpcKeepAliveSocketMutator::Mutate, + &TGRpcKeepAliveSocketMutator::Compare, + &TGRpcKeepAliveSocketMutator::Destroy, + &TGRpcKeepAliveSocketMutator::Mutate2 + }; +#endif TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) : TcpKeepAliveSettings_(tcpKeepAliveSettings) @@ -140,16 +148,9 @@ void TChannelPool::GetStubsHolderLocked( } } } - // TGRpcKeepAliveSocketMutator* mutator = nullptr; - // // will be destroyed inside grpc - // if (TcpKeepAliveSettings_.Enabled) { - // mutator = new TGRpcKeepAliveSocketMutator( - // TcpKeepAliveSettings_.Idle, - // TcpKeepAliveSettings_.Count, - // TcpKeepAliveSettings_.Interval - // ); - // } - cb(Pool_.emplace(channelId, CreateChannelInterface(config, nullptr)).first->second); + auto mutator = NImpl::CreateGRpcKeepAliveSocketMutator(TcpKeepAliveSettings_); + // will be destroyed inside grpc + cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); } } @@ -587,4 +588,19 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) { } } +grpc_socket_mutator* NImpl::CreateGRpcKeepAliveSocketMutator(const TTcpKeepAliveSettings& TcpKeepAliveSettings_) { +#if !defined(YDB_DISABLE_GRPC_SOCKET_MUTATOR) + TGRpcKeepAliveSocketMutator* mutator = nullptr; + if (TcpKeepAliveSettings_.Enabled) { + mutator = new TGRpcKeepAliveSocketMutator( + TcpKeepAliveSettings_.Idle, + TcpKeepAliveSettings_.Count, + TcpKeepAliveSettings_.Interval + ); + } + return mutator; +#endif + return nullptr; +} + } // namespace NGRpc diff --git a/src/library/grpc/client/grpc_client_low.h b/src/library/grpc/client/grpc_client_low.h index ae0f898d4a..72d355783b 100644 --- a/src/library/grpc/client/grpc_client_low.h +++ b/src/library/grpc/client/grpc_client_low.h @@ -452,6 +452,10 @@ class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor>(new TServiceConnection(CreateChannelInterface(config), this)); } + template + std::unique_ptr> CreateGRpcServiceConnection(const TGRpcClientConfig& config, const TTcpKeepAliveSettings& keepAlive) { + auto mutator = NImpl::CreateGRpcKeepAliveSocketMutator(keepAlive); + // will be destroyed inside grpc + return std::unique_ptr>(new TServiceConnection(CreateChannelInterface(config, mutator), this)); + } + template std::unique_ptr> CreateGRpcServiceConnection(TStubsHolder& holder) { return std::unique_ptr>(new TServiceConnection(holder, this));