Skip to content

Commit c5304ff

Browse files
inference params support (#8252)
1 parent efd441b commit c5304ff

File tree

17 files changed

+492
-181
lines changed

17 files changed

+492
-181
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 123 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
#include "validation_functions.h"
44
#include "object_storage/s3_fetcher.h"
55

6+
#include <util/string/join.h>
67
#include <ydb/core/external_sources/object_storage/inference/arrow_fetcher.h>
78
#include <ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h>
9+
#include <ydb/core/external_sources/object_storage/inference/infer_config.h>
810
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
911
#include <ydb/core/protos/external_sources.pb.h>
1012
#include <ydb/core/protos/flat_scheme_op.pb.h>
@@ -20,6 +22,10 @@
2022
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
2123

2224
#include <library/cpp/scheme/scheme.h>
25+
#include <library/cpp/json/json_reader.h>
26+
#include <arrow/buffer_builder.h>
27+
#include <arrow/buffer.h>
28+
#include <arrow/io/memory.h>
2329

2430
#include <util/string/builder.h>
2531

@@ -322,21 +328,29 @@ struct TObjectStorageExternalSource : public IExternalSource {
322328
structuredTokenBuilder.SetNoAuth();
323329
}
324330

325-
auto effectiveFilePattern = NYql::NS3::NormalizePath(meta->TableLocation);
326-
if (meta->TableLocation.EndsWith('/')) {
327-
effectiveFilePattern += '*';
328-
}
329-
330331
const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
332+
333+
const TString path = meta->TableLocation;
334+
const TString filePattern = meta->Attributes.Value("filepattern", TString{});
335+
const TVector<TString> partitionedBy = GetPartitionedByConfig(meta);
336+
NYql::NS3Lister::TListingRequest request {
337+
.Url = meta->DataSourceLocation,
338+
.Credentials = credentials
339+
};
340+
341+
auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request);
342+
if (error) {
343+
throw yexception() << *error;
344+
}
345+
346+
auto partByData = std::make_shared<TStringBuilder>();
347+
331348
auto httpGateway = NYql::IHTTPGateway::Make();
332349
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
333-
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
334-
.Url = meta->DataSourceLocation,
335-
.Credentials = credentials,
336-
.Pattern = effectiveFilePattern,
337-
}, Nothing(), AllowLocalFiles, ActorSystem);
338-
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
350+
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, request, Nothing(), AllowLocalFiles, ActorSystem);
351+
auto afterListing = s3Lister->Next().Apply([partByData, partitionedBy, path = request.Pattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
339352
auto& listRes = listResFut.GetValue();
353+
auto& partByRef = *partByData;
340354
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
341355
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
342356
throw yexception() << error.Issues.ToString();
@@ -345,6 +359,12 @@ struct TObjectStorageExternalSource : public IExternalSource {
345359
if (entries.Objects.empty()) {
346360
throw yexception() << "couldn't find files at " << path;
347361
}
362+
363+
partByRef << JoinSeq(",", partitionedBy);
364+
for (const auto& entry : entries.Objects) {
365+
Y_ENSURE(entry.MatchedGlobs.size() == partitionedBy.size());
366+
partByRef << Endl << JoinSeq(",", entry.MatchedGlobs);
367+
}
348368
for (const auto& entry : entries.Objects) {
349369
if (entry.Size > 0) {
350370
return entry;
@@ -362,9 +382,8 @@ struct TObjectStorageExternalSource : public IExternalSource {
362382

363383
meta->Attributes.erase("withinfer");
364384

365-
auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
366-
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
367-
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));
385+
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, meta->Attributes));
386+
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId));
368387

369388
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
370389
auto promise = NThreading::NewPromise<TMetadataResult>();
@@ -387,12 +406,19 @@ struct TObjectStorageExternalSource : public IExternalSource {
387406
auto [path, size, _] = entryFut.GetValue();
388407
actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
389408
arrowInferencinatorId,
390-
new NObjectStorage::TEvInferFileSchema(TString{path}, size),
409+
new NObjectStorage::TEvInferFileSchema(std::move(path), size),
391410
promise,
392411
std::move(schemaToMetadata)
393412
));
394413

395414
return promise.GetFuture();
415+
}).Apply([arrowInferencinatorId, meta, partByData, partitionedBy, this](const NThreading::TFuture<TMetadataResult>& result) {
416+
auto& value = result.GetValue();
417+
if (!value.Success()) {
418+
return result;
419+
}
420+
421+
return InferPartitionedColumnsTypes(arrowInferencinatorId, partByData, partitionedBy, result);
396422
}).Apply([](const NThreading::TFuture<TMetadataResult>& result) {
397423
auto& value = result.GetValue();
398424
if (value.Success()) {
@@ -407,6 +433,88 @@ struct TObjectStorageExternalSource : public IExternalSource {
407433
}
408434

409435
private:
436+
// Extends already-inferred metadata with the partitioning columns and tries to
// infer their types.
//
// arrowInferencinatorId - actor that receives the TEvArrowFile request.
// partByData            - text buffer with the partition values; it is wrapped
//                         into an in-memory arrow file and parsed with the
//                         "csv_with_names" format config.
// partitionedBy         - names of the partitioning columns.
// result                - future holding the metadata inferred so far; its value
//                         is read immediately via GetValue().
//
// Returns `result` itself when partitionedBy is empty or when the arrow buffer
// cannot be built; otherwise returns a new future that is fulfilled from the
// inferencinator's response.
NThreading::TFuture<TMetadataResult> InferPartitionedColumnsTypes(
437+
NActors::TActorId arrowInferencinatorId,
438+
std::shared_ptr<TStringBuilder> partByData,
439+
const TVector<TString>& partitionedBy,
440+
const NThreading::TFuture<TMetadataResult>& result) const {
441+
442+
auto& value = result.GetValue();
443+
if (partitionedBy.empty()) {
444+
return result;
445+
}
446+
// Add every partition column to the schema as UTF8 first; the type is only
// replaced later if inference succeeds for that column.
447+
auto meta = value.Metadata;
448+
for (const auto& partitionName : partitionedBy) {
449+
auto& destColumn = *meta->Schema.add_column();
450+
destColumn.mutable_name()->assign(partitionName);
451+
destColumn.mutable_type()->set_type_id(Ydb::Type::UTF8);
452+
}
453+
// Copy the accumulated partition text into an arrow buffer.
454+
arrow::BufferBuilder builder;
455+
auto partitionBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
456+
auto buildStatus = builder.Append(partByData->data(), partByData->size());
457+
auto finishStatus = builder.Finish(&partitionBuffer);
458+
// On arrow failure fall back to the incoming result.
// NOTE(review): the columns were already appended through `meta` above; if
// Metadata is a shared pointer they stay visible in `result` — confirm.
459+
if (!buildStatus.ok() || !finishStatus.ok()) {
460+
return result;
461+
}
462+
463+
auto promise = NThreading::NewPromise<TMetadataResult>();
// Response handler: a failed inference is not treated as an error — the
// promise is always completed with success and the (possibly still UTF8)
// metadata.
464+
auto partitionsToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response){
465+
if (response.Status.IsSuccess()) {
466+
THashMap<TString, Ydb::Type> inferredTypes;
// Keep only inferred types for which ValidateCommonProjectionType
// returns an empty result.
467+
for (const auto& column : response.Fields) {
468+
if (ValidateCommonProjectionType(column.type(), column.name()).Empty()) {
469+
inferredTypes[column.name()] = column.type();
470+
}
471+
}
472+
// Matching is done by name over every schema column, not only the ones
// appended above.
473+
for (auto& destColumn : *meta->Schema.mutable_column()) {
474+
if (auto type = inferredTypes.FindPtr(destColumn.name()); type) {
475+
destColumn.mutable_type()->set_type_id(type->type_id());
476+
}
477+
}
478+
}
479+
TMetadataResult result;
480+
result.SetSuccess();
481+
result.Metadata = meta;
482+
metaPromise.SetValue(std::move(result));
483+
};
484+
// Expose the buffer as a RandomAccessFile and send it to the inferencinator
// with a CSV-with-names config; ShouldMakeOptional is switched off for
// partition columns. The path argument of the event is left empty.
485+
auto bufferReader = std::make_shared<arrow::io::BufferReader>(std::move(partitionBuffer));
486+
auto file = std::dynamic_pointer_cast<arrow::io::RandomAccessFile>(bufferReader);
487+
auto config = NObjectStorage::NInference::MakeFormatConfig({{ "format", "csv_with_names" }});
488+
config->ShouldMakeOptional = false;
489+
ActorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvArrowFile, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
490+
arrowInferencinatorId,
491+
new NObjectStorage::TEvArrowFile(config, std::move(file), ""),
492+
promise,
493+
std::move(partitionsToMetadata)
494+
));
495+
496+
return promise.GetFuture();
497+
}
498+
499+
// Reads the "partitionedby" attribute of `meta` and returns the listed column
// names.
//
// The attribute, when present, must be a JSON array of strings; the Y_ENSURE
// checks fail on unparsable JSON, a non-array value or a non-string element.
// A column listed more than once is rejected with a yexception.
// Returns an empty vector when the attribute is absent.
//
// NOTE(review): names are collected in a THashSet, so the returned order is the
// set's iteration order rather than the order written in the attribute — confirm
// that callers do not depend on the declared order.
static TVector<TString> GetPartitionedByConfig(std::shared_ptr<TMetadata> meta) {
500+
THashSet<TString> columns;
501+
if (auto partitioned = meta->Attributes.FindPtr("partitionedby"); partitioned) {
502+
NJson::TJsonValue values;
503+
Y_ENSURE(NJson::ReadJsonTree(*partitioned, &values));
504+
Y_ENSURE(values.GetType() == NJson::JSON_ARRAY);
505+
506+
for (const auto& value : values.GetArray()) {
507+
Y_ENSURE(value.GetType() == NJson::JSON_STRING);
508+
if (columns.contains(value.GetString())) {
509+
// Leading space keeps the column name separate from the rest of the message.
throw yexception() << "invalid partitioned_by parameter, column " << value.GetString() << " mentioned twice";
510+
}
511+
columns.insert(value.GetString());
512+
}
513+
}
514+
515+
return TVector<TString>{columns.begin(), columns.end()};
516+
}
517+
410518
static bool IsValidIntervalUnit(const TString& unit) {
411519
static constexpr std::array<std::string_view, 7> IntervalUnits = {
412520
"MICROSECONDS"sv,

ydb/core/external_sources/object_storage/events.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/library/actors/core/event_local.h>
99
#include <ydb/library/actors/core/events.h>
1010
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
11+
#include <ydb/core/external_sources/object_storage/inference/infer_config.h>
1112
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
1213
#include <ydb/public/api/protos/ydb_value.pb.h>
1314
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
@@ -109,11 +110,16 @@ struct TEvS3RangeError : public NActors::TEventLocal<TEvS3RangeError, EvS3RangeE
109110
};
110111

111112
// Local actor event carrying an arrow random-access file together with the
// format config to parse it with and the path it belongs to.
// All constructor arguments are taken by value and moved into the fields.
struct TEvArrowFile : public NActors::TEventLocal<TEvArrowFile, EvArrowFile> {
112-
TEvArrowFile(std::shared_ptr<arrow::io::RandomAccessFile> file, TString path)
113-
: File{std::move(file)}
113+
TEvArrowFile(
114+
std::shared_ptr<NInference::FormatConfig> config,
115+
std::shared_ptr<arrow::io::RandomAccessFile> file,
116+
TString path)
117+
: Config{std::move(config)}
118+
, File{std::move(file)}
114119
, Path{std::move(path)}
115120
{}
116121

122+
// Format configuration that accompanies File.
std::shared_ptr<NInference::FormatConfig> Config;
117123
// File contents exposed through arrow's RandomAccessFile interface.
std::shared_ptr<arrow::io::RandomAccessFile> File;
118124
// Path the file is associated with; may be empty.
TString Path;
119125
};

ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "arrow_fetcher.h"
22
#include "arrow_inferencinator.h"
3+
#include "infer_config.h"
34

45
#include <arrow/buffer.h>
56
#include <arrow/buffer_builder.h>
@@ -27,11 +28,11 @@ namespace NKikimr::NExternalSource::NObjectStorage::NInference {
2728
class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher> {
2829
static constexpr uint64_t PrefixSize = 10_MB;
2930
public:
30-
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
31+
TArrowFileFetcher(NActors::TActorId s3FetcherId, const THashMap<TString, TString>& params)
3132
: S3FetcherId_{s3FetcherId}
32-
, Format_{format}
33+
, Config_{MakeFormatConfig(params)}
3334
{
34-
Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));
35+
Y_ABORT_UNLESS(IsArrowInferredFormat(Config_->Format));
3536

3637
auto decompression = params.FindPtr("compression");
3738
if (decompression) {
@@ -58,7 +59,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
5859
.MetadataRequest = false,
5960
};
6061

61-
switch (Format_) {
62+
switch (Config_->Format) {
6263
case EFileFormat::CsvWithNames:
6364
case EFileFormat::TsvWithNames:
6465
case EFileFormat::JsonEachRow:
@@ -72,7 +73,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
7273
break;
7374
}
7475
default: {
75-
ctx.Send(localRequest.Requester, MakeError(localRequest.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "unsupported format for inference: " << ConvertFileFormat(Format_)));
76+
ctx.Send(localRequest.Requester, MakeError(
77+
localRequest.Path,
78+
NFq::TIssuesIds::UNSUPPORTED,
79+
TStringBuilder{} << "unsupported format for inference: " << ConvertFileFormat(Config_->Format))
80+
);
7681
return;
7782
}
7883
case EFileFormat::Undefined:
@@ -97,16 +102,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
97102
}
98103

99104
std::shared_ptr<arrow::io::RandomAccessFile> file;
100-
switch (Format_) {
105+
switch (Config_->Format) {
101106
case EFileFormat::CsvWithNames:
102107
case EFileFormat::TsvWithNames: {
103-
// TODO: obtain from request
104-
arrow::csv::ParseOptions options;
105-
if (Format_ == EFileFormat::TsvWithNames) {
106-
options.delimiter = '\t';
107-
}
108-
file = CleanupCsvFile(data, request, options, ctx);
109-
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
108+
file = CleanupCsvFile(data, request, std::dynamic_pointer_cast<CsvConfig>(Config_)->ParseOpts, ctx);
109+
ctx.Send(request.Requester, new TEvArrowFile(Config_, std::move(file), request.Path));
110110
break;
111111
}
112112
case EFileFormat::Parquet: {
@@ -115,13 +115,13 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
115115
return;
116116
}
117117
file = BuildParquetFileFromMetadata(data, request, ctx);
118-
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
118+
ctx.Send(request.Requester, new TEvArrowFile(Config_, std::move(file), request.Path));
119119
break;
120120
}
121121
case EFileFormat::JsonEachRow:
122122
case EFileFormat::JsonList: {
123-
file = CleanupJsonFile(data, request, arrow::json::ParseOptions::Defaults(), ctx);
124-
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
123+
file = CleanupJsonFile(data, request, std::dynamic_pointer_cast<JsonConfig>(Config_)->ParseOpts, ctx);
124+
ctx.Send(request.Requester, new TEvArrowFile(Config_, std::move(file), request.Path));
125125
break;
126126
}
127127
case EFileFormat::Undefined:
@@ -171,9 +171,10 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
171171
}
172172

173173
void HandleAsRAFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx) {
174+
auto format = Config_->Format;
174175
auto error = MakeError(
175176
insertedRequest.Path, NFq::TIssuesIds::UNSUPPORTED,
176-
TStringBuilder{} << "got unsupported format: " << ConvertFileFormat(Format_) << '(' << static_cast<ui32>(Format_) << ')'
177+
TStringBuilder{} << "got unsupported format: " << ConvertFileFormat(format) << '(' << static_cast<ui32>(format) << ')'
177178
);
178179
SendError(ctx, error);
179180
}
@@ -280,7 +281,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
280281
std::shared_ptr<arrow::Buffer> whole, partial;
281282
auto arrowData = BuildBufferFromData(data, request, ctx);
282283

283-
if (Format_ == EFileFormat::JsonList) {
284+
if (Config_->Format == EFileFormat::JsonList) {
284285
auto empty = std::make_shared<arrow::Buffer>(nullptr, 0);
285286
int64_t count = 1;
286287
auto status = chunker->ProcessSkip(empty, arrowData, false, &count, &whole);
@@ -353,12 +354,12 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
353354

354355
// Fields
355356
NActors::TActorId S3FetcherId_;
356-
EFileFormat Format_;
357+
std::shared_ptr<FormatConfig> Config_;
357358
TMaybe<TString> DecompressionFormat_;
358359
std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
359360
};
360361

361-
// Factory for the arrow file fetching actor: allocates a TArrowFileFetcher and
// returns it as a raw IActor*.
// NOTE(review): ownership is presumably handed to the actor system by the
// caller (via Register) — confirm.
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params) {
362-
return new TArrowFileFetcher{s3FetcherId, format, params};
362+
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, const THashMap<TString, TString>& params) {
363+
return new TArrowFileFetcher{s3FetcherId, params};
363364
}
364365
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference

ydb/core/external_sources/object_storage/inference/arrow_fetcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55

66
namespace NKikimr::NExternalSource::NObjectStorage::NInference {
77

8-
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params);
8+
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, const THashMap<TString, TString>& params);
99
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference

0 commit comments

Comments
 (0)