Skip to content

Commit 015366f

Browse files
GrigoriyPAstanislav-shchetinin
authored andcommitted
YQ kqprun pass allow local files into runtime listing (ydb-platform#7844)
1 parent da58b50 commit 015366f

20 files changed

+79
-42
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
3636
NActors::TActorSystem* actorSystem,
3737
size_t pathsLimit,
3838
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
39-
bool enableInfer) {
39+
bool enableInfer,
40+
bool allowLocalFiles) {
4041
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
4142
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
4243
{
4344
ToString(NYql::EDatabaseType::ObjectStorage),
44-
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
45+
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles)
4546
},
4647
{
4748
ToString(NYql::EDatabaseType::ClickHouse),

ydb/core/external_sources/external_source_factory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
1515
NActors::TActorSystem* actorSystem = nullptr,
1616
size_t pathsLimit = 50000,
1717
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
18-
bool enableInfer = false);
18+
bool enableInfer = false,
19+
bool allowLocalFiles = false);
1920

2021
}

ydb/core/external_sources/object_storage.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
3434
NActors::TActorSystem* actorSystem,
3535
size_t pathsLimit,
3636
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
37-
bool enableInfer)
37+
bool enableInfer,
38+
bool allowLocalFiles)
3839
: HostnamePatterns(hostnamePatterns)
3940
, PathsLimit(pathsLimit)
4041
, ActorSystem(actorSystem)
4142
, CredentialsFactory(std::move(credentialsFactory))
4243
, EnableInfer(enableInfer)
44+
, AllowLocalFiles(allowLocalFiles)
4345
{}
4446

4547
virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
@@ -320,7 +322,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
320322
.Url = meta->DataSourceLocation,
321323
.Credentials = credentials,
322324
.Pattern = effectiveFilePattern,
323-
}, Nothing(), false);
325+
}, Nothing(), AllowLocalFiles);
324326
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
325327
auto& listRes = listResFut.GetValue();
326328
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
@@ -613,6 +615,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
613615
NActors::TActorSystem* ActorSystem = nullptr;
614616
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
615617
const bool EnableInfer = false;
618+
const bool AllowLocalFiles;
616619
};
617620

618621
}
@@ -622,8 +625,9 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
622625
NActors::TActorSystem* actorSystem,
623626
size_t pathsLimit,
624627
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
625-
bool enableInfer) {
626-
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer);
628+
bool enableInfer,
629+
bool allowLocalFiles) {
630+
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer, allowLocalFiles);
627631
}
628632

629633
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {

ydb/core/external_sources/object_storage.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
1313
NActors::TActorSystem* actorSystem,
1414
size_t pathsLimit,
1515
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
16-
bool enableInfer);
16+
bool enableInfer,
17+
bool allowLocalFiles);
1718

1819
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
1920

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,22 @@ namespace NKikimr {
88

99
Y_UNIT_TEST_SUITE(ObjectStorageTest) {
1010
Y_UNIT_TEST(SuccessValidation) {
11-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
11+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1212
NKikimrExternalSources::TSchema schema;
1313
NKikimrExternalSources::TGeneral general;
1414
UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general));
1515
}
1616

1717
Y_UNIT_TEST(FailedCreate) {
18-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
18+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
1919
NKikimrExternalSources::TSchema schema;
2020
NKikimrExternalSources::TGeneral general;
2121
general.mutable_attributes()->insert({"a", "b"});
2222
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a");
2323
}
2424

2525
Y_UNIT_TEST(FailedValidation) {
26-
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
26+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
2727
NKikimrExternalSources::TSchema schema;
2828
NKikimrExternalSources::TGeneral general;
2929
general.mutable_attributes()->insert({"projection.h", "b"});

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void Init(
216216
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
217217

218218
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
219-
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
219+
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
220220
s3ActorsFactory->RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
221221
httpGateway, s3HttpRetryPolicy);
222222

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
7878

7979
if (federatedQuerySetup) {
8080
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
81-
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig);
81+
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig, nullptr, federatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
8282
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
8383

8484
if (federatedQuerySetup->ConnectorClient) {

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,8 @@ class TKqpHost : public IKqpHost {
10731073
ActorSystem,
10741074
FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit(),
10751075
FederatedQuerySetup ? FederatedQuerySetup->CredentialsFactory : nullptr,
1076-
Config->FeatureFlags.GetEnableExternalSourceSchemaInference());
1076+
Config->FeatureFlags.GetEnableExternalSourceSchemaInference(),
1077+
FederatedQuerySetup->S3GatewayConfig.GetAllowLocalFiles());
10771078
}
10781079
}
10791080

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6978,7 +6978,10 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
69786978
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory(
69796979
std::vector<TString>(hostnamePatterns.begin(), hostnamePatterns.end()),
69806980
nullptr,
6981-
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit()
6981+
appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit(),
6982+
nullptr,
6983+
appConfig.GetFeatureFlags().GetEnableExternalSourceSchemaInference(),
6984+
appConfig.GetQueryServiceConfig().GetS3().GetAllowLocalFiles()
69826985
);
69836986
}
69846987

ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,17 @@ namespace NYql::NDq {
6464
IHTTPGateway::TPtr gateway,
6565
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
6666
const TS3ReadActorFactoryConfig& cfg,
67-
::NMonitoring::TDynamicCounterPtr counters) override {
67+
::NMonitoring::TDynamicCounterPtr counters,
68+
bool allowLocalFiles) override {
6869

6970
#if defined(_linux_) || defined(_darwin_)
7071
NDB::registerFormats();
7172
factory.RegisterSource<NS3::TSource>("S3Source",
72-
[credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
73+
[credentialsFactory, gateway, retryPolicy, cfg, counters, allowLocalFiles](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
7374
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
7475
std::move(settings), args.InputIndex, args.StatsLevel, args.TxId, args.SecureParams,
7576
args.TaskParams, args.ReadRanges, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
76-
counters, args.TaskCounters, args.MemoryQuotaManager);
77+
counters, args.TaskCounters, args.MemoryQuotaManager, allowLocalFiles);
7778
});
7879
#else
7980
Y_UNUSED(factory);

0 commit comments

Comments
 (0)