Skip to content

Commit bb37d56

Browse files
author
hiddenpath
committed
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
1 parent 9a04c0a commit bb37d56

20 files changed

+260
-107
lines changed

yt/cpp/mapreduce/client/batch_request_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ using ::NThreading::NewPromise;
3636

3737
TBatchRequest::TBatchRequest(const TTransactionId& defaultTransaction, ::TIntrusivePtr<TClient> client)
3838
: DefaultTransaction_(defaultTransaction)
39-
, Impl_(MakeIntrusive<THttpRawBatchRequest>(client->GetContext().Config))
39+
, Impl_(client->GetRawClient()->CreateRawBatchRequest())
4040
, Client_(client)
4141
{ }
4242

43-
TBatchRequest::TBatchRequest(THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client)
43+
TBatchRequest::TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client)
4444
: Impl_(impl)
4545
, Client_(std::move(client))
4646
{ }
@@ -189,7 +189,7 @@ TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission(
189189

190190
void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
191191
{
192-
Impl_->ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), options);
192+
Impl_->ExecuteBatch(options);
193193
}
194194

195195
////////////////////////////////////////////////////////////////////////////////

yt/cpp/mapreduce/client/batch_request_impl.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ struct TResponseInfo;
2222
class TClient;
2323
using TClientPtr = ::TIntrusivePtr<TClient>;
2424

25-
namespace NRawClient {
26-
class THttpRawBatchRequest;
27-
}
28-
2925
////////////////////////////////////////////////////////////////////////////////
3026

3127
class TBatchRequest
@@ -119,11 +115,11 @@ class TBatchRequest
119115
virtual void ExecuteBatch(const TExecuteBatchOptions& executeBatch) override;
120116

121117
private:
122-
TBatchRequest(NDetail::NRawClient::THttpRawBatchRequest* impl, ::TIntrusivePtr<TClient> client);
118+
TBatchRequest(IRawBatchRequest* impl, ::TIntrusivePtr<TClient> client);
123119

124120
private:
125121
TTransactionId DefaultTransaction_;
126-
::TIntrusivePtr<NDetail::NRawClient::THttpRawBatchRequest> Impl_;
122+
IRawBatchRequestPtr Impl_;
127123
std::unique_ptr<TBatchRequest> TmpWithTransaction_;
128124
::TIntrusivePtr<TClient> Client_;
129125

yt/cpp/mapreduce/client/lock.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class TLockPollerItem
2626
, Acquired_(acquired)
2727
{ }
2828

29-
void PrepareRequest(THttpRawBatchRequest* batchRequest) override
29+
void PrepareRequest(IRawBatchRequest* batchRequest) override
3030
{
3131
LockState_ = batchRequest->Get(TTransactionId(), LockStateYPath_, TGetOptions());
3232
}

yt/cpp/mapreduce/client/operation.cpp

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
228228
CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
229229
preparer.GetContext(),
230230
tableList,
231-
[&] (NRawClient::THttpRawBatchRequest& batch, const auto& table) {
231+
[&] (IRawBatchRequest& batch, const auto& table) {
232232
return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
233233
});
234234

@@ -318,7 +318,6 @@ TSimpleOperationIo CreateSimpleOperationIo(
318318
inputs,
319319
outputs,
320320
preparer.GetClient()->GetRawClient(),
321-
preparer.GetContext(),
322321
preparer.GetClientRetryPolicy(),
323322
preparer.GetTransactionId()),
324323
&inputs,
@@ -497,7 +496,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
497496
structuredInputs,
498497
structuredOutputs,
499498
preparer.GetClient()->GetRawClient(),
500-
preparer.GetContext(),
501499
preparer.GetClientRetryPolicy(),
502500
preparer.GetTransactionId()),
503501
&structuredInputs,
@@ -1688,7 +1686,6 @@ void ExecuteMapReduce(
16881686
structuredInputs,
16891687
mapperOutput,
16901688
preparer->GetClient()->GetRawClient(),
1691-
preparer->GetContext(),
16921689
preparer->GetClientRetryPolicy(),
16931690
preparer->GetTransactionId()),
16941691
&structuredInputs,
@@ -1758,7 +1755,6 @@ void ExecuteMapReduce(
17581755
inputs,
17591756
outputs,
17601757
preparer->GetClient()->GetRawClient(),
1761-
preparer->GetContext(),
17621758
preparer->GetClientRetryPolicy(),
17631759
preparer->GetTransactionId()),
17641760
&inputs,
@@ -1824,7 +1820,6 @@ void ExecuteMapReduce(
18241820
structuredInputs,
18251821
structuredOutputs,
18261822
preparer->GetClient()->GetRawClient(),
1827-
preparer->GetContext(),
18281823
preparer->GetClientRetryPolicy(),
18291824
preparer->GetTransactionId()),
18301825
&structuredInputs,
@@ -2343,7 +2338,7 @@ class TOperationPollerItem
23432338
: OperationImpl_(std::move(operationImpl))
23442339
{ }
23452340

2346-
void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override
2341+
void PrepareRequest(IRawBatchRequest* batchRequest) override
23472342
{
23482343
auto filter = TOperationAttributeFilter()
23492344
.Add(EOperationAttribute::State)

yt/cpp/mapreduce/client/operation_preparer.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class TWaitOperationStartPollerItem
4242
, Transaction_(std::move(transaction))
4343
{ }
4444

45-
void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) override
45+
void PrepareRequest(IRawBatchRequest* batchRequest) override
4646
{
4747
Future_ = batchRequest->GetOperation(
4848
OperationId_,
@@ -213,26 +213,26 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
213213

214214
TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
215215
lockIdFutures.reserve(paths->size());
216-
NRawClient::THttpRawBatchRequest lockRequest(GetContext().Config);
216+
auto lockRequest = Client_->GetRawClient()->CreateRawBatchRequest();
217217
for (const auto& path : *paths) {
218-
lockIdFutures.push_back(lockRequest.Lock(
218+
lockIdFutures.push_back(lockRequest->Lock(
219219
FileTransaction_->GetId(),
220220
path.Path_,
221221
ELockMode::LM_SNAPSHOT,
222222
TLockOptions().Waitable(true)));
223223
}
224-
lockRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
224+
lockRequest->ExecuteBatch();
225225

226226
TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
227227
nodeIdFutures.reserve(paths->size());
228-
NRawClient::THttpRawBatchRequest getNodeIdRequest(GetContext().Config);
228+
auto getNodeIdRequest = Client_->GetRawClient()->CreateRawBatchRequest();
229229
for (const auto& lockIdFuture : lockIdFutures) {
230-
nodeIdFutures.push_back(getNodeIdRequest.Get(
230+
nodeIdFutures.push_back(getNodeIdRequest->Get(
231231
FileTransaction_->GetId(),
232232
::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
233233
TGetOptions()));
234234
}
235-
getNodeIdRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
235+
getNodeIdRequest->ExecuteBatch();
236236
237237
for (size_t i = 0; i != paths->size(); ++i) {
238238
auto& richPath = (*paths)[i];

yt/cpp/mapreduce/client/prepare_operation.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@ TOperationPreparationContext::TOperationPreparationContext(
2020
const TStructuredJobTableList& structuredInputs,
2121
const TStructuredJobTableList& structuredOutputs,
2222
const IRawClientPtr& rawClient,
23-
const TClientContext& context,
2423
const IClientRetryPolicyPtr& retryPolicy,
2524
TTransactionId transactionId)
2625
: RawClient_(rawClient)
27-
, Context_(context)
2826
, RetryPolicy_(retryPolicy)
2927
, TransactionId_(transactionId)
3028
, InputSchemas_(structuredInputs.size())
@@ -44,11 +42,9 @@ TOperationPreparationContext::TOperationPreparationContext(
4442
TVector<TRichYPath> inputs,
4543
TVector<TRichYPath> outputs,
4644
const IRawClientPtr& rawClient,
47-
const TClientContext& context,
4845
const IClientRetryPolicyPtr& retryPolicy,
4946
TTransactionId transactionId)
5047
: RawClient_(rawClient)
51-
, Context_(context)
5248
, RetryPolicy_(retryPolicy)
5349
, TransactionId_(transactionId)
5450
, InputSchemas_(inputs.size())
@@ -77,17 +73,17 @@ int TOperationPreparationContext::GetOutputCount() const
7773
const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const
7874
{
7975
TVector<::NThreading::TFuture<TNode>> schemaFutures;
80-
NRawClient::THttpRawBatchRequest batch(Context_.Config);
76+
auto batch = RawClient_->CreateRawBatchRequest();
8177
for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
8278
if (InputSchemasLoaded_[tableIndex]) {
8379
schemaFutures.emplace_back();
8480
continue;
8581
}
8682
Y_ABORT_UNLESS(Inputs_[tableIndex]);
87-
schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
83+
schemaFutures.push_back(batch->Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
8884
}
8985

90-
batch.ExecuteBatch(RetryPolicy_->CreatePolicyForGenericRequest(), Context_);
86+
batch->ExecuteBatch();
9187

9288
for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
9389
if (schemaFutures[tableIndex].Initialized()) {

yt/cpp/mapreduce/client/prepare_operation.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@ class TOperationPreparationContext
1616
const TStructuredJobTableList& structuredInputs,
1717
const TStructuredJobTableList& structuredOutputs,
1818
const IRawClientPtr& rawClient,
19-
const TClientContext& context,
2019
const IClientRetryPolicyPtr& retryPolicy,
2120
TTransactionId transactionId);
2221

2322
TOperationPreparationContext(
2423
TVector<TRichYPath> inputs,
2524
TVector<TRichYPath> outputs,
2625
const IRawClientPtr& rawClient,
27-
const TClientContext& context,
2826
const IClientRetryPolicyPtr& retryPolicy,
2927
TTransactionId transactionId);
3028

@@ -42,7 +40,6 @@ class TOperationPreparationContext
4240
TVector<TMaybe<TRichYPath>> Outputs_;
4341

4442
const IRawClientPtr RawClient_;
45-
const TClientContext& Context_;
4643
const IClientRetryPolicyPtr RetryPolicy_;
4744

4845
TTransactionId TransactionId_;

yt/cpp/mapreduce/client/skiff.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
307307
clientRetryPolicy->CreatePolicyForGenericRequest(),
308308
context,
309309
NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
310-
[&] (THttpRawBatchRequest& batch, const TRichYPath& path) {
310+
[&] (IRawBatchRequest& batch, const TRichYPath& path) {
311311
auto getOptions = TGetOptions()
312312
.AttributeFilter(
313313
TAttributeFilter()

yt/cpp/mapreduce/client/yt_poller.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ void TYtPoller::WatchLoop()
9292
Y_ABORT_UNLESS(!InProgress_.empty());
9393
}
9494

95-
THttpRawBatchRequest rawBatchRequest(Context_.Config);
95+
THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest());
9696

9797
for (auto& item : InProgress_) {
9898
item->PrepareRequest(&rawBatchRequest);
9999
}
100100

101101
try {
102-
rawBatchRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_);
102+
rawBatchRequest.ExecuteBatch();
103103
} catch (const std::exception& ex) {
104104
YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
105105
}

yt/cpp/mapreduce/client/yt_poller.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515
namespace NYT {
1616
namespace NDetail {
1717

18-
namespace NRawClient {
19-
class THttpRawBatchRequest;
20-
}
21-
2218
////////////////////////////////////////////////////////////////////////////////
2319

2420
class IYtPollerItem
@@ -33,7 +29,7 @@ class IYtPollerItem
3329
public:
3430
virtual ~IYtPollerItem() = default;
3531

36-
virtual void PrepareRequest(NRawClient::THttpRawBatchRequest* batchRequest) = 0;
32+
virtual void PrepareRequest(IRawBatchRequest* batchRequest) = 0;
3733

3834
// Should return PollContinue if poller should continue polling this item.
3935
// Should return PollBreak if poller should stop polling this item.

0 commit comments

Comments
 (0)