Skip to content

Commit b8f8460

Browse files
authored
listing fix (#7643)
1 parent 4fddb0c commit b8f8460

File tree

14 files changed

+139
-29
lines changed

14 files changed

+139
-29
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,6 @@ struct TObjectStorageExternalSource : public IExternalSource {
292292
};
293293

294294
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
295-
Y_UNUSED(ActorSystem);
296295
auto format = meta->Attributes.FindPtr("format");
297296
if (!format || !meta->Attributes.contains("withinfer")) {
298297
return NThreading::MakeFuture(std::move(meta));
@@ -335,7 +334,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
335334
.Url = meta->DataSourceLocation,
336335
.Credentials = credentials,
337336
.Pattern = effectiveFilePattern,
338-
}, Nothing(), AllowLocalFiles);
337+
}, Nothing(), AllowLocalFiles, ActorSystem);
339338
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
340339
auto& listRes = listResFut.GetValue();
341340
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1971,7 +1971,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
19711971

19721972
{
19731973
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
1974-
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
1974+
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
19751975
}
19761976

19771977
{

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,6 +1700,7 @@ class TKqpHost : public IKqpHost {
17001700
state->Gateway = FederatedQuerySetup->HttpGateway;
17011701
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
17021702
state->ExecutorPoolId = AppData()->UserPoolId;
1703+
state->ActorSystem = ActorSystem;
17031704

17041705
auto dataSource = NYql::CreateS3DataSource(state);
17051706
auto dataSink = NYql::CreateS3DataSink(state);

ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
502502
PatternType,
503503
object.GetPath()},
504504
Nothing(),
505-
AllowLocalFiles);
505+
AllowLocalFiles,
506+
NActors::TActivationContext::ActorSystem());
506507
Fetch();
507508
return true;
508509
}

ydb/library/yql/providers/s3/object_listers/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ PEERDIR(
1414
ydb/library/yql/providers/common/http_gateway
1515
ydb/library/yql/providers/s3/credentials
1616
ydb/library/yql/utils
17+
ydb/library/yql/utils/actor_log
1718
ydb/library/yql/utils/threading
1819
)
1920

ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
55
#include <ydb/library/yql/providers/s3/common/util.h>
6+
#include <ydb/library/yql/utils/actor_log/log.h>
67
#include <ydb/library/yql/utils/log/log.h>
78
#include <ydb/library/yql/utils/url_builder.h>
89
#include <ydb/library/yql/utils/yql_panic.h>
@@ -25,7 +26,7 @@
2526
namespace NYql::NS3Lister {
2627

2728
IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) {
28-
return stream << "TListingRequest{.url=" << request.Url
29+
return stream << "[TS3Lister] TListingRequest{.url=" << request.Url
2930
<< ",.Prefix=" << request.Prefix
3031
<< ",.Pattern=" << request.Pattern
3132
<< ",.PatternType=" << request.PatternType
@@ -51,7 +52,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex,
5152

5253
const size_t numGroups = re->NumberOfCapturingGroups();
5354
YQL_CLOG(DEBUG, ProviderS3)
54-
<< "Got regex: '" << regex << "' with " << numGroups << " capture groups ";
55+
<< "[TS3Lister] Got regex: '" << regex << "' with " << numGroups << " capture groups ";
5556

5657
auto groups = std::make_shared<std::vector<std::string>>(numGroups);
5758
auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
@@ -101,7 +102,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& patt
101102
}
102103

103104
const auto regex = NS3::RegexFromWildcards(pattern);
104-
YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '"
105+
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Got prefix: '" << regexPatternPrefix << "', regex: '"
105106
<< regex << "' from original pattern '" << pattern << "'";
106107

107108
return MakeFilterRegexp(regex, sharedCtx);
@@ -238,6 +239,8 @@ class TS3Lister : public IS3Lister {
238239
const TMaybe<TString> Delimiter;
239240
const TMaybe<TString> ContinuationToken;
240241
const ui64 MaxKeys;
242+
const std::pair<TString, TString> CurrentLogContextPath;
243+
const NActors::TActorSystem* ActorSystem;
241244
};
242245

243246
TS3Lister(
@@ -246,7 +249,8 @@ class TS3Lister : public IS3Lister {
246249
const TListingRequest& listingRequest,
247250
const TMaybe<TString>& delimiter,
248251
size_t maxFilesPerQuery,
249-
TSharedListingContextPtr sharedCtx)
252+
TSharedListingContextPtr sharedCtx,
253+
NActors::TActorSystem* actorSystem)
250254
: MaxFilesPerQuery(maxFilesPerQuery) {
251255
Y_ENSURE(
252256
listingRequest.Url.substr(0, 7) != "file://",
@@ -270,7 +274,9 @@ class TS3Lister : public IS3Lister {
270274
std::move(request),
271275
delimiter,
272276
Nothing(),
273-
MaxFilesPerQuery};
277+
MaxFilesPerQuery,
278+
NLog::CurrentLogContextPath(),
279+
actorSystem};
274280

275281
YQL_CLOG(TRACE, ProviderS3)
276282
<< "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url
@@ -335,9 +341,19 @@ class TS3Lister : public IS3Lister {
335341
/*data=*/"",
336342
retryPolicy);
337343
}
344+
338345
static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) {
339346
return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) {
340-
OnDiscovery(c, std::move(result));
347+
if (c.ActorSystem) {
348+
NDq::TYqlLogScope logScope(c.ActorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
349+
OnDiscovery(c, std::move(result));
350+
} else {
351+
/*
352+
If the subsystem doesn't use the actor system
353+
then there is a need to use an own YqlLoggerScope on the top level
354+
*/
355+
OnDiscovery(c, std::move(result));
356+
}
341357
};
342358
}
343359

@@ -351,7 +367,7 @@ class TS3Lister : public IS3Lister {
351367
const NXml::TDocument xml(xmlString, NXml::TDocument::String);
352368
auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId);
353369
YQL_CLOG(DEBUG, ProviderS3)
354-
<< "Listing of " << ctx.ListingRequest.Url
370+
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url
355371
<< ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size()
356372
<< " entries, got another " << parsedResponse.KeyCount
357373
<< " entries, request id: [" << ctx.RequestId << "]";
@@ -380,7 +396,7 @@ class TS3Lister : public IS3Lister {
380396
}
381397

382398
if (parsedResponse.IsTruncated && !earlyStop) {
383-
YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url
399+
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Listing of " << ctx.ListingRequest.Url
384400
<< ctx.ListingRequest.Prefix
385401
<< ": got truncated flag, will continue";
386402

@@ -409,14 +425,14 @@ class TS3Lister : public IS3Lister {
409425
TStringBuilder{} << "request id: [" << ctx.RequestId << "]",
410426
std::move(result.Issues));
411427
YQL_CLOG(INFO, ProviderS3)
412-
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
428+
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
413429
<< ": got error from http gateway: " << issues.ToString(true);
414430
ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)});
415431
ctx.NextRequestPromise.SetValue(Nothing());
416432
}
417433
} catch (const std::exception& ex) {
418434
YQL_CLOG(INFO, ProviderS3)
419-
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
435+
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
420436
<< " : got exception: " << ex.what();
421437
ctx.Promise.SetException(std::current_exception());
422438
ctx.NextRequestPromise.SetValue(Nothing());
@@ -452,9 +468,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
452468
using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;
453469

454470
explicit TS3ParallelLimitedListerFactory(
455-
size_t maxParallelOps, TSharedListingContextPtr sharedCtx)
471+
size_t maxParallelOps, TSharedListingContextPtr sharedCtx, NActors::TActorSystem* actorSystem)
456472
: SharedCtx(std::move(sharedCtx))
457-
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
473+
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps)))
474+
, ActorSystem(actorSystem) { }
458475

459476
TFuture<NS3Lister::IS3Lister::TPtr> Make(
460477
const IHTTPGateway::TPtr& httpGateway,
@@ -464,10 +481,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
464481
bool allowLocalFiles) override {
465482
auto acquired = Semaphore->AcquireAsync();
466483
return acquired.Apply(
467-
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) {
484+
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem = ActorSystem](const auto& f) {
468485
return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
469486
NS3Lister::MakeS3Lister(
470-
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx),
487+
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem, ctx),
471488
std::make_unique<TAsyncSemaphore::TAutoRelease>(
472489
f.GetValue()->MakeAutoRelease())});
473490
});
@@ -503,6 +520,7 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
503520
private:
504521
TSharedListingContextPtr SharedCtx;
505522
const TAsyncSemaphore::TPtr Semaphore;
523+
NActors::TActorSystem* ActorSystem;
506524
};
507525

508526
} // namespace
@@ -513,10 +531,11 @@ IS3Lister::TPtr MakeS3Lister(
513531
const TListingRequest& listingRequest,
514532
const TMaybe<TString>& delimiter,
515533
bool allowLocalFiles,
534+
NActors::TActorSystem* actorSystem,
516535
TSharedListingContextPtr sharedCtx) {
517536
if (listingRequest.Url.substr(0, 7) != "file://") {
518537
return std::make_shared<TS3Lister>(
519-
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx));
538+
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx), actorSystem);
520539
}
521540

522541
if (!allowLocalFiles) {
@@ -530,13 +549,14 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
530549
size_t maxParallelOps,
531550
size_t callbackThreadCount,
532551
size_t callbackPerThreadQueueSize,
533-
size_t regexpCacheSize) {
552+
size_t regexpCacheSize,
553+
NActors::TActorSystem* actorSystem) {
534554
std::shared_ptr<TSharedListingContext> sharedCtx = nullptr;
535555
if (callbackThreadCount != 0 || regexpCacheSize != 0) {
536556
sharedCtx = std::make_shared<TSharedListingContext>(
537557
callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
538558
}
539-
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx);
559+
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx, actorSystem);
540560
}
541561

542562
} // namespace NYql::NS3Lister

ydb/library/yql/providers/s3/object_listers/yql_s3_list.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <library/cpp/cache/cache.h>
44
#include <library/cpp/threading/future/future.h>
55
#include <util/thread/pool.h>
6+
#include <ydb/library/actors/core/actorsystem.h>
67
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
78
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
89

@@ -169,6 +170,7 @@ IS3Lister::TPtr MakeS3Lister(
169170
const TListingRequest& listingRequest,
170171
const TMaybe<TString>& delimiter,
171172
bool allowLocalFiles,
173+
NActors::TActorSystem* actorSystem,
172174
TSharedListingContextPtr sharedCtx = nullptr);
173175

174176
class IS3ListerFactory {
@@ -189,7 +191,8 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
189191
size_t maxParallelOps,
190192
size_t callbackThreadCount,
191193
size_t callbackPerThreadQueueSize,
192-
size_t regexpCacheSize);
194+
size_t regexpCacheSize,
195+
NActors::TActorSystem* actorSystem);
193196

194197
} // namespace NS3Lister
195198
} // namespace NYql

ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
8383
State_->Configuration->MaxInflightListsPerQuery,
8484
State_->Configuration->ListingCallbackThreadCount,
8585
State_->Configuration->ListingCallbackPerThreadQueueSize,
86-
State_->Configuration->RegexpCacheSize))
86+
State_->Configuration->RegexpCacheSize,
87+
State_->ActorSystem))
8788
, ListingStrategy_(MakeS3ListingStrategy(
8889
State_->Gateway,
8990
State_->GatewayRetryPolicy,

ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,14 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
492492
});
493493
return NextDirectoryListeningChunk;
494494
}
495+
496+
static TString ParseBasePath(const TString& path) {
497+
TString basePath = TString{TStringBuf{path}.RBefore('/')};
498+
return basePath == path && !basePath.EndsWith('/') ? TString{} : basePath;
499+
}
500+
495501
void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) {
496-
result.Directories.push_back({.Path = sourcePrefix});
502+
result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)});
497503
for (auto& directoryPrefix : DirectoryPrefixQueue) {
498504
result.Directories.push_back({.Path = directoryPrefix});
499505
}

ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
namespace NYql {
66

7-
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) {
8-
return [gateway, credentialsFactory, allowLocalFiles] (
7+
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) {
8+
return [gateway, credentialsFactory, allowLocalFiles, actorSystem] (
99
const TString& userName,
1010
const TString& sessionId,
1111
const TGatewaysConfig* gatewaysConfig,
@@ -31,6 +31,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway
3131
state->Types = typeCtx.Get();
3232
state->FunctionRegistry = functionRegistry;
3333
state->CredentialsFactory = credentialsFactory;
34+
state->ActorSystem = actorSystem;
3435
if (gatewaysConfig) {
3536
state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx);
3637
}

0 commit comments

Comments
 (0)