|
4 | 4 | #include "helpers.h"
|
5 | 5 | #include "service.h"
|
6 | 6 |
|
7 |
| -#include <atomic> |
8 | 7 | #include <yt/yt/core/concurrency/thread_affinity.h>
|
9 | 8 | #include <yt/yt/core/concurrency/periodic_executor.h>
|
10 | 9 |
|
@@ -241,9 +240,7 @@ class TResponseKeeper
|
241 | 240 |
|
242 | 241 | const auto& responseMessage = responseMessageOrError.Value();
|
243 | 242 |
|
244 |
| - NProto::TResponseHeader header; |
245 |
| - YT_VERIFY(TryParseResponseHeader(responseMessage, &header)); |
246 |
| - bool remember = FromProto<NRpc::EErrorCode>(header.error().code()) != NRpc::EErrorCode::Unavailable; |
| 243 | + auto remember = ValidateHeaderAndParseRememberOption(responseMessage); |
247 | 244 |
|
248 | 245 | if (auto setResponseKeeperPromise = EndRequest(mutationId, responseMessage, remember)) {
|
249 | 246 | setResponseKeeperPromise();
|
@@ -313,20 +310,16 @@ class TResponseKeeper
|
313 | 310 |
|
314 | 311 | auto pendingIt = PendingResponses_.find(id);
|
315 | 312 | if (pendingIt != PendingResponses_.end()) {
|
316 |
| - if (!isRetry) { |
317 |
| - THROW_ERROR_EXCEPTION("Duplicate request is not marked as \"retry\"") |
318 |
| - << TErrorAttribute("mutation_id", id); |
319 |
| - } |
| 313 | + ValidateRetry(id, isRetry); |
| 314 | + |
320 | 315 | YT_LOG_DEBUG("Replying with pending response (MutationId: %v)", id);
|
321 | 316 | return pendingIt->second;
|
322 | 317 | }
|
323 | 318 |
|
324 | 319 | auto finishedIt = FinishedResponses_.find(id);
|
325 | 320 | if (finishedIt != FinishedResponses_.end()) {
|
326 |
| - if (!isRetry) { |
327 |
| - THROW_ERROR_EXCEPTION("Duplicate request is not marked as \"retry\"") |
328 |
| - << TErrorAttribute("mutation_id", id); |
329 |
| - } |
| 321 | + ValidateRetry(id, isRetry); |
| 322 | + |
330 | 323 | YT_LOG_DEBUG("Replying with finished response (MutationId: %v)", id);
|
331 | 324 | return MakeFuture(finishedIt->second);
|
332 | 325 | }
|
@@ -383,6 +376,23 @@ class TResponseKeeper
|
383 | 376 |
|
384 | 377 | ////////////////////////////////////////////////////////////////////////////////
|
385 | 378 |
|
| 379 | +bool ValidateHeaderAndParseRememberOption(const TSharedRefArray& responseMessage) |
| 380 | +{ |
| 381 | + NProto::TResponseHeader header; |
| 382 | + YT_VERIFY(TryParseResponseHeader(responseMessage, &header)); |
| 383 | + return FromProto<EErrorCode>(header.error().code()) != EErrorCode::Unavailable; |
| 384 | +} |
| 385 | + |
| 386 | +void ValidateRetry(TMutationId mutationId, bool isRetry) |
| 387 | +{ |
| 388 | + if (!isRetry) { |
| 389 | + THROW_ERROR_EXCEPTION("Duplicate request is not marked as \"retry\"") |
| 390 | + << TErrorAttribute("mutation_id", mutationId); |
| 391 | + } |
| 392 | +} |
| 393 | + |
| 394 | +//////////////////////////////////////////////////////////////////////////////// |
| 395 | + |
386 | 396 | IResponseKeeperPtr CreateResponseKeeper(
|
387 | 397 | TResponseKeeperConfigPtr config,
|
388 | 398 | IInvokerPtr invoker,
|
|
0 commit comments