Skip to content

Commit 18a91ed

Browse files
Inference projections support (#8744)
1 parent 37cf155 commit 18a91ed

File tree

3 files changed

+189
-48
lines changed

3 files changed

+189
-48
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 112 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
1919
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
2020
#include <ydb/library/yql/providers/s3/proto/credentials.pb.h>
21+
#include <ydb/library/yql/utils/yql_panic.h>
2122
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
2223
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
2324

@@ -332,44 +333,87 @@ struct TObjectStorageExternalSource : public IExternalSource {
332333

333334
const TString path = meta->TableLocation;
334335
const TString filePattern = meta->Attributes.Value("filepattern", TString{});
336+
const TString projection = meta->Attributes.Value("projection", TString{});
335337
const TVector<TString> partitionedBy = GetPartitionedByConfig(meta);
338+
339+
NYql::NPathGenerator::TPathGeneratorPtr pathGenerator;
340+
341+
bool shouldInferPartitions = !partitionedBy.empty() && !projection;
342+
bool ignoreEmptyListings = !projection.empty();
343+
336344
NYql::NS3Lister::TListingRequest request {
337345
.Url = meta->DataSourceLocation,
338346
.Credentials = credentials
339347
};
348+
TVector<NYql::NS3Lister::TListingRequest> requests;
349+
350+
if (!projection) {
351+
auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request);
352+
if (error) {
353+
throw yexception() << *error;
354+
}
355+
requests.push_back(request);
356+
} else {
357+
if (NYql::NS3::HasWildcards(path)) {
358+
throw yexception() << "Path prefix: '" << path << "' contains wildcards";
359+
}
340360

341-
auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request);
342-
if (error) {
343-
throw yexception() << *error;
361+
pathGenerator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy);
362+
for (const auto& rule : pathGenerator->GetRules()) {
363+
YQL_ENSURE(rule.ColumnValues.size() == partitionedBy.size());
364+
365+
request.Pattern = NYql::NS3::NormalizePath(TStringBuilder() << path << "/" << rule.Path << "/*");
366+
request.PatternType = NYql::NS3Lister::ES3PatternType::Wildcard;
367+
request.Prefix = request.Pattern.substr(0, NYql::NS3::GetFirstWildcardPos(request.Pattern));
368+
369+
requests.push_back(request);
370+
}
344371
}
345372

346373
auto partByData = std::make_shared<TStringBuilder>();
374+
if (shouldInferPartitions) {
375+
*partByData << JoinSeq(",", partitionedBy);
376+
}
347377

378+
TVector<NThreading::TFuture<NYql::NS3Lister::TListResult>> futures;
348379
auto httpGateway = NYql::IHTTPGateway::Make();
349380
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
350-
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, request, Nothing(), AllowLocalFiles, ActorSystem);
351-
auto afterListing = s3Lister->Next().Apply([partByData, partitionedBy, path = request.Pattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
352-
auto& listRes = listResFut.GetValue();
353-
auto& partByRef = *partByData;
354-
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
355-
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
356-
throw yexception() << error.Issues.ToString();
357-
}
358-
auto& entries = std::get<NYql::NS3Lister::TListEntries>(listRes);
359-
if (entries.Objects.empty()) {
360-
throw yexception() << "couldn't find files at " << path;
361-
}
381+
for (const auto& req : requests) {
382+
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, req, Nothing(), AllowLocalFiles, ActorSystem);
383+
futures.push_back(s3Lister->Next());
384+
}
362385

363-
partByRef << JoinSeq(",", partitionedBy);
364-
for (const auto& entry : entries.Objects) {
365-
Y_ENSURE(entry.MatchedGlobs.size() == partitionedBy.size());
366-
partByRef << Endl << JoinSeq(",", entry.MatchedGlobs);
367-
}
368-
for (const auto& entry : entries.Objects) {
369-
if (entry.Size > 0) {
370-
return entry;
386+
auto allFuture = NThreading::WaitExceptionOrAll(futures);
387+
auto afterListing = allFuture.Apply([partByData, shouldInferPartitions, ignoreEmptyListings, futures = std::move(futures), requests = std::move(requests)](const NThreading::TFuture<void>& result) {
388+
result.GetValue();
389+
for (size_t i = 0; i < futures.size(); ++i) {
390+
auto& listRes = futures[i].GetValue();
391+
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
392+
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
393+
throw yexception() << error.Issues.ToString();
394+
}
395+
auto& entries = std::get<NYql::NS3Lister::TListEntries>(listRes);
396+
if (entries.Objects.empty() && !ignoreEmptyListings) {
397+
throw yexception() << "couldn't find files at " << requests[i].Pattern;
398+
}
399+
400+
if (shouldInferPartitions) {
401+
for (const auto& entry : entries.Objects) {
402+
*partByData << Endl << JoinSeq(",", entry.MatchedGlobs);
403+
}
404+
}
405+
406+
for (const auto& entry : entries.Objects) {
407+
if (entry.Size > 0) {
408+
return entry;
409+
}
410+
}
411+
412+
if (!ignoreEmptyListings) {
413+
throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
371414
}
372415
}
416+
373417
throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
374418
});
375419

@@ -412,13 +456,45 @@ struct TObjectStorageExternalSource : public IExternalSource {
412456
));
413457

414458
return promise.GetFuture();
415-
}).Apply([arrowInferencinatorId, meta, partByData, partitionedBy, this](const NThreading::TFuture<TMetadataResult>& result) {
459+
}).Apply([arrowInferencinatorId, meta, partByData, partitionedBy, pathGenerator, this](const NThreading::TFuture<TMetadataResult>& result) {
416460
auto& value = result.GetValue();
417461
if (!value.Success()) {
418462
return result;
419463
}
420464

421-
return InferPartitionedColumnsTypes(arrowInferencinatorId, partByData, partitionedBy, result);
465+
auto meta = value.Metadata;
466+
if (pathGenerator) {
467+
for (const auto& rule : pathGenerator->GetConfig().Rules) {
468+
auto& destColumn = *meta->Schema.add_column();
469+
destColumn.mutable_name()->assign(rule.Name);
470+
switch (rule.Type) {
471+
case NYql::NPathGenerator::IPathGenerator::EType::INTEGER:
472+
destColumn.mutable_type()->set_type_id(Ydb::Type::INT64);
473+
break;
474+
475+
case NYql::NPathGenerator::IPathGenerator::EType::DATE:
476+
destColumn.mutable_type()->set_type_id(Ydb::Type::DATE);
477+
break;
478+
479+
case NYql::NPathGenerator::IPathGenerator::EType::ENUM:
480+
default:
481+
destColumn.mutable_type()->set_type_id(Ydb::Type::STRING);
482+
break;
483+
}
484+
}
485+
} else {
486+
for (const auto& partitionName : partitionedBy) {
487+
auto& destColumn = *meta->Schema.add_column();
488+
destColumn.mutable_name()->assign(partitionName);
489+
destColumn.mutable_type()->set_type_id(Ydb::Type::UTF8);
490+
}
491+
}
492+
493+
if (!partitionedBy.empty() && !pathGenerator) {
494+
return InferPartitionedColumnsTypes(arrowInferencinatorId, partByData, result);
495+
}
496+
497+
return result;
422498
}).Apply([](const NThreading::TFuture<TMetadataResult>& result) {
423499
auto& value = result.GetValue();
424500
if (value.Success()) {
@@ -436,20 +512,10 @@ struct TObjectStorageExternalSource : public IExternalSource {
436512
NThreading::TFuture<TMetadataResult> InferPartitionedColumnsTypes(
437513
NActors::TActorId arrowInferencinatorId,
438514
std::shared_ptr<TStringBuilder> partByData,
439-
const TVector<TString>& partitionedBy,
440515
const NThreading::TFuture<TMetadataResult>& result) const {
441516

442517
auto& value = result.GetValue();
443-
if (partitionedBy.empty()) {
444-
return result;
445-
}
446-
447518
auto meta = value.Metadata;
448-
for (const auto& partitionName : partitionedBy) {
449-
auto& destColumn = *meta->Schema.add_column();
450-
destColumn.mutable_name()->assign(partitionName);
451-
destColumn.mutable_type()->set_type_id(Ydb::Type::UTF8);
452-
}
453519

454520
arrow::BufferBuilder builder;
455521
auto partitionBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
@@ -500,15 +566,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
500566
THashSet<TString> columns;
501567
if (auto partitioned = meta->Attributes.FindPtr("partitionedby"); partitioned) {
502568
NJson::TJsonValue values;
503-
Y_ENSURE(NJson::ReadJsonTree(*partitioned, &values));
504-
Y_ENSURE(values.GetType() == NJson::JSON_ARRAY);
569+
auto successful = NJson::ReadJsonTree(*partitioned, &values);
570+
if (!successful) {
571+
columns.insert(*partitioned);
572+
} else {
573+
Y_ENSURE(values.GetType() == NJson::JSON_ARRAY);
505574

506-
for (const auto& value : values.GetArray()) {
507-
Y_ENSURE(value.GetType() == NJson::JSON_STRING);
508-
if (columns.contains(value.GetString())) {
509-
throw yexception() << "invalid partitioned_by parameter, column " << value.GetString() << "mentioned twice";
575+
for (const auto& value : values.GetArray()) {
576+
Y_ENSURE(value.GetType() == NJson::JSON_STRING);
577+
if (columns.contains(value.GetString())) {
578+
throw yexception() << "invalid partitioned_by parameter, column " << value.GetString() << "mentioned twice";
579+
}
580+
columns.insert(value.GetString());
510581
}
511-
columns.insert(value.GetString());
512582
}
513583
}
514584

ydb/core/kqp/provider/read_attributes_utils.cpp

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -21,11 +21,6 @@ class TGatheringAttributesVisitor : public IAstAttributesVisitor {
2121

2222
void VisitAttribute(TString key, TString value) override {
2323
Y_ABORT_UNLESS(CurrentSource, "cannot write %s: %s", key.c_str(), value.c_str());
24-
if (key == "partitionedby") {
25-
NJson::TJsonArray values({ value });
26-
CurrentSource->second.try_emplace(key, NJson::WriteJson({ values }));
27-
return;
28-
}
2924
CurrentSource->second.try_emplace(key, value);
3025
};
3126

@@ -126,9 +121,11 @@ class TAttributesReplacingVisitor : public IAstAttributesVisitor {
126121
auto nodeChildren = node->Children();
127122
if (!nodeChildren.empty() && nodeChildren[0]->IsAtom()) {
128123
TCoAtom attrName{nodeChildren[0]};
129-
if (attrName.StringValue().equal("userschema")) {
124+
if (attrName.StringValue() == "userschema") {
130125
node = BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns);
131126
ReplacedUserchema = true;
127+
} else if (attrName.StringValue() == "partitionedby") {
128+
NewAttributes.erase("partitionedby");
132129
}
133130
}
134131
Children.push_back(std::move(node));

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 74 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -488,6 +488,80 @@ def test_inference_timestamp(self, kikimr, s3, client, unique_prefix):
488488
assert result_set.columns[2].name == "c"
489489
assert result_set.columns[2].type.type_id == ydb.Type.UTF8
490490

491+
@yq_v2
492+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
493+
def test_inference_projection(self, kikimr, s3, client, unique_prefix):
494+
resource = boto3.resource(
495+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
496+
)
497+
498+
bucket = resource.Bucket("fbucket")
499+
bucket.create(ACL='public-read')
500+
bucket.objects.all().delete()
501+
502+
s3_client = boto3.client(
503+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
504+
)
505+
506+
fruits = '''Fruit,Price,Weight
507+
Banana,3,100
508+
Apple,2,22
509+
Pear,15,33'''
510+
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='year=2023/fruits.csv', ContentType='text/plain')
511+
512+
kikimr.control_plane.wait_bootstrap(1)
513+
storage_connection_name = unique_prefix + "fruitbucket"
514+
client.create_storage_connection(storage_connection_name, "fbucket")
515+
516+
sql = '''$projection = @@ {
517+
"projection.enabled" : "true",
518+
"storage.location.template" : "/${date}",
519+
"projection.date.type" : "date",
520+
"projection.date.min" : "2022-11-02",
521+
"projection.date.max" : "2024-12-02",
522+
"projection.date.interval" : "1",
523+
"projection.date.format" : "/year=%Y",
524+
"projection.date.unit" : "YEARS"
525+
} @@;''' + f'''
526+
527+
SELECT *
528+
FROM `{storage_connection_name}`.`/`
529+
WITH (format=csv_with_names,
530+
with_infer='true',
531+
partitioned_by=(`date`),
532+
projection=$projection);
533+
'''
534+
535+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
536+
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
537+
538+
data = client.get_result_data(query_id)
539+
result_set = data.result.result_set
540+
logging.debug(str(result_set))
541+
assert len(result_set.columns) == 4
542+
assert result_set.columns[0].name == "Fruit"
543+
assert result_set.columns[0].type.type_id == ydb.Type.UTF8
544+
assert result_set.columns[1].name == "Price"
545+
assert result_set.columns[1].type.optional_type.item.type_id == ydb.Type.INT64
546+
assert result_set.columns[2].name == "Weight"
547+
assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.INT64
548+
assert result_set.columns[3].name == "date"
549+
assert result_set.columns[3].type.type_id == ydb.Type.DATE
550+
assert len(result_set.rows) == 3
551+
assert result_set.rows[0].items[0].text_value == "Banana"
552+
assert result_set.rows[0].items[1].int64_value == 3
553+
assert result_set.rows[0].items[2].int64_value == 100
554+
assert result_set.rows[0].items[3].uint32_value == 19663
555+
assert result_set.rows[1].items[0].text_value == "Apple"
556+
assert result_set.rows[1].items[1].int64_value == 2
557+
assert result_set.rows[1].items[2].int64_value == 22
558+
assert result_set.rows[1].items[3].uint32_value == 19663
559+
assert result_set.rows[2].items[0].text_value == "Pear"
560+
assert result_set.rows[2].items[1].int64_value == 15
561+
assert result_set.rows[2].items[2].int64_value == 33
562+
assert result_set.rows[2].items[3].uint32_value == 19663
563+
assert sum(kikimr.control_plane.get_metering(1)) == 10
564+
491565
@yq_all
492566
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
493567
def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):

0 commit comments

Comments
 (0)