Skip to content

Commit e17223e

Browse files
authored
[TKqpExecuter] if got poisoned - die silently (#11286)
1 parent e31118f commit e17223e

File tree

3 files changed

+20
-5
lines changed

3 files changed

+20
-5
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2851,8 +2851,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28512851
hFunc(TEvDqCompute::TEvState, HandleShutdown);
28522852
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
28532853
hFunc(TEvents::TEvPoison, HandleShutdown);
2854+
hFunc(TEvDq::TEvAbortExecution, HandleShutdown);
28542855
default:
2855-
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
2856+
LOG_E("Unexpected event while waiting for shutdown: " << ev->GetTypeName()); // ignore all other events
28562857
}
28572858
}
28582859

@@ -2894,6 +2895,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28942895
}
28952896
}
28962897

2898+
void HandleShutdown(TEvDq::TEvAbortExecution::TPtr& ev) {
2899+
auto statusCode = NYql::NDq::DqStatusToYdbStatus(ev->Get()->Record.GetStatusCode());
2900+
2901+
// In case of external timeout the response is already sent to the client - no need to wait for stats.
2902+
if (statusCode == Ydb::StatusIds::TIMEOUT) {
2903+
LOG_I("External timeout while waiting for Compute Actors to finish - forcing shutdown. Sender: " << ev->Sender);
2904+
PassAway();
2905+
}
2906+
}
2907+
28972908
private:
28982909
void ReplyTxStateUnknown(ui64 shardId) {
28992910
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1719,9 +1719,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17191719

17201720
protected:
17211721
void UnexpectedEvent(const TString& state, ui32 eventType) {
1722-
LOG_C("TKqpExecuter, unexpected event: " << eventType << ", at state:" << state << ", selfID: " << this->SelfId());
1723-
InternalError(TStringBuilder() << "Unexpected event at TKqpExecuter, state: " << state
1724-
<< ", event: " << eventType);
1722+
if (eventType == TEvents::TEvPoison::EventType) {
1723+
LOG_D("TKqpExecuter, TEvPoison event at state:" << state << ", selfID: " << this->SelfId());
1724+
InternalError(TStringBuilder() << "TKqpExecuter got poisoned, state: " << state);
1725+
} else {
1726+
LOG_E("TKqpExecuter, unexpected event: " << eventType << ", at state:" << state << ", selfID: " << this->SelfId());
1727+
InternalError(TStringBuilder() << "Unexpected event at TKqpExecuter, state: " << state << ", event: " << eventType);
1728+
}
17251729
}
17261730

17271731
void InternalError(const NYql::TIssues& issues) {

ydb/core/kqp/ut/query/kqp_limits_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1026,7 +1026,7 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
10261026
return TTestActorRuntime::EEventAction::PROCESS;
10271027
});
10281028

1029-
auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500));
1029+
auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::Seconds(20));
10301030
kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); });
10311031

10321032
TDispatchOptions opts;

0 commit comments

Comments
 (0)