|
4 | 4 | #include "helpers.h"
|
5 | 5 | #include "service.h"
|
6 | 6 |
|
| 7 | +#include <atomic> |
7 | 8 | #include <yt/yt/core/concurrency/thread_affinity.h>
|
8 | 9 | #include <yt/yt/core/concurrency/periodic_executor.h>
|
9 | 10 |
|
@@ -240,7 +241,9 @@ class TResponseKeeper
|
240 | 241 |
|
241 | 242 | const auto& responseMessage = responseMessageOrError.Value();
|
242 | 243 |
|
243 |
| - auto remember = ValidateHeaderAndParseRememberOption(responseMessage); |
| 244 | + NProto::TResponseHeader header; |
| 245 | + YT_VERIFY(TryParseResponseHeader(responseMessage, &header)); |
| 246 | + bool remember = FromProto<NRpc::EErrorCode>(header.error().code()) != NRpc::EErrorCode::Unavailable; |
244 | 247 |
|
245 | 248 | if (auto setResponseKeeperPromise = EndRequest(mutationId, responseMessage, remember)) {
|
246 | 249 | setResponseKeeperPromise();
|
@@ -310,16 +313,20 @@ class TResponseKeeper
|
310 | 313 |
|
311 | 314 | auto pendingIt = PendingResponses_.find(id);
|
312 | 315 | if (pendingIt != PendingResponses_.end()) {
|
313 |
| - ValidateRetry(id, isRetry); |
314 |
| - |
| 316 | + if (!isRetry) { |
| 317 | + THROW_ERROR_EXCEPTION("Duplicate request is not marked as \"retry\"") |
| 318 | + << TErrorAttribute("mutation_id", id); |
| 319 | + } |
315 | 320 | YT_LOG_DEBUG("Replying with pending response (MutationId: %v)", id);
|
316 | 321 | return pendingIt->second;
|
317 | 322 | }
|
318 | 323 |
|
319 | 324 | auto finishedIt = FinishedResponses_.find(id);
|
320 | 325 | if (finishedIt != FinishedResponses_.end()) {
|
321 |
| - ValidateRetry(id, isRetry); |
322 |
| - |
| 326 | + if (!isRetry) { |
| 327 | + THROW_ERROR_EXCEPTION("Duplicate request is not marked as \"retry\"") |
| 328 | + << TErrorAttribute("mutation_id", id); |
| 329 | + } |
323 | 330 | YT_LOG_DEBUG("Replying with finished response (MutationId: %v)", id);
|
324 | 331 | return MakeFuture(finishedIt->second);
|
325 | 332 | }
|
@@ -376,23 +383,6 @@ class TResponseKeeper
|
376 | 383 |
|
377 | 384 | ////////////////////////////////////////////////////////////////////////////////
|
378 | 385 |
|
379 |
| -bool ValidateHeaderAndParseRememberOption(const TSharedRefArray& responseMessage) |
380 |
| -{ |
381 |
| - NProto::TResponseHeader header; |
382 |
| - YT_VERIFY(TryParseResponseHeader(responseMessage, &header)); |
383 |
| - return FromProto<EErrorCode>(header.error().code()) != EErrorCode::Unavailable; |
384 |
| -} |
385 |
| - |
386 |
| -void ValidateRetry(TMutationId mutationId, bool isRetry) |
387 |
| -{ |
388 |
| - if (!isRetry) { |
389 |
| - THROW_ERROR_EXCEPTION("Duplicate request is not marked as \"retry\"") |
390 |
| - << TErrorAttribute("mutation_id", mutationId); |
391 |
| - } |
392 |
| -} |
393 |
| - |
394 |
| -//////////////////////////////////////////////////////////////////////////////// |
395 |
| - |
396 | 386 | IResponseKeeperPtr CreateResponseKeeper(
|
397 | 387 | TResponseKeeperConfigPtr config,
|
398 | 388 | IInvokerPtr invoker,
|
|
0 commit comments