Skip to content

Commit bce2fdd

Browse files
authored
YDB FQ: handle exception in YQL Generic Provider when IAM service is not available (#9092)
1 parent e0463f5 commit bce2fdd

File tree

5 files changed

+114
-48
lines changed

5 files changed

+114
-48
lines changed

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
7878
dsi.Setdatabase("some_db");
7979
dsi.Setuse_tls(true);
8080
dsi.set_protocol(::NYql::NConnector::NApi::EProtocol::NATIVE);
81-
auto token = dsi.mutable_credentials() -> mutable_token();
81+
auto token = dsi.mutable_credentials()->mutable_token();
8282
token->Settype("IAM");
8383
token->Setvalue("TEST_TOKEN");
8484

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,14 @@ namespace NYql::NDq {
152152
Y_ABORT_UNLESS(response.splits_size() == 1);
153153
auto& split = response.splits(0);
154154
NConnector::NApi::TReadSplitsRequest readRequest;
155-
*readRequest.mutable_data_source_instance() = GetDataSourceInstanceWithToken();
155+
156+
*readRequest.mutable_data_source_instance() = LookupSource.data_source_instance();
157+
auto error = TokenProvider->MaybeFillToken(*readRequest.mutable_data_source_instance());
158+
if (error) {
159+
SendError(TActivationContext::ActorSystem(), SelfId(), std::move(error));
160+
return;
161+
}
162+
156163
*readRequest.add_splits() = split;
157164
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
158165
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
@@ -194,9 +201,16 @@ namespace NYql::NDq {
194201
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys";
195202
Y_ABORT_IF(InProgress);
196203
Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest);
204+
197205
Request = std::move(request);
198206
NConnector::NApi::TListSplitsRequest splitRequest;
199-
*splitRequest.add_selects() = CreateSelect();
207+
208+
auto error = FillSelect(*splitRequest.add_selects());
209+
if (error) {
210+
SendError(TActivationContext::ActorSystem(), SelfId(), std::move(error));
211+
return;
212+
};
213+
200214
splitRequest.Setmax_split_count(1);
201215
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
202216
auto result = ExtractFromConstFuture(asyncResult);
@@ -285,6 +299,12 @@ namespace NYql::NDq {
285299
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
286300
}
287301

302+
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, TString error) {
303+
NConnector::NApi::TError dst;
304+
*dst.mutable_message() = error;
305+
SendError(actorSystem, selfId, std::move(dst));
306+
}
307+
288308
private:
289309
enum class EColumnDestination {
290310
Key,
@@ -314,17 +334,13 @@ namespace NYql::NDq {
314334
return result;
315335
}
316336

317-
NYql::NConnector::NApi::TDataSourceInstance GetDataSourceInstanceWithToken() const {
337+
TString FillSelect(NConnector::NApi::TSelect& select) {
318338
auto dsi = LookupSource.data_source_instance();
319-
// Note: returned token may be stale and we have no way to check or recover here
320-
// Consider to redesign ICredentialsProvider
321-
TokenProvider->MaybeFillToken(dsi);
322-
return dsi;
323-
}
324-
325-
NConnector::NApi::TSelect CreateSelect() {
326-
NConnector::NApi::TSelect select;
327-
*select.mutable_data_source_instance() = GetDataSourceInstanceWithToken();
339+
auto error = TokenProvider->MaybeFillToken(dsi);
340+
if (error) {
341+
return error;
342+
}
343+
*select.mutable_data_source_instance() = dsi;
328344

329345
for (ui32 i = 0; i != SelectResultType->GetMembersCount(); ++i) {
330346
auto c = select.mutable_what()->add_items()->mutable_column();
@@ -349,7 +365,7 @@ namespace NYql::NDq {
349365
*disjunction.mutable_operands()->Add()->mutable_conjunction() = conjunction;
350366
}
351367
*select.mutable_where()->mutable_filter_typed()->mutable_disjunction() = disjunction;
352-
return select;
368+
return {};
353369
}
354370

355371
private:

ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,14 @@ namespace NYql::NDq {
5959

6060
void Bootstrap() {
6161
Become(&TGenericReadActor::StateFunc);
62-
InitSplitsListing();
62+
auto issue = InitSplitsListing();
63+
if (issue) {
64+
return NotifyComputeActorWithIssue(
65+
TActivationContext::ActorSystem(),
66+
ComputeActorId_,
67+
InputIndex_,
68+
std::move(*issue));
69+
};
6370
}
6471

6572
static constexpr char ActorName[] = "GENERIC_READ_ACTOR";
@@ -79,13 +86,18 @@ namespace NYql::NDq {
7986

8087
// ListSplits
8188

82-
void InitSplitsListing() {
89+
TMaybe<TIssue> InitSplitsListing() {
8390
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits listing";
8491

8592
// Prepare request
8693
NConnector::NApi::TListSplitsRequest request;
8794
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
88-
TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
95+
96+
auto error = TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
97+
if (error) {
98+
return TIssue(error);
99+
}
100+
89101
*request.mutable_selects()->Add() = std::move(select);
90102

91103
// Initialize stream
@@ -100,6 +112,8 @@ namespace NYql::NDq {
100112
TEvListSplitsIterator>(
101113
actorSystem, selfId, computeActorId, inputIndex, future);
102114
});
115+
116+
return Nothing();
103117
}
104118

105119
void Handle(TEvListSplitsIterator::TPtr& ev) {
@@ -145,7 +159,16 @@ namespace NYql::NDq {
145159
// Server sent EOF, now we are ready to start splits reading
146160
if (NConnector::GrpcStatusEndOfStream(status)) {
147161
YQL_CLOG(DEBUG, ProviderGeneric) << "Handle :: EvListSplitsFinished :: last message was reached, start data reading";
148-
return InitSplitsReading();
162+
auto issue = InitSplitsReading();
163+
if (issue) {
164+
return NotifyComputeActorWithIssue(
165+
TActivationContext::ActorSystem(),
166+
ComputeActorId_,
167+
InputIndex_,
168+
std::move(*issue));
169+
}
170+
171+
return;
149172
}
150173

151174
// Server temporary failure
@@ -163,27 +186,31 @@ namespace NYql::NDq {
163186
}
164187

165188
// ReadSplits
166-
void InitSplitsReading() {
189+
TMaybe<TIssue> InitSplitsReading() {
167190
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits reading";
168191

169192
if (Splits_.empty()) {
170193
YQL_CLOG(WARN, ProviderGeneric) << "Accumulated empty list of splits";
171194
ReadSplitsFinished_ = true;
172-
return NotifyComputeActorWithData();
195+
NotifyComputeActorWithData();
196+
return Nothing();
173197
}
174198

175199
// Prepare request
176200
NConnector::NApi::TReadSplitsRequest request;
177201
request.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
178202
request.mutable_splits()->Reserve(Splits_.size());
179203

180-
std::for_each(
181-
Splits_.cbegin(), Splits_.cend(),
182-
[&](const NConnector::NApi::TSplit& split) {
183-
NConnector::NApi::TSplit splitCopy = split;
184-
TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
185-
*request.mutable_splits()->Add() = std::move(split);
186-
});
204+
for (const auto& split : Splits_) {
205+
NConnector::NApi::TSplit splitCopy = split;
206+
207+
auto error = TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
208+
if (error) {
209+
return TIssue(std::move(error));
210+
}
211+
212+
*request.mutable_splits()->Add() = std::move(splitCopy);
213+
}
187214

188215
// Start streaming
189216
Client_->ReadSplits(request).Subscribe(
@@ -197,6 +224,8 @@ namespace NYql::NDq {
197224
TEvReadSplitsIterator>(
198225
actorSystem, selfId, computeActorId, inputIndex, future);
199226
});
227+
228+
return Nothing();
200229
}
201230

202231
void Handle(TEvReadSplitsIterator::TPtr& ev) {
@@ -308,8 +337,8 @@ namespace NYql::NDq {
308337

309338
static void NotifyComputeActorWithError(
310339
TActorSystem* actorSystem,
311-
const NActors::TActorId computeActorId,
312-
const ui64 inputIndex,
340+
NActors::TActorId computeActorId,
341+
ui64 inputIndex,
313342
const NConnector::NApi::TError& error) {
314343
actorSystem->Send(computeActorId,
315344
new TEvAsyncInputError(
@@ -319,6 +348,19 @@ namespace NYql::NDq {
319348
return;
320349
}
321350

351+
static void NotifyComputeActorWithIssue(
352+
TActorSystem* actorSystem,
353+
NActors::TActorId computeActorId,
354+
ui64 inputIndex,
355+
TIssue issue) {
356+
actorSystem->Send(computeActorId,
357+
new TEvAsyncInputError(
358+
inputIndex,
359+
TIssues{std::move(issue)},
360+
NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR));
361+
return;
362+
}
363+
322364
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer,
323365
TMaybe<TInstant>&,
324366
bool& finished,

ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "yql_generic_token_provider.h"
22

33
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
4+
#include <ydb/library/yql/utils/log/log.h>
45

56
namespace NYql::NDq {
67
TGenericTokenProvider::TGenericTokenProvider(const TString& staticIamToken)
@@ -9,59 +10,63 @@ namespace NYql::NDq {
910
}
1011

1112
TGenericTokenProvider::TGenericTokenProvider(
12-
const TString& serviceAccountId,
13-
const TString& ServiceAccountIdSignature,
14-
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory)
15-
{
13+
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
14+
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
1615
Y_ENSURE(!serviceAccountId.Empty(), "No service account provided");
1716
Y_ENSURE(!ServiceAccountIdSignature.Empty(), "No service account signature provided");
1817
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");
1918

2019
auto structuredTokenJSON =
21-
TStructuredTokenBuilder()
22-
.SetServiceAccountIdAuth(serviceAccountId, ServiceAccountIdSignature)
23-
.ToJson();
20+
TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, ServiceAccountIdSignature).ToJson();
2421

2522
Y_ENSURE(structuredTokenJSON, "empty structured token");
2623

27-
auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
24+
auto credentialsProviderFactory =
25+
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
2826
CredentialsProvider_ = credentialsProviderFactory->CreateProvider();
2927
}
3028

31-
void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
29+
TString TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
3230
// 1. Don't need tokens if basic auth is set
3331
if (dsi.credentials().has_basic()) {
34-
return;
32+
return {};
3533
}
3634

3735
*dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM";
3836

3937
// 2. If static IAM-token has been provided, use it
4038
if (!StaticIAMToken_.empty()) {
4139
*dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_;
42-
return;
40+
return {};
4341
}
4442

4543
// 3. Otherwise use credentials provider to get token
4644
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");
4745

48-
auto iamToken = CredentialsProvider_->GetAuthInfo();
46+
TString iamToken;
47+
try {
48+
iamToken = CredentialsProvider_->GetAuthInfo();
49+
} catch (const std::exception& e) {
50+
YQL_CLOG(ERROR, ProviderGeneric) << "MaybeFillToken: " << e.what();
51+
return TString(e.what());
52+
}
53+
4954
Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token");
5055

5156
*dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken);
57+
return {};
5258
}
5359

5460
TGenericTokenProvider::TPtr
55-
CreateGenericTokenProvider(
56-
const TString& staticIamToken,
57-
const TString& serviceAccountId, const TString& serviceAccountIdSignature,
58-
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory)
59-
{
61+
CreateGenericTokenProvider(const TString& staticIamToken, const TString& serviceAccountId,
62+
const TString& serviceAccountIdSignature,
63+
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
6064
if (!staticIamToken.Empty()) {
6165
return std::make_unique<TGenericTokenProvider>(staticIamToken);
6266
}
6367
if (!serviceAccountId.Empty()) {
64-
return std::make_unique<TGenericTokenProvider>(serviceAccountId, serviceAccountIdSignature, credentialsFactory);
68+
return std::make_unique<TGenericTokenProvider>(serviceAccountId, serviceAccountIdSignature,
69+
credentialsFactory);
6570
}
6671
return std::make_unique<TGenericTokenProvider>();
6772
}

ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
44
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
55
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
6+
#include <ydb/library/yql/public/issue/yql_issue.h>
67

78
namespace NYql::NDq {
89
// When accessing external data sources using authentication via tokens,
@@ -19,7 +20,9 @@ namespace NYql::NDq {
1920
const TString& ServiceAccountIdSignature,
2021
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
2122

22-
void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;
23+
// MaybeFillToken sets IAM-token within DataSourceInstance.
24+
// Returns string containing error, if it happened.
25+
TString MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;
2326

2427
private:
2528
TString StaticIAMToken_;

0 commit comments

Comments
 (0)