Skip to content

Commit 7a0f1b7

Browse files
authored
Fail ScanFetcher on Registration (with CA) Timeout (#19900) (#19939)
2 parents b9664cb + 7f7b2ba commit 7a0f1b7

File tree

2 files changed: +20 additions, -3 deletions.

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000
2121
static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20;
2222
static constexpr ui64 MAX_SHARD_RESOLVES = 3;
2323

24+
// Upper bound on the time elapsed since RegistrationStartTime (set in Bootstrap)
// before the wakeup handler fails the fetcher; compared with a strict '>' there.
constexpr TDuration REGISTRATION_TIMEOUT = TDuration::Seconds(60);
25+
// Delay passed to Schedule() for every TEvWakeup, both in Bootstrap and in the
// wakeup handler. The registration timeout is only evaluated on these ticks.
constexpr TDuration PING_PERIOD = TDuration::Seconds(30);
26+
2427
} // anonymous namespace
2528

2629
TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings,
@@ -79,16 +82,18 @@ void TKqpScanFetcherActor::Bootstrap() {
7982
auto& state = PendingShards.emplace_back(TShardState(read.GetShardId()));
8083
state.Ranges = BuildSerializedTableRanges(read);
8184
}
85+
RegistrationStartTime = Now();
8286
for (auto&& c : ComputeActorIds) {
8387
Sender<TEvScanExchange::TEvRegisterFetcher>().SendTo(c);
8488
}
8589
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "bootstrap")("compute", ComputeActorIds.size())("shards", PendingShards.size());
8690
StartTableScan();
8791
Become(&TKqpScanFetcherActor::StateFunc);
88-
Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup());
92+
Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup());
8993
}
9094

9195
void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) {
96+
RegistrationFinished = true;
9297
AFL_ENSURE(ev->Get()->GetFreeSpace());
9398
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "AckDataFromCompute")("self_id", SelfId())("scan_id", ScanId)(
9499
"packs_to_send", InFlightComputes.GetPacksToSendCount())("from", ev->Sender)("shards remain", PendingShards.size())(
@@ -697,8 +702,18 @@ void TKqpScanFetcherActor::CheckFinish() {
697702
}
698703

699704
// Periodic TEvWakeup handler, shown as a diff hunk: a gutter number ending in '-'
// marks a line removed by this commit, one ending in '+' marks an added line.
//
// After the change the handler does one of three things per tick:
//   1. RegistrationFinished is set (it is assigned true when a TEvAckData is
//      handled): ping all in-flight scanners and reschedule.
//   2. Registration not finished and more than REGISTRATION_TIMEOUT has elapsed
//      since RegistrationStartTime: abort scanners, report a global failure,
//      terminate the actor, and do NOT reschedule.
//   3. Otherwise: just reschedule the next tick after PING_PERIOD.
void TKqpScanFetcherActor::HandleExecute(NActors::TEvents::TEvWakeup::TPtr&) {
// Old body (removed): unconditional ping plus a hard-coded 30 second reschedule.
700-
InFlightShards.PingAllScanners();
701-
Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup());
// New body (added) starts here.
705+
if (RegistrationFinished) {
706+
InFlightShards.PingAllScanners();
// The timeout is checked only when a wakeup fires, so it is detected with
// PING_PERIOD granularity rather than exactly at REGISTRATION_TIMEOUT.
707+
} else if (Now() - RegistrationStartTime > REGISTRATION_TIMEOUT) {
// NOTE(review): the abort is logged at debug level only; consider whether a
// higher severity is wanted for a query-failing condition.
708+
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "TEvWakeup")("info", "Abort fetcher due to Registration timeout");
709+
InFlightShards.AbortAllScanners("Abort fetcher due to Registration timeout");
710+
TIssues issues;
711+
issues.AddIssue(TIssue("Abort fetcher due to Registration timeout"));
// Reported as COMPUTE_STATE_FAILURE with status INTERNAL_ERROR.
712+
SendGlobalFail(NDqProto::COMPUTE_STATE_FAILURE, NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues);
713+
PassAway();
// Early return skips the Schedule() below, so no further wakeup is queued.
714+
return;
715+
}
716+
Schedule(PING_PERIOD, new NActors::TEvents::TEvWakeup());
702717
}
703718

704719
} // namespace NKikimr::NKqp::NScanPrivate

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
189189
std::set<ui32> TrackingNodes;
190190
ui32 MaxInFlight = 1024;
191191
bool IsAggregationRequest = false;
192+
bool RegistrationFinished = false;
193+
TInstant RegistrationStartTime;
192194
};
193195

194196
}

0 commit comments

Comments
 (0)