Skip to content

Commit 131b40d

Browse files
committed
YQ kqprun: pass the allow-local-files flag into runtime listing (ydb-platform#7844)
1 parent cf25aff commit 131b40d

20 files changed: 86 additions & 49 deletions

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,12 +36,13 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
3636
NActors::TActorSystem* actorSystem,
3737
size_t pathsLimit,
3838
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
39-
bool enableInfer) {
39+
bool enableInfer,
40+
bool allowLocalFiles) {
4041
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
4142
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
4243
{
4344
ToString(NYql::EDatabaseType::ObjectStorage),
44-
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
45+
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles)
4546
},
4647
{
4748
ToString(NYql::EDatabaseType::ClickHouse),

ydb/core/external_sources/external_source_factory.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
1515
NActors::TActorSystem* actorSystem = nullptr,
1616
size_t pathsLimit = 50000,
1717
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
18-
bool enableInfer = false);
18+
bool enableInfer = false,
19+
bool allowLocalFiles = false);
1920

2021
}

ydb/core/external_sources/object_storage.cpp

Lines changed: 15 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -41,12 +41,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
4141
NActors::TActorSystem* actorSystem,
4242
size_t pathsLimit,
4343
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
44-
bool enableInfer)
44+
bool enableInfer,
45+
bool allowLocalFiles)
4546
: HostnamePatterns(hostnamePatterns)
4647
, PathsLimit(pathsLimit)
4748
, ActorSystem(actorSystem)
4849
, CredentialsFactory(std::move(credentialsFactory))
4950
, EnableInfer(enableInfer)
51+
, AllowLocalFiles(allowLocalFiles)
5052
{}
5153

5254
virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
@@ -341,14 +343,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
341343

342344
NYql::NS3Lister::TListingRequest request {
343345
.Url = meta->DataSourceLocation,
344-
.Credentials = credentials
345-
};
346-
TVector<NYql::NS3Lister::TListingRequest> requests;
347-
348-
if (!projection) {
349-
auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request);
350-
if (error) {
351-
throw yexception() << *error;
346+
.Credentials = credentials,
347+
.Pattern = effectiveFilePattern,
348+
}, Nothing(), AllowLocalFiles);
349+
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
350+
auto& listRes = listResFut.GetValue();
351+
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
352+
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
353+
throw yexception() << error.Issues.ToString();
352354
}
353355
requests.push_back(request);
354356
} else {
@@ -804,6 +806,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
804806
NActors::TActorSystem* ActorSystem = nullptr;
805807
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
806808
const bool EnableInfer = false;
809+
const bool AllowLocalFiles;
807810
};
808811

809812
}
@@ -813,8 +816,9 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
813816
NActors::TActorSystem* actorSystem,
814817
size_t pathsLimit,
815818
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
816-
bool enableInfer) {
817-
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer);
819+
bool enableInfer,
820+
bool allowLocalFiles) {
821+
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
818822
}
819823

820824
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location) {

ydb/core/external_sources/object_storage.h

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
1313
NActors::TActorSystem* actorSystem,
1414
size_t pathsLimit,
1515
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
16-
bool enableInfer);
16+
bool enableInfer,
17+
bool allowLocalFiles);
1718

1819
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location);
1920

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,22 +8,22 @@ namespace NKikimr {
88

99
Y_UNIT_TEST_SUITE(ObjectStorageTest) {
1010
Y_UNIT_TEST(SuccessValidation) {
11-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
11+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1212
NKikimrExternalSources::TSchema schema;
1313
NKikimrExternalSources::TGeneral general;
1414
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
1515
}
1616

1717
Y_UNIT_TEST(FailedCreate) {
18-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
18+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1919
NKikimrExternalSources::TSchema schema;
2020
NKikimrExternalSources::TGeneral general;
2121
general.mutable_attributes()->insert({"a", "b"});
2222
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
2323
}
2424

2525
Y_UNIT_TEST(FailedValidation) {
26-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
26+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
2727
NKikimrExternalSources::TSchema schema;
2828
NKikimrExternalSources::TGeneral general;
2929
general.mutable_attributes()->insert({"projection.h", "b"});

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -227,7 +227,7 @@ void Init(
227227
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
228228

229229
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
230-
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
230+
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
231231
s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
232232
httpGateway, s3HttpRetryPolicy);
233233

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
7878

7979
if (federatedQuerySetup) {
8080
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
81-
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
81+
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig, nullptr, federatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
8282
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
8383

8484
if (federatedQuerySetup->ConnectorClient) {

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1072,7 +1072,8 @@ class TKqpHost : public IKqpHost {
10721072
ActorSystem,
10731073
FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(),
10741074
FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr,
1075-
Config->FeatureFlags.GetEnableExternalSourceSchemaInference());
1075+
Config->FeatureFlags.GetEnableExternalSourceSchemaInference(),
1076+
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
10761077
}
10771078
}
10781079

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6977,7 +6977,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
69776977
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(
69786978
std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()),
69796979
nullptr,
6980-
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit()
6980+
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit(),
6981+
nullptr,
6982+
appConfig.GetFeatureFlags().GetEnableExternalSourceSchemaInference(),
6983+
appConfig.GetQueryServiceConfig().GetS3().GetAllowLocalFiles()
69816984
);
69826985
}
69836986

ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -64,16 +64,17 @@ namespace NYql::NDq {
6464
IHTTPGateway::TPtr gateway,
6565
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
6666
const TS3ReadActorFactoryConfig& cfg,
67-
::NMonitoring::TDynamicCounterPtr counters) override {
67+
::NMonitoring::TDynamicCounterPtr counters,
68+
bool allowLocalFiles) override {
6869

6970
#if defined(_linux_) || defined(_darwin_)
7071
NDB::registerFormats();
7172
factory.RegisterSource<NS3::TSource>("S3Source",
72-
[credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
73+
[credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
7374
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
7475
std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams,
7576
args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
76-
counters, args.TaskCounters, args.MemoryQuotaManager);
77+
counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles);
7778
});
7879
#else
7980
Y_UNUSED(factory);

0 commit comments

Comments
 (0)