Skip to content

Commit 4adbae4

Browse files
merging bugfixes and some inference updates into stable (ydb-platform#9640)
1 parent 6ee50a5 commit 4adbae4

38 files changed

+447
-278
lines changed

ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -126,32 +126,10 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type, s
126126
case arrow::Type::LIST: { // TODO: is ok?
127127
return false;
128128
}
129-
case arrow::Type::STRUCT: { // TODO: is ok?
130-
auto& structType = *resType.mutable_struct_type();
131-
for (const auto& field : type.fields()) {
132-
auto& member = *structType.add_members();
133-
auto& memberType = *member.mutable_type();
134-
if (!ArrowToYdbType(memberType, *field->type(), config)) {
135-
return false;
136-
}
137-
member.mutable_name()->assign(field->name().data(), field->name().size());
138-
}
139-
return true;
140-
}
129+
case arrow::Type::STRUCT:
141130
case arrow::Type::SPARSE_UNION:
142-
case arrow::Type::DENSE_UNION: { // TODO: is ok?
143-
auto& variant = *resType.mutable_variant_type()->mutable_struct_items();
144-
for (const auto& field : type.fields()) {
145-
auto& member = *variant.add_members();
146-
if (!ArrowToYdbType(*member.mutable_type(), *field->type(), config)) {
147-
return false;
148-
}
149-
if (field->name().empty()) {
150-
return false;
151-
}
152-
member.mutable_name()->assign(field->name().data(), field->name().size());
153-
}
154-
return true;
131+
case arrow::Type::DENSE_UNION: {
132+
return false;
155133
}
156134
case arrow::Type::DICTIONARY: // TODO: is representable?
157135
return false;
@@ -325,14 +303,15 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
325303
auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
326304
std::vector<Ydb::Column> ydbFields;
327305
for (const auto& field : arrowFields) {
328-
ydbFields.emplace_back();
329-
auto& ydbField = ydbFields.back();
330-
if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type(), file.Config)) {
331-
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
332-
RequesterId_ = {};
333-
return;
306+
Ydb::Column column;
307+
if (!ArrowToYdbType(*column.mutable_type(), *field->type(), file.Config)) {
308+
continue;
309+
}
310+
if (field->name().empty()) {
311+
continue;
334312
}
335-
ydbField.mutable_name()->assign(field->name());
313+
column.mutable_name()->assign(field->name());
314+
ydbFields.push_back(column);
336315
}
337316

338317
ctx.Send(RequesterId_, new TEvInferredFileSchema(file.Path, std::move(ydbFields)));

ydb/core/fq/libs/actors/database_resolver.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include <ydb/core/fq/libs/common/cache.h>
55
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
66
#include <ydb/core/fq/libs/events/events.h>
7-
#include <ydb/core/fq/libs/exceptions/exceptions.h>
7+
#include <ydb/library/yql/utils/exceptions.h>
88
#include <ydb/core/util/tuples.h>
99
#include <ydb/library/services/services.pb.h>
1010
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
@@ -213,11 +213,13 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
213213
DatabaseId2Description[std::make_pair(params.Id, params.DatabaseType)] = description;
214214
result.ConstructInPlace(description);
215215
return "";
216-
} catch (const TCodeLineException& ex) {
216+
} catch (const NYql::TCodeLineException& ex) {
217+
LOG_E("ResponseProcessor::Handle(HttpIncomingResponse): " << ex.what());
217218
return TStringBuilder()
218219
<< "response parser error: " << params.ToDebugString() << Endl
219220
<< ex.GetRawMessage();
220221
} catch (...) {
222+
LOG_E("ResponseProcessor::Handle(HttpIncomingResponse): " << CurrentExceptionMessage());
221223
return TStringBuilder()
222224
<< "response parser error: " << params.ToDebugString() << Endl
223225
<< CurrentExceptionMessage();
@@ -359,7 +361,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
359361
}
360362

361363
if (aliveHosts.empty()) {
362-
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE ClickHouse hosts found";
364+
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE ClickHouse hosts found";
363365
}
364366

365367
NYql::IMdbEndpointGenerator::TParams params = {
@@ -407,7 +409,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
407409
}
408410

409411
if (aliveHosts.empty()) {
410-
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE PostgreSQL hosts found";
412+
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE PostgreSQL hosts found";
411413
}
412414

413415
NYql::IMdbEndpointGenerator::TParams params = {
@@ -445,7 +447,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
445447
}
446448

447449
if (aliveHost == "") {
448-
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE Greenplum hosts found";
450+
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE Greenplum hosts found";
449451
}
450452

451453
NYql::IMdbEndpointGenerator::TParams params = {
@@ -495,7 +497,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
495497
}
496498

497499
if (aliveHosts.empty()) {
498-
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
500+
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
499501
}
500502

501503
NYql::IMdbEndpointGenerator::TParams params = {

ydb/core/fq/libs/actors/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ PEERDIR(
4343
ydb/core/fq/libs/db_id_async_resolver_impl
4444
ydb/core/fq/libs/db_schema
4545
ydb/core/fq/libs/events
46-
ydb/core/fq/libs/exceptions
4746
ydb/core/fq/libs/grpc
4847
ydb/core/fq/libs/private_client
4948
ydb/core/fq/libs/rate_limiter/utils

ydb/core/fq/libs/config/yq_issue.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,28 @@
55

66
namespace NFq {
77

8-
NYql::TIssue MakeFatalIssue(TIssuesIds::EIssueCode id, const TString& message) {
8+
NYql::TIssue MakeFatalIssue(ui32 id, const TString& message) {
99
NYql::TIssue issue;
1010
issue.SetCode(id, NYql::TSeverityIds::S_FATAL);
1111
issue.SetMessage(message);
1212
return issue;
1313
}
1414

15-
NYql::TIssue MakeErrorIssue(TIssuesIds::EIssueCode id, const TString& message) {
15+
NYql::TIssue MakeErrorIssue(ui32 id, const TString& message) {
1616
NYql::TIssue issue;
1717
issue.SetCode(id, NYql::TSeverityIds::S_ERROR);
1818
issue.SetMessage(message);
1919
return issue;
2020
}
2121

22-
NYql::TIssue MakeWarningIssue(TIssuesIds::EIssueCode id, const TString& message) {
22+
NYql::TIssue MakeWarningIssue(ui32 id, const TString& message) {
2323
NYql::TIssue issue;
2424
issue.SetCode(id, NYql::TSeverityIds::S_WARNING);
2525
issue.SetMessage(message);
2626
return issue;
2727
}
2828

29-
NYql::TIssue MakeInfoIssue(TIssuesIds::EIssueCode id, const TString& message) {
29+
NYql::TIssue MakeInfoIssue(ui32 id, const TString& message) {
3030
NYql::TIssue issue;
3131
issue.SetCode(id, NYql::TSeverityIds::S_INFO);
3232
issue.SetMessage(message);

ydb/core/fq/libs/config/yq_issue.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66

77
namespace NFq {
88

9-
NYql::TIssue MakeFatalIssue(TIssuesIds::EIssueCode id, const TString& message);
9+
NYql::TIssue MakeFatalIssue(ui32 id, const TString& message);
1010

11-
NYql::TIssue MakeErrorIssue(TIssuesIds::EIssueCode id, const TString& message);
11+
NYql::TIssue MakeErrorIssue(ui32 id, const TString& message);
1212

13-
NYql::TIssue MakeWarningIssue(TIssuesIds::EIssueCode id, const TString& message);
13+
NYql::TIssue MakeWarningIssue(ui32 id, const TString& message);
1414

15-
NYql::TIssue MakeInfoIssue(TIssuesIds::EIssueCode id, const TString& message);
15+
NYql::TIssue MakeInfoIssue(ui32 id, const TString& message);
1616

1717
}

ydb/core/fq/libs/control_plane_storage/extractors.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope,
2929
auto validator = [response, entityColumnName, parseProtobufError](NYdb::NTable::TDataQueryResult result) {
3030
const auto& resultSets = result.GetResultSets();
3131
if (resultSets.size() != 1) {
32-
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size();
32+
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "internal error, result set size is not equal to 1 but equal " << resultSets.size();
3333
}
3434

3535
NYdb::TResultSetParser parser(resultSets.back());
@@ -39,7 +39,7 @@ TValidationQuery CreateEntityExtractor(const TString& scope,
3939

4040
if (!response->second.Before.ConstructInPlace().ParseFromString(*parser.ColumnParser(entityColumnName).GetOptionalString())) {
4141
parseProtobufError->Inc();
42-
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message. Please contact internal support";
42+
ythrow NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message. Please contact internal support";
4343
}
4444
return false;
4545
};

ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,13 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
161161

162162
if (!task.Query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
163163
commonCounters->ParseProtobufError->Inc();
164-
throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support";
164+
throw NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support";
165165
}
166166
const TInstant deadline = TInstant::Now() + (task.Query.content().automatic() ? std::min(automaticQueriesTtl, resultSetsTtl) : resultSetsTtl);
167167
task.Deadline = deadline;
168168
if (!task.Internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
169169
commonCounters->ParseProtobufError->Inc();
170-
throw TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
170+
throw NYql::TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
171171
}
172172

173173
*task.Internal.mutable_result_ttl() = NProtoInterop::CastToProto(resultSetsTtl);

0 commit comments

Comments
 (0)