Skip to content

Commit 9e45694

Browse files
merging bugfixes into q-stable-ydb-24-2 (ydb-platform#7655)
1 parent a7ca38e commit 9e45694

File tree

6 files changed

+275
-27
lines changed

6 files changed

+275
-27
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
1414
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
1515
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
16+
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
1617
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
1718
#include <ydb/library/yql/providers/s3/proto/credentials.pb.h>
1819
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
@@ -307,15 +308,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
307308
structuredTokenBuilder.SetNoAuth();
308309
}
309310

311+
auto effectiveFilePattern = NYql::NS3::NormalizePath(meta->TableLocation);
312+
if (meta->TableLocation.EndsWith('/')) {
313+
effectiveFilePattern += '*';
314+
}
315+
310316
const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
311317
auto httpGateway = NYql::IHTTPGateway::Make();
312318
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
313319
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
314320
.Url = meta->DataSourceLocation,
315321
.Credentials = credentials,
316-
.Pattern = meta->TableLocation,
322+
.Pattern = effectiveFilePattern,
317323
}, Nothing(), false);
318-
auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
324+
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
319325
auto& listRes = listResFut.GetValue();
320326
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
321327
auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
@@ -349,13 +355,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
349355
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
350356
auto promise = NThreading::NewPromise<TMetadataResult>();
351357
auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
358+
if (!response.Status.IsSuccess()) {
359+
metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues()));
360+
return;
361+
}
362+
TMetadataResult result;
352363
meta->Changed = true;
353364
meta->Schema.clear_column();
354365
for (const auto& column : response.Fields) {
355366
auto& destColumn = *meta->Schema.add_column();
356367
destColumn = column;
357368
}
358-
TMetadataResult result;
359369
result.SetSuccess();
360370
result.Metadata = meta;
361371
metaPromise.SetValue(std::move(result));

ydb/core/external_sources/object_storage/events.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
1111
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
1212
#include <ydb/public/api/protos/ydb_value.pb.h>
13+
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
1314

1415
namespace NKikimr::NExternalSource::NObjectStorage {
1516

@@ -128,10 +129,16 @@ struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvIn
128129
struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
129130
TEvInferredFileSchema(TString path, std::vector<Ydb::Column>&& fields)
130131
: Path{std::move(path)}
132+
, Status{NYdb::EStatus::SUCCESS, {}}
131133
, Fields{std::move(fields)}
132134
{}
135+
TEvInferredFileSchema(TString path, NYql::TIssues&& issues)
136+
: Path{std::move(path)}
137+
, Status{NYdb::EStatus::INTERNAL_ERROR, std::move(issues)}
138+
{}
133139

134140
TString Path;
141+
NYdb::TStatus Status;
135142
std::vector<Ydb::Column> Fields;
136143
};
137144

ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,24 @@ namespace NKikimr::NExternalSource::NObjectStorage::NInference {
1414

1515
namespace {
1616

17-
bool ArrowToYdbType(Ydb::Type& resType, const arrow::DataType& type) {
17+
bool ShouldBeOptional(const arrow::DataType& type) {
1818
switch (type.id()) {
1919
case arrow::Type::NA:
20-
resType.set_null_type(google::protobuf::NullValue::NULL_VALUE);
20+
case arrow::Type::STRING:
21+
case arrow::Type::BINARY:
22+
case arrow::Type::LARGE_BINARY:
23+
case arrow::Type::FIXED_SIZE_BINARY:
24+
return false;
25+
default:
26+
return true;
27+
}
28+
}
29+
30+
bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type) {
31+
auto& resType = ShouldBeOptional(type) ? *maybeOptionalType.mutable_optional_type()->mutable_item() : maybeOptionalType;
32+
switch (type.id()) {
33+
case arrow::Type::NA:
34+
resType.set_type_id(Ydb::Type::UTF8);
2135
return true;
2236
case arrow::Type::BOOL:
2337
resType.set_type_id(Ydb::Type::BOOL);
@@ -139,6 +153,14 @@ bool ArrowToYdbType(Ydb::Type& resType, const arrow::DataType& type) {
139153
}
140154
return false;
141155
}
156+
157+
TEvInferredFileSchema* MakeErrorSchema(TString path, NFq::TIssuesIds::EIssueCode code, TString message) {
158+
NYql::TIssues issues;
159+
issues.AddIssue(std::move(message));
160+
issues.back().SetCode(code, NYql::TSeverityIds::S_ERROR);
161+
return new TEvInferredFileSchema{std::move(path), std::move(issues)};
162+
}
163+
142164
}
143165

144166
struct FormatConfig {
@@ -167,14 +189,14 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
167189
.Value(&reader);
168190

169191
if (!readerStatus.ok()) {
170-
return TString{TStringBuilder{} << "couldn't make table from data: " << readerStatus.ToString()};
192+
return TString{TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString()};
171193
}
172194

173195
std::shared_ptr<arrow::Table> table;
174196
auto tableRes = reader->Read().Value(&table);
175197

176198
if (!tableRes.ok()) {
177-
return TStringBuilder{} << "couldn't read table from data: " << readerStatus.ToString();
199+
return TStringBuilder{} << "couldn't parse csv/tsv file, check format and compression params: " << readerStatus.ToString();
178200
}
179201

180202
return table->fields();
@@ -245,7 +267,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
245267
auto& file = *ev->Get();
246268
auto mbArrowFields = InferType(Format_, file.File, *Config_);
247269
if (std::holds_alternative<TString>(mbArrowFields)) {
248-
ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
270+
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
249271
return;
250272
}
251273

@@ -255,7 +277,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
255277
ydbFields.emplace_back();
256278
auto& ydbField = ydbFields.back();
257279
if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type())) {
258-
ctx.Send(RequesterId_, MakeError(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
280+
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
259281
return;
260282
}
261283
ydbField.mutable_name()->assign(field->name());
@@ -265,7 +287,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
265287

266288
void HandleFileError(TEvFileError::TPtr& ev, const NActors::TActorContext& ctx) {
267289
Cout << "TArrowInferencinator::HandleFileError" << Endl;
268-
ctx.Send(RequesterId_, ev->Release());
290+
ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues)));
269291
}
270292

271293
private:

ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,16 @@ TEST_F(ArrowInferenceTest, csv_simple) {
9393
ASSERT_NE(response, nullptr);
9494

9595
auto& fields = response->Fields;
96-
ASSERT_TRUE(fields[0].type().has_type_id());
97-
ASSERT_EQ(response->Fields[0].type().type_id(), Ydb::Type::INT64);
98-
ASSERT_EQ(response->Fields[0].name(), "A");
96+
ASSERT_TRUE(fields[0].type().optional_type().item().has_type_id());
97+
ASSERT_EQ(fields[0].type().optional_type().item().type_id(), Ydb::Type::INT64);
98+
ASSERT_EQ(fields[0].name(), "A");
9999

100100
ASSERT_TRUE(fields[1].type().has_type_id());
101101
ASSERT_EQ(fields[1].type().type_id(), Ydb::Type::UTF8);
102102
ASSERT_EQ(fields[1].name(), "B");
103103

104-
ASSERT_TRUE(fields[2].type().has_type_id());
105-
ASSERT_EQ(fields[2].type().type_id(), Ydb::Type::DOUBLE);
104+
ASSERT_TRUE(fields[2].type().optional_type().item().has_type_id());
105+
ASSERT_EQ(fields[2].type().optional_type().item().type_id(), Ydb::Type::DOUBLE);
106106
ASSERT_EQ(fields[2].name(), "C");
107107
}
108108

@@ -129,16 +129,16 @@ TEST_F(ArrowInferenceTest, tsv_simple) {
129129
ASSERT_NE(response, nullptr);
130130

131131
auto& fields = response->Fields;
132-
ASSERT_TRUE(fields[0].type().has_type_id());
133-
ASSERT_EQ(response->Fields[0].type().type_id(), Ydb::Type::INT64);
134-
ASSERT_EQ(response->Fields[0].name(), "A");
132+
ASSERT_TRUE(fields[0].type().optional_type().item().has_type_id());
133+
ASSERT_EQ(fields[0].type().optional_type().item().type_id(), Ydb::Type::INT64);
134+
ASSERT_EQ(fields[0].name(), "A");
135135

136136
ASSERT_TRUE(fields[1].type().has_type_id());
137137
ASSERT_EQ(fields[1].type().type_id(), Ydb::Type::UTF8);
138138
ASSERT_EQ(fields[1].name(), "B");
139139

140-
ASSERT_TRUE(fields[2].type().has_type_id());
141-
ASSERT_EQ(fields[2].type().type_id(), Ydb::Type::DOUBLE);
140+
ASSERT_TRUE(fields[2].type().optional_type().item().has_type_id());
141+
ASSERT_EQ(fields[2].type().optional_type().item().type_id(), Ydb::Type::DOUBLE);
142142
ASSERT_EQ(fields[2].name(), "C");
143143
}
144144

ydb/core/kqp/gateway/kqp_metadata_loader.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,6 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
282282
tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
283283
tableMeta->ExternalSource.Properties = description.GetProperties();
284284
tableMeta->ExternalSource.DataSourcePath = tableName;
285-
tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path);
286285
return result;
287286
}
288287

@@ -822,14 +821,14 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
822821

823822
switch (entry.Kind) {
824823
case EKind::KindExternalDataSource: {
825-
if (externalPath) {
826-
entry.Path = SplitPath(*externalPath);
827-
}
828824
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
829825
if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) {
830826
promise.SetValue(externalDataSourceMetadata);
831827
return;
832828
}
829+
if (externalPath) {
830+
externalDataSourceMetadata.Metadata->ExternalSource.TableLocation = *externalPath;
831+
}
833832
LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem)
834833
.Subscribe([promise, externalDataSourceMetadata, settings](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
835834
{

0 commit comments

Comments
 (0)