Skip to content

Commit f872253

Browse files
authored
YQ-3924 / YQ-4095 Fix actor name / memory leak / to stable (#14366)
1 parent fdf539d commit f872253

File tree

6 files changed

+11
-17
lines changed

6 files changed

+11
-17
lines changed

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,13 @@ struct TEvRowDispatcher {
163163
TTopicSessionStatistic Stat;
164164
};
165165

166-
// Network events (without seqNo checks)
167-
166+
// two purposes: confirm seqNo and check the availability of the recipient actor (wait TEvUndelivered)
168167
struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> {
169168
TEvHeartbeat() = default;
170169
};
171170

171+
// Network events (without seqNo checks)
172+
172173
struct TEvNoSession : public NActors::TEventPB<TEvNoSession, NFq::NRowDispatcherProto::TEvNoSession, EEv::EvNoSession> {
173174
TEvNoSession() = default;
174175
};

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ namespace {
1717
class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public ITopicFormatHandler, public TTypeParser {
1818
using TBase = NActors::TActor<TTopicFormatHandler>;
1919

20+
public:
2021
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_FORMAT_HANDLER";
2122

23+
private:
2224
struct TCounters {
2325
TCountersDesc Desc;
2426

ydb/core/fq/libs/row_dispatcher/protos/events.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ message TEvSessionError {
105105

106106
message TEvHeartbeat {
107107
uint32 PartitionId = 1;
108+
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
108109
}
109110

110111
message TEvNoSession {

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -884,9 +884,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
884884
}
885885
LWPROBE(Heartbeat, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
886886
LOG_ROW_DISPATCHER_TRACE("Received TEvHeartbeat from " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
887-
if (ev->Cookie != it->second->Generation) {
888-
LOG_ROW_DISPATCHER_WARN("Wrong message generation (TEvHeartbeat), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << it->second->Generation << ", query id " << it->second->QueryId);
889-
}
887+
CheckSession(it->second, ev);
890888
}
891889

892890
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNoSession::TPtr& ev) {
@@ -983,7 +981,7 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbea
983981
if (needSend) {
984982
LOG_ROW_DISPATCHER_TRACE("Send TEvHeartbeat to " << sessionInfo->ReadActorId << " query id " << sessionInfo->QueryId);
985983
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvHeartbeat>();
986-
Send(new IEventHandle(sessionInfo->ReadActorId, SelfId(), event.release(), 0, sessionInfo->Generation));
984+
sessionInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(), sessionInfo->Generation);
987985
}
988986
}
989987

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -680,23 +680,14 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartb
680680
bool needSend = sessionInfo.EventsQueue.Heartbeat();
681681
if (needSend) {
682682
SRC_LOG_T("Send TEvEvHeartbeat");
683-
Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(), IEventHandle::FlagTrackDelivery, sessionInfo.Generation);
683+
sessionInfo.EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(), sessionInfo.Generation);
684684
}
685685
}
686686

687687
void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
688688
SRC_LOG_T("Received TEvHeartbeat from " << ev->Sender << ", generation " << ev->Cookie);
689689
Counters.Heartbeat++;
690-
auto sessionIt = Sessions.find(ev->Sender);
691-
if (sessionIt == Sessions.end()) {
692-
SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", generation " << ev->Cookie);
693-
SendNoSession(ev->Sender, ev->Cookie);
694-
return;
695-
}
696-
697-
if (ev->Cookie != sessionIt->second.Generation) {
698-
SendNoSession(ev->Sender, ev->Cookie);
699-
}
690+
FindSession(ev);
700691
}
701692

702693
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -977,6 +977,7 @@ def test_sensors(self, kikimr, client):
977977
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
978978
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
979979
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_COMPILE_SERVICE", COMPUTE_NODE_COUNT)
980+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_FORMAT_HANDLER", 1)
980981
wait_row_dispatcher_sensor_value(kikimr, "ClientsCount", 1)
981982
wait_row_dispatcher_sensor_value(kikimr, "RowsSent", 1, exact_match=False)
982983
wait_row_dispatcher_sensor_value(kikimr, "IncomingRequests", 1, exact_match=False)

0 commit comments

Comments
 (0)