diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 5c679a07b9d8..36a87a69b8cd 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -304,7 +304,8 @@ struct TObjectStorageExternalSource : public IExternalSource { } auto httpGateway = NYql::IHTTPGateway::Make(); - auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, NYql::NS3Lister::TListingRequest{ + auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); + auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{ .Url = meta->DataSourceLocation, .AuthInfo = authInfo, .Pattern = meta->TableLocation, diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index c54f1a26cc44..01c8b51843af 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1695,6 +1695,7 @@ class TKqpHost : public IKqpHost { state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script; state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx); state->Gateway = FederatedQuerySetup->HttpGateway; + state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); state->ExecutorPoolId = AppData()->UserPoolId; auto dataSource = NYql::CreateS3DataSource(state); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp index 994ed88d5a76..6fd330caf4ca 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.cpp @@ -29,7 +29,8 @@ std::unordered_set FqRetriedCurlCodes() { CURLE_SEND_ERROR, CURLE_RECV_ERROR, CURLE_NO_CONNECTION_AVAILABLE, - CURLE_GOT_NOTHING + CURLE_GOT_NOTHING, + CURLE_COULDNT_RESOLVE_HOST }; } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp index 2cc7214d54c9..749c86b9db44 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp @@ -111,6 +111,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID FileQueueBatchSizeLimit, FileQueueBatchObjectCountLimit, Gateway, + RetryPolicy, Url, AuthInfo, Pattern, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 9c827e32c1dd..358448c2d451 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1319,6 +1319,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public FileQueueBatchSizeLimit, FileQueueBatchObjectCountLimit, Gateway, + RetryPolicy, Url, AuthInfo, Pattern, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 447b21c672d6..69de502f94e4 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -24,6 +24,7 @@ NActors::IActor* CreateS3FileQueueActor( ui64 batchSizeLimit, ui64 batchObjectCountLimit, IHTTPGateway::TPtr gateway, + IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, TS3Credentials::TAuthInfo authInfo, TString pattern, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp index 8a97cf82f6f2..eb6bebed5624 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp @@ -171,6 +171,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped ui64 batchSizeLimit, ui64 batchObjectCountLimit, IHTTPGateway::TPtr gateway, + IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, TS3Credentials::TAuthInfo authInfo, TString pattern, @@ -186,6 +187,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped , BatchSizeLimit(batchSizeLimit) , BatchObjectCountLimit(batchObjectCountLimit) , Gateway(std::move(gateway)) + , RetryPolicy(std::move(retryPolicy)) , Url(std::move(url)) , AuthInfo(std::move(authInfo)) , Pattern(std::move(pattern)) @@ -488,6 +490,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped CurrentDirectoryPathIndex = object.GetPathIndex(); MaybeLister = NS3Lister::MakeS3Lister( Gateway, + RetryPolicy, NS3Lister::TListingRequest{ Url, AuthInfo, @@ -611,6 +614,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped THashSet UpdatedConsumers; const IHTTPGateway::TPtr Gateway; + const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; const TString Url; const TS3Credentials::TAuthInfo AuthInfo; const TString Pattern; @@ -632,6 +636,7 @@ NActors::IActor* CreateS3FileQueueActor( ui64 batchSizeLimit, ui64 batchObjectCountLimit, IHTTPGateway::TPtr gateway, + IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, TS3Credentials::TAuthInfo authInfo, TString pattern, @@ -648,6 +653,7 @@ NActors::IActor* CreateS3FileQueueActor( batchSizeLimit, batchObjectCountLimit, gateway, + retryPolicy, url, authInfo, pattern, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h index e2b98a98429d..12a90ffa3faa 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h @@ -22,6 +22,7 @@ NActors::IActor* CreateS3FileQueueActor( ui64 batchSizeLimit, ui64 batchObjectCountLimit, IHTTPGateway::TPtr gateway, + IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, TS3Credentials::TAuthInfo authInfo, TString pattern, diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index f7c568acc869..15392deac1f1 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -243,6 +243,7 @@ class TS3Lister : public IS3Lister { TS3Lister( const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TListingRequest& listingRequest, const TMaybe& delimiter, size_t maxFilesPerQuery, @@ -265,7 +266,7 @@ class TS3Lister : public IS3Lister { NewPromise>(), std::make_shared(), IHTTPGateway::TWeakPtr(httpGateway), - GetHTTPDefaultRetryPolicy(), + retryPolicy, CreateGuidAsString(), std::move(request), delimiter, @@ -391,7 +392,7 @@ class TS3Lister : public IS3Lister { NewPromise>(), std::make_shared(), ctx.GatewayWeak, - GetHTTPDefaultRetryPolicy(), + ctx.RetryPolicy, CreateGuidAsString(), ctx.ListingRequest, ctx.Delimiter, @@ -457,15 +458,16 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { TFuture Make( const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const NS3Lister::TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles) override { auto acquired = Semaphore->AcquireAsync(); return acquired.Apply( - [ctx = SharedCtx, httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) { + [ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) { return std::shared_ptr(new TListerLockReleaseWrapper{ NS3Lister::MakeS3Lister( - httpGateway, listingRequest, delimiter, allowLocalFiles, ctx), + httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx), std::make_unique( f.GetValue()->MakeAutoRelease())}); }); @@ -507,13 +509,14 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { IS3Lister::TPtr MakeS3Lister( const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles, TSharedListingContextPtr sharedCtx) { if (listingRequest.Url.substr(0, 7) != "file://") { return std::make_shared( - httpGateway, listingRequest, delimiter, 1000, std::move(sharedCtx)); + httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx)); } if (!allowLocalFiles) { diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h index bc6865ecee4d..24d70e4f9cae 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -165,6 +165,7 @@ class IS3Lister : public TIterator> { IS3Lister::TPtr MakeS3Lister( const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles, @@ -176,6 +177,7 @@ class IS3ListerFactory { virtual NThreading::TFuture Make( const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const NS3Lister::TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles) = 0; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 950dd6eac4f5..c9f3d40a42cb 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -540,6 +540,7 @@ class TS3DqIntegration: public TDqIntegrationBase { fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, State_->Gateway, + State_->GatewayRetryPolicy, connect.Url, GetAuthInfo(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)), pathPattern, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 6d3f3275ae28..155810c3485b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -88,6 +88,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { State_->Configuration->RegexpCacheSize)) , ListingStrategy_(MakeS3ListingStrategy( State_->Gateway, + State_->GatewayRetryPolicy, ListerFactory_, State_->Configuration->MinDesiredDirectoriesOfFilesPerQuery, State_->Configuration->MaxInflightListsPerQuery, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp index b6e1722dade9..843ba0bcf434 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp @@ -134,14 +134,15 @@ class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy { TFlatFileS3ListingStrategy( const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, bool allowLocalFiles) : TCollectingS3ListingStrategy( - [allowLocalFiles, httpGateway, listerFactory]( + [allowLocalFiles, httpGateway, retryPolicy, listerFactory]( const TListingRequest& listingRequest, TS3ListingOptions options) { Y_UNUSED(options); return listerFactory->Make( - httpGateway, listingRequest, Nothing(), allowLocalFiles); + httpGateway, retryPolicy, listingRequest, Nothing(), allowLocalFiles); }, "TFlatFileS3ListingStrategy") { } }; @@ -151,14 +152,15 @@ class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy { TDirectoryS3ListingStrategy( const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, bool allowLocalFiles) : TCollectingS3ListingStrategy( - [allowLocalFiles, httpGateway, listerFactory]( + [allowLocalFiles, httpGateway, retryPolicy, listerFactory]( const TListingRequest& listingRequest, TS3ListingOptions options) { Y_UNUSED(options); return listerFactory->Make( - httpGateway, listingRequest, "/", allowLocalFiles); + httpGateway, retryPolicy, listingRequest, "/", allowLocalFiles); }, "TDirectoryS3ListingStrategy") { } }; @@ -402,9 +404,10 @@ class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy TPartitionedDatasetS3ListingStrategy( const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, bool allowLocalFiles) : TCollectingS3ListingStrategy( - [listerFactory, httpGateway, allowLocalFiles]( + [listerFactory, httpGateway, retryPolicy, allowLocalFiles]( const TListingRequest& listingRequest, TS3ListingOptions options) { auto ptr = std::shared_ptr( @@ -413,7 +416,7 @@ class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy listingRequest.Prefix, options, TDirectoryS3ListingStrategy{ - listerFactory, httpGateway, allowLocalFiles}}); + listerFactory, httpGateway, retryPolicy, allowLocalFiles}}); return MakeFuture(std::move(ptr)); }, "TPartitionedDatasetS3ListingStrategy") { } @@ -557,10 +560,11 @@ class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrate TUnPartitionedDatasetS3ListingStrategy( const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, size_t minParallelism, bool allowLocalFiles) : TCollectingS3ListingStrategy( - [listerFactory, httpGateway, minParallelism, allowLocalFiles]( + [listerFactory, httpGateway, retryPolicy, minParallelism, allowLocalFiles]( const TListingRequest& listingRequest, TS3ListingOptions options) { auto ptr = std::shared_ptr( @@ -579,7 +583,7 @@ class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrate : listingRequest.Pattern.substr( 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))}, TDirectoryS3ListingStrategy{ - listerFactory, httpGateway, allowLocalFiles}, + listerFactory, httpGateway, retryPolicy, allowLocalFiles}, minParallelism, options.MaxResultSet}); return MakeFuture(std::move(ptr)); @@ -893,11 +897,12 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy : TConcurrentUnPartitionedDatasetS3ListingStrategy( const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, size_t minParallelism, size_t maxParallelOps, bool allowLocalFiles) : TCollectingS3ListingStrategy( - [listerFactory, httpGateway, minParallelism, allowLocalFiles, maxParallelOps]( + [listerFactory, httpGateway, retryPolicy, minParallelism, allowLocalFiles, maxParallelOps]( const TListingRequest& listingRequest, TS3ListingOptions options) { auto ptr = std::shared_ptr( @@ -929,7 +934,7 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy : : listingRequest.Pattern.substr( 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))}, TDirectoryS3ListingStrategy{ - listerFactory, httpGateway, allowLocalFiles}, + listerFactory, httpGateway, retryPolicy, allowLocalFiles}, options.MaxResultSet, maxParallelOps}); return MakeFuture(std::move(ptr)); @@ -943,10 +948,11 @@ class TConcurrentPartitionedDatasetS3ListingStrategy : TConcurrentPartitionedDatasetS3ListingStrategy( const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, size_t maxParallelOps, bool allowLocalFiles) : TCollectingS3ListingStrategy( - [listerFactory, httpGateway, allowLocalFiles, maxParallelOps]( + [listerFactory, httpGateway, retryPolicy, allowLocalFiles, maxParallelOps]( const TListingRequest& listingRequest, TS3ListingOptions options) { auto ptr = std::shared_ptr( @@ -974,12 +980,12 @@ class TConcurrentPartitionedDatasetS3ListingStrategy : : listingRequest.Pattern.substr( 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))}, TDirectoryS3ListingStrategy{ - listerFactory, httpGateway, allowLocalFiles}, + listerFactory, httpGateway, retryPolicy, allowLocalFiles}, options.MaxResultSet, maxParallelOps}); return MakeFuture(std::move(ptr)); }, - "TConcurrentUnPartitionedDatasetS3ListingStrategy") { } + "TConcurrentPartitionedDatasetS3ListingStrategy") { } }; @@ -1024,6 +1030,7 @@ class TLoggingS3ListingStrategy : public IS3ListingStrategy { IS3ListingStrategy::TPtr MakeS3ListingStrategy( const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const IS3ListerFactory::TPtr& listerFactory, ui64 minDesiredDirectoriesOfFilesPerQuery, size_t maxParallelOps, @@ -1032,7 +1039,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy( std::make_shared( std::vector>{ std::make_shared( - listerFactory, httpGateway, allowLocalFiles), + listerFactory, httpGateway, retryPolicy, allowLocalFiles), std::make_shared( std::initializer_list{ {[](const TS3ListingOptions& options) { @@ -1042,6 +1049,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy( std::make_shared( listerFactory, httpGateway, + retryPolicy, allowLocalFiles)}, {[](const TS3ListingOptions& options) { return options.IsPartitionedDataset && @@ -1050,6 +1058,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy( std::make_shared( listerFactory, httpGateway, + retryPolicy, maxParallelOps, allowLocalFiles)}, {[](const TS3ListingOptions& options) { @@ -1059,6 +1068,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy( std::make_shared( listerFactory, httpGateway, + retryPolicy, minDesiredDirectoriesOfFilesPerQuery, allowLocalFiles)}, {[](const TS3ListingOptions& options) { @@ -1068,6 +1078,7 @@ IS3ListingStrategy::TPtr MakeS3ListingStrategy( std::make_shared( listerFactory, httpGateway, + retryPolicy, minDesiredDirectoriesOfFilesPerQuery, maxParallelOps, allowLocalFiles)}})})); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h index 1c1a8d8acaca..611e2dce1368 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h @@ -31,6 +31,7 @@ class IS3ListingStrategy { IS3ListingStrategy::TPtr MakeS3ListingStrategy( const IHTTPGateway::TPtr& httpGateway, + const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const NS3Lister::IS3ListerFactory::TPtr& listerFactory, ui64 minDesiredDirectoriesOfFilesPerQuery, size_t maxParallelOps, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h index 49c805c707f3..0bcf96290c7a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h @@ -2,6 +2,7 @@ #include #include +#include #include #include "yql_s3_settings.h" @@ -28,6 +29,7 @@ struct TS3State : public TThrRefBase const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; IHTTPGateway::TPtr Gateway; + IHTTPGateway::TRetryPolicy::TPtr GatewayRetryPolicy = GetHTTPDefaultRetryPolicy(); ui32 ExecutorPoolId = 0; std::list> PrimaryKeys; };