Skip to content

Commit f328b7c

Browse files
Merging inference updates (ydb-platform#8616)
1 parent 1cdbb71 commit f328b7c

File tree

20 files changed

+686
-205
lines changed

20 files changed

+686
-205
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 123 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
#include "validation_functions.h"
44
#include "object_storage/s3_fetcher.h"
55

6+
#include <util/string/join.h>
67
#include <ydb/core/external_sources/object_storage/inference/arrow_fetcher.h>
78
#include <ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h>
9+
#include <ydb/core/external_sources/object_storage/inference/infer_config.h>
810
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
911
#include <ydb/core/protos/external_sources.pb.h>
1012
#include <ydb/core/protos/flat_scheme_op.pb.h>
@@ -20,6 +22,10 @@
2022
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
2123

2224
#include <library/cpp/scheme/scheme.h>
25+
#include <library/cpp/json/json_reader.h>
26+
#include <arrow/buffer_builder.h>
27+
#include <arrow/buffer.h>
28+
#include <arrow/io/memory.h>
2329

2430
#include <util/string/builder.h>
2531

@@ -320,21 +326,29 @@ struct TObjectStorageExternalSource : public IExternalSource {
320326
structuredTokenBuilder.SetNoAuth();
321327
}
322328

323-
auto effectiveFilePattern = NYql::NS3::NormalizePath(meta->TableLocation);
324-
if (meta->TableLocation.EndsWith('/')) {
325-
effectiveFilePattern += '*';
326-
}
327-
328329
const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
330+
331+
const TString path = meta->TableLocation;
332+
const TString filePattern = meta->Attributes.Value("filepattern", TString{});
333+
const TVector<TString> partitionedBy = GetPartitionedByConfig(meta);
334+
NYql::NS3Lister::TListingRequest request {
335+
.Url = meta->DataSourceLocation,
336+
.Credentials = credentials
337+
};
338+
339+
auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request);
340+
if (error) {
341+
throw yexception() << *error;
342+
}
343+
344+
auto partByData = std::make_shared<TStringBuilder>();
345+
329346
auto httpGateway = NYql::IHTTPGateway::Make();
330347
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
331-
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
332-
.Url = meta->DataSourceLocation,
333-
.Credentials = credentials,
334-
.Pattern = effectiveFilePattern,
335-
}, Nothing(), false, ActorSystem);
336-
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
348+
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, request, Nothing(), true, ActorSystem);
349+
auto afterListing = s3Lister->Next().Apply([partByData, partitionedBy, path = request.Pattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
337350
auto& listRes = listResFut.GetValue();
351+
auto& partByRef = *partByData;
338352
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
339353
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
340354
throw yexception() << error.Issues.ToString();
@@ -343,6 +357,12 @@ struct TObjectStorageExternalSource : public IExternalSource {
343357
if (entries.Objects.empty()) {
344358
throw yexception() << "couldn't find files at " << path;
345359
}
360+
361+
partByRef << JoinSeq(",", partitionedBy);
362+
for (const auto& entry : entries.Objects) {
363+
Y_ENSURE(entry.MatchedGlobs.size() == partitionedBy.size());
364+
partByRef << Endl << JoinSeq(",", entry.MatchedGlobs);
365+
}
346366
for (const auto& entry : entries.Objects) {
347367
if (entry.Size > 0) {
348368
return entry;
@@ -360,9 +380,8 @@ struct TObjectStorageExternalSource : public IExternalSource {
360380

361381
meta->Attributes.erase("withinfer");
362382

363-
auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
364-
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
365-
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));
383+
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, meta->Attributes));
384+
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId));
366385

367386
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
368387
auto promise = NThreading::NewPromise<TMetadataResult>();
@@ -385,12 +404,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
385404
auto [path, size, _] = entryFut.GetValue();
386405
actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
387406
arrowInferencinatorId,
388-
new NObjectStorage::TEvInferFileSchema(TString{path}, size),
407+
new NObjectStorage::TEvInferFileSchema(std::move(path), size),
389408
promise,
390409
std::move(schemaToMetadata)
391410
));
392411

393412
return promise.GetFuture();
413+
}).Apply([arrowInferencinatorId, meta, partByData, partitionedBy, this](const NThreading::TFuture<TMetadataResult>& result) {
414+
auto& value = result.GetValue();
415+
if (!value.Success()) {
416+
return result;
417+
}
418+
419+
return InferPartitionedColumnsTypes(arrowInferencinatorId, partByData, partitionedBy, result);
394420
}).Apply([](const NThreading::TFuture<TMetadataResult>& result) {
395421
auto& value = result.GetValue();
396422
if (value.Success()) {
@@ -405,6 +431,88 @@ struct TObjectStorageExternalSource : public IExternalSource {
405431
}
406432

407433
private:
434+
NThreading::TFuture<TMetadataResult> InferPartitionedColumnsTypes(
435+
NActors::TActorId arrowInferencinatorId,
436+
std::shared_ptr<TStringBuilder> partByData,
437+
const TVector<TString>& partitionedBy,
438+
const NThreading::TFuture<TMetadataResult>& result) const {
439+
440+
auto& value = result.GetValue();
441+
if (partitionedBy.empty()) {
442+
return result;
443+
}
444+
445+
auto meta = value.Metadata;
446+
for (const auto& partitionName : partitionedBy) {
447+
auto& destColumn = *meta->Schema.add_column();
448+
destColumn.mutable_name()->assign(partitionName);
449+
destColumn.mutable_type()->set_type_id(Ydb::Type::UTF8);
450+
}
451+
452+
arrow::BufferBuilder builder;
453+
auto partitionBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
454+
auto buildStatus = builder.Append(partByData->data(), partByData->size());
455+
auto finishStatus = builder.Finish(&partitionBuffer);
456+
457+
if (!buildStatus.ok() || !finishStatus.ok()) {
458+
return result;
459+
}
460+
461+
auto promise = NThreading::NewPromise<TMetadataResult>();
462+
auto partitionsToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response){
463+
if (response.Status.IsSuccess()) {
464+
THashMap<TString, Ydb::Type> inferredTypes;
465+
for (const auto& column : response.Fields) {
466+
if (ValidateCommonProjectionType(column.type(), column.name()).Empty()) {
467+
inferredTypes[column.name()] = column.type();
468+
}
469+
}
470+
471+
for (auto& destColumn : *meta->Schema.mutable_column()) {
472+
if (auto type = inferredTypes.FindPtr(destColumn.name()); type) {
473+
destColumn.mutable_type()->set_type_id(type->type_id());
474+
}
475+
}
476+
}
477+
TMetadataResult result;
478+
result.SetSuccess();
479+
result.Metadata = meta;
480+
metaPromise.SetValue(std::move(result));
481+
};
482+
483+
auto bufferReader = std::make_shared<arrow::io::BufferReader>(std::move(partitionBuffer));
484+
auto file = std::dynamic_pointer_cast<arrow::io::RandomAccessFile>(bufferReader);
485+
auto config = NObjectStorage::NInference::MakeFormatConfig({{ "format", "csv_with_names" }});
486+
config->ShouldMakeOptional = false;
487+
ActorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvArrowFile, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
488+
arrowInferencinatorId,
489+
new NObjectStorage::TEvArrowFile(config, std::move(file), ""),
490+
promise,
491+
std::move(partitionsToMetadata)
492+
));
493+
494+
return promise.GetFuture();
495+
}
496+
497+
static TVector<TString> GetPartitionedByConfig(std::shared_ptr<TMetadata> meta) {
498+
THashSet<TString> columns;
499+
if (auto partitioned = meta->Attributes.FindPtr("partitionedby"); partitioned) {
500+
NJson::TJsonValue values;
501+
Y_ENSURE(NJson::ReadJsonTree(*partitioned, &values));
502+
Y_ENSURE(values.GetType() == NJson::JSON_ARRAY);
503+
504+
for (const auto& value : values.GetArray()) {
505+
Y_ENSURE(value.GetType() == NJson::JSON_STRING);
506+
if (columns.contains(value.GetString())) {
507+
throw yexception() << "invalid partitioned_by parameter, column " << value.GetString() << "mentioned twice";
508+
}
509+
columns.insert(value.GetString());
510+
}
511+
}
512+
513+
return TVector<TString>{columns.begin(), columns.end()};
514+
}
515+
408516
static bool IsValidIntervalUnit(const TString& unit) {
409517
static constexpr std::array<std::string_view, 7> IntervalUnits = {
410518
"MICROSECONDS"sv,

ydb/core/external_sources/object_storage/events.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/library/actors/core/event_local.h>
99
#include <ydb/library/actors/core/events.h>
1010
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
11+
#include <ydb/core/external_sources/object_storage/inference/infer_config.h>
1112
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
1213
#include <ydb/public/api/protos/ydb_value.pb.h>
1314
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
@@ -109,11 +110,16 @@ struct TEvS3RangeError : public NActors::TEventLocal<TEvS3RangeError, EvS3RangeE
109110
};
110111

111112
struct TEvArrowFile : public NActors::TEventLocal<TEvArrowFile, EvArrowFile> {
112-
TEvArrowFile(std::shared_ptr<arrow::io::RandomAccessFile> file, TString path)
113-
: File{std::move(file)}
113+
TEvArrowFile(
114+
std::shared_ptr<NInference::FormatConfig> config,
115+
std::shared_ptr<arrow::io::RandomAccessFile> file,
116+
TString path)
117+
: Config{std::move(config)}
118+
, File{std::move(file)}
114119
, Path{std::move(path)}
115120
{}
116121

122+
std::shared_ptr<NInference::FormatConfig> Config;
117123
std::shared_ptr<arrow::io::RandomAccessFile> File;
118124
TString Path;
119125
};

0 commit comments

Comments
 (0)