Skip to content

Commit a0bcb0b

Browse files
hiddenpathblinkov
authored andcommitted
YT-24173: Fix error resolving path in remote clusters operations
commit_hash:9facb0640518c75b7a745dad2ddbd649cb91a83f
1 parent 2a27162 commit a0bcb0b

File tree

14 files changed

+142
-29
lines changed

14 files changed

+142
-29
lines changed

yt/cpp/mapreduce/client/client.cpp

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,8 +1496,7 @@ IClientPtr TClient::GetParentClient(bool ignoreGlobalTx)
14961496
RawClient_,
14971497
Context_,
14981498
TTransactionId(),
1499-
ClientRetryPolicy_
1500-
);
1499+
ClientRetryPolicy_);
15011500
} else {
15021501
return this;
15031502
}
@@ -1510,15 +1509,10 @@ void TClient::CheckShutdown() const
15101509
}
15111510
}
15121511

1513-
TClientContext CreateClientContext(
1514-
const TString& serverName,
1515-
const TCreateClientOptions& options)
1512+
void SetupClusterContext(
1513+
TClientContext& context,
1514+
const TString& serverName)
15161515
{
1517-
TClientContext context;
1518-
context.Config = options.Config_ ? options.Config_ : TConfig::Get();
1519-
context.TvmOnly = options.TvmOnly_;
1520-
context.ProxyAddress = options.ProxyAddress_;
1521-
15221516
context.ServerName = serverName;
15231517
ApplyProxyUrlAliasingRules(context.ServerName);
15241518

@@ -1531,9 +1525,8 @@ TClientContext CreateClientContext(
15311525

15321526
static constexpr char httpUrlSchema[] = "http://";
15331527
static constexpr char httpsUrlSchema[] = "https://";
1534-
if (options.UseTLS_) {
1535-
context.UseTLS = *options.UseTLS_;
1536-
} else {
1528+
1529+
if (!context.UseTLS) {
15371530
context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema);
15381531
}
15391532

@@ -1555,9 +1548,25 @@ TClientContext CreateClientContext(
15551548
if (context.ServerName.find(':') == TString::npos) {
15561549
context.ServerName = CreateHostNameWithPort(context.ServerName, context);
15571550
}
1558-
if (options.TvmOnly_) {
1551+
if (context.TvmOnly) {
15591552
context.ServerName = Format("tvm.%v", context.ServerName);
15601553
}
1554+
}
1555+
1556+
TClientContext CreateClientContext(
1557+
const TString& serverName,
1558+
const TCreateClientOptions& options)
1559+
{
1560+
TClientContext context;
1561+
context.Config = options.Config_ ? options.Config_ : TConfig::Get();
1562+
context.TvmOnly = options.TvmOnly_;
1563+
context.ProxyAddress = options.ProxyAddress_;
1564+
1565+
if (options.UseTLS_) {
1566+
context.UseTLS = *options.UseTLS_;
1567+
}
1568+
1569+
SetupClusterContext(context, serverName);
15611570

15621571
if (options.ProxyRole_) {
15631572
context.Config->Hosts = "hosts?role=" + *options.ProxyRole_;

yt/cpp/mapreduce/client/client.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,10 @@ class TClient
509509

510510
////////////////////////////////////////////////////////////////////////////////
511511

512+
void SetupClusterContext(
513+
TClientContext& context,
514+
const TString& serverName);
515+
512516
TClientContext CreateClientContext(
513517
const TString& serverName,
514518
const TCreateClientOptions& options);

yt/cpp/mapreduce/client/client_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ TClientReader::TClientReader(
6969

7070
if (useFormatFromTableAttributes) {
7171
auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_;
72-
auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, transactionId2, Path_);
72+
auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, Context_, transactionId2, Path_);
7373
if (newFormat) {
7474
Format_->Config = *newFormat;
7575
}

yt/cpp/mapreduce/client/operation.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,24 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
220220
return tableList;
221221
}
222222

223-
auto isDynamic = NRawClient::BatchTransform(
223+
TVector<TRichYPath> tableListPaths;
224+
for (const auto& table: tableList) {
225+
Y_ABORT_UNLESS(table.RichYPath, "Cannot get path to apply column filters");
226+
tableListPaths.emplace_back(*table.RichYPath);
227+
}
228+
229+
auto isDynamic = NRawClient::RemoteClustersBatchTransform(
224230
preparer.GetClient()->GetRawClient(),
225-
tableList,
231+
preparer.GetContext(),
232+
tableListPaths,
226233
[&] (IRawBatchRequestPtr batch, const auto& table) {
227-
return batch->Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
234+
// In case of external cluster, we can't use the current transaction
235+
// since it is unknown for the external cluster.
236+
// Hence, we should take a global transaction.
237+
if (table.Cluster_ && !table.Cluster_->empty()) {
238+
return batch->Get(TTransactionId(), table.Path_ + "/@dynamic", TGetOptions());
239+
}
240+
return batch->Get(preparer.GetTransactionId(), table.Path_ + "/@dynamic", TGetOptions());
228241
});
229242

230243
auto newTableList = tableList;

yt/cpp/mapreduce/client/skiff.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
279279

280280
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
281281
const IRawClientPtr& rawClient,
282+
const TClientContext& context,
282283
const TTransactionId& transactionId,
283284
ENodeReaderFormat nodeReaderFormat,
284285
const TVector<TRichYPath>& tablePaths,
@@ -301,17 +302,23 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
301302
}
302303
}
303304

304-
auto nodes = NRawClient::BatchTransform(
305+
auto nodes = RemoteClustersBatchTransform(
305306
rawClient,
306-
NRawClient::CanonizeYPaths(rawClient, tablePaths),
307+
context,
308+
tablePaths,
307309
[&] (IRawBatchRequestPtr batch, const TRichYPath& path) {
308310
auto getOptions = TGetOptions()
309311
.AttributeFilter(
310312
TAttributeFilter()
311313
.AddAttribute("schema")
312314
.AddAttribute("dynamic")
313-
.AddAttribute("type")
314-
);
315+
.AddAttribute("type"));
316+
// In case of external cluster, we can't use the current transaction
317+
// since it is unknown for the external cluster.
318+
// Hence, we should take a global transaction.
319+
if (path.Cluster_ && !path.Cluster_->empty()) {
320+
return batch->Get(TTransactionId(), path.Path_, getOptions);
321+
}
315322
return batch->Get(transactionId, path.Path_, getOptions);
316323
});
317324

yt/cpp/mapreduce/client/skiff.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema);
6060

6161
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
6262
const IRawClientPtr& rawClient,
63+
const TClientContext& context,
6364
const TTransactionId& transactionId,
6465
ENodeReaderFormat nodeReaderFormat,
6566
const TVector<TRichYPath>& tablePaths,

yt/cpp/mapreduce/client/structured_table_formats.cpp

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "structured_table_formats.h"
22

3+
#include "client.h"
34
#include "format_hints.h"
45
#include "skiff.h"
56

@@ -67,24 +68,32 @@ TMaybe<TNode> GetCommonTableFormat(
6768
TMaybe<TNode> GetTableFormat(
6869
const IClientRetryPolicyPtr& retryPolicy,
6970
const IRawClientPtr& rawClient,
71+
const TClientContext& context,
7072
const TTransactionId& transactionId,
7173
const TRichYPath& path)
7274
{
75+
auto newRawClient = rawClient;
76+
if (path.Cluster_ && !path.Cluster_->empty()) {
77+
auto newContext = context;
78+
NDetail::SetupClusterContext(newContext, *path.Cluster_);
79+
newRawClient = rawClient->Clone(newContext);
80+
}
81+
7382
auto formatPath = path.Path_ + "/@_format";
7483

7584
auto exists = NDetail::RequestWithRetry<bool>(
7685
retryPolicy->CreatePolicyForGenericRequest(),
77-
[&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
78-
return rawClient->Exists(transactionId, formatPath);
86+
[&newRawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
87+
return newRawClient->Exists(transactionId, formatPath);
7988
});
8089
if (!exists) {
8190
return TMaybe<TNode>();
8291
}
8392

8493
auto format = NDetail::RequestWithRetry<TMaybe<TNode>>(
8594
retryPolicy->CreatePolicyForGenericRequest(),
86-
[&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
87-
return rawClient->Get(transactionId, formatPath);
95+
[&newRawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
96+
return newRawClient->Get(transactionId, formatPath);
8897
});
8998
if (format.Get()->AsString() != "yamred_dsv") {
9099
return TMaybe<TNode>();
@@ -102,12 +111,13 @@ TMaybe<TNode> GetTableFormat(
102111
TMaybe<TNode> GetTableFormats(
103112
const IClientRetryPolicyPtr& clientRetryPolicy,
104113
const IRawClientPtr& rawClient,
114+
const TClientContext& context,
105115
const TTransactionId& transactionId,
106116
const TVector<TRichYPath>& inputs)
107117
{
108118
TVector<TMaybe<TNode>> formats;
109119
for (auto& table : inputs) {
110-
formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, transactionId, table));
120+
formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, context, transactionId, table));
111121
}
112122

113123
return GetCommonTableFormat(formats);
@@ -121,6 +131,7 @@ namespace NDetail {
121131

122132
NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
123133
const IRawClientPtr& rawClient,
134+
const TClientContext& context,
124135
const TTransactionId& transactionId,
125136
const TVector<TRichYPath>& tables,
126137
const TOperationOptions& options,
@@ -134,6 +145,7 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
134145
}
135146
return CreateSkiffSchemaIfNecessary(
136147
rawClient,
148+
context,
137149
transactionId,
138150
nodeReaderFormat,
139151
tables,
@@ -387,7 +399,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat(
387399
Y_ABORT_UNLESS(table.RichYPath, "Cannot use format from table for intermediate table");
388400
tableList.push_back(*table.RichYPath);
389401
}
390-
formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, TransactionId_, tableList);
402+
formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, Context_, TransactionId_, tableList);
391403
}
392404
if (formatFromTableAttributes) {
393405
return {
@@ -432,6 +444,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
432444
}
433445
skiffSchema = TryCreateSkiffSchema(
434446
RawClient_,
447+
Context_,
435448
TransactionId_,
436449
tableList,
437450
OperationOptions_,

yt/cpp/mapreduce/client/structured_table_formats.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ TMaybe<TNode> GetCommonTableFormat(
2222
TMaybe<TNode> GetTableFormat(
2323
const IClientRetryPolicyPtr& clientRetryPolicy,
2424
const IRawClientPtr& rawClient,
25+
const TClientContext& context,
2526
const TTransactionId& transactionId,
2627
const TRichYPath& path);
2728

2829
TMaybe<TNode> GetTableFormats(
2930
const IClientRetryPolicyPtr& clientRetryPolicy,
3031
const IRawClientPtr& rawClient,
32+
const TClientContext& context,
3133
const TTransactionId& transactionId,
3234
const TVector<TRichYPath>& paths);
3335

yt/cpp/mapreduce/http/context.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ bool operator==(const TClientContext& lhs, const TClientContext& rhs)
1313
lhs.HttpClient == rhs.HttpClient &&
1414
lhs.UseTLS == rhs.UseTLS &&
1515
lhs.TvmOnly == rhs.TvmOnly &&
16-
lhs.ProxyAddress == rhs.ProxyAddress;
16+
lhs.ProxyAddress == rhs.ProxyAddress &&
17+
lhs.ProxyRole == rhs.ProxyRole;
1718
}
1819

1920
////////////////////////////////////////////////////////////////////////////////

yt/cpp/mapreduce/http/context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ struct TClientContext
2222
bool UseTLS = false;
2323
TConfigPtr Config = TConfig::Get();
2424
TMaybe<TString> ProxyAddress;
25+
TMaybe<TString> ProxyRole;
2526
};
2627

2728
bool operator==(const TClientContext& lhs, const TClientContext& rhs);

0 commit comments

Comments
 (0)