Skip to content

Commit 050eae7

Browse files
authored
YQ-3837 Add Heartbeat messages (#11467)
1 parent c5e92e4 commit 050eae7

File tree

7 files changed

+84
-47
lines changed

7 files changed

+84
-47
lines changed

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct TEvRowDispatcher {
2828
EvCoordinatorRequest,
2929
EvCoordinatorResult,
3030
EvSessionStatistic,
31+
EvHeartbeat,
3132
EvEnd,
3233
};
3334

@@ -128,6 +129,13 @@ struct TEvRowDispatcher {
128129
: Stat(stat) {}
129130
TopicSessionStatistic Stat;
130131
};
132+
133+
// Heartbeat event of the row dispatcher data plane.
// Carries an NFq::NRowDispatcherProto::TEvHeartbeat record and uses the
// EEv::EvHeartbeat event type.
struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> {
    // Default-constructs the event; the record is left untouched.
    TEvHeartbeat() = default;

    // Builds a heartbeat whose record holds `partitionId`.
    // Marked explicit so a bare integer is never implicitly converted into an event.
    explicit TEvHeartbeat(ui32 partitionId) {
        Record.SetPartitionId(partitionId);
    }
};
131139
};
132140

133141
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/protos/events.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,7 @@ message TEvSessionError {
7676
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
7777
}
7878

79+
// Heartbeat message of the row dispatcher protocol.
message TEvHeartbeat {
80+
// Id of the partition this heartbeat refers to.
uint32 PartitionId = 1;
81+
// Transport metadata attached to the message (same field number as in TEvSessionError).
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
82+
}

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,9 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
221221
void Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev);
222222
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
223223

224-
void Handle(NActors::TEvents::TEvPing::TPtr& ev);
224+
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
225225
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
226-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
226+
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
227227
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
228228
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
229229
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
@@ -251,9 +251,9 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
251251
hFunc(NFq::TEvRowDispatcher::TEvStatus, Handle);
252252
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
253253
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
254-
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvPing, Handle);
254+
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
255255
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
256-
hFunc(NActors::TEvents::TEvPing, Handle);
256+
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
257257
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
258258
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
259259
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
@@ -348,7 +348,7 @@ void TRowDispatcher::Handle(TEvPrivate::TEvCoordinatorPing::TPtr&) {
348348
}
349349

350350
// Handles NActors::TEvents::TEvPong: emits a LOG_ROW_DISPATCHER_TRACE entry and does nothing else.
void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) {
351-
LOG_ROW_DISPATCHER_TRACE("NActors::TEvents::TEvPong ");
351+
LOG_ROW_DISPATCHER_TRACE("NActors::TEvents::TEvPong");
352352
}
353353

354354
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe::TPtr& ev) {
@@ -495,9 +495,16 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
495495
Forward(ev, it->second->TopicSessionId);
496496
}
497497

498-
void TRowDispatcher::Handle(NActors::TEvents::TEvPing::TPtr& ev) {
499-
LOG_ROW_DISPATCHER_TRACE("TEvPing from " << ev->Sender);
500-
Send(ev->Sender, new NActors::TEvents::TEvPong());
498+
// Handles a TEvHeartbeat received from ev->Sender.
// The consumer session is looked up by (sender, partition id taken from the
// event record). If no such session exists the event is logged and dropped;
// otherwise it is handed to the session's EventsQueue via OnEventReceived.
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
    LOG_ROW_DISPATCHER_TRACE("TEvHeartbeat from " << ev->Sender);

    const auto partitionId = ev->Get()->Record.GetPartitionId();
    ConsumerSessionKey key{ev->Sender, partitionId};
    auto it = Consumers.find(key);
    if (it == Consumers.end()) {
        // Log the partition id that was actually used for the lookup (the record
        // field), not the event cookie, so the warning matches the failed key.
        LOG_ROW_DISPATCHER_WARN("Wrong consumer, sender " << ev->Sender << ", part id " << partitionId);
        return;
    }
    it->second->EventsQueue.OnEventReceived(ev);
}
502509

503510
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
@@ -578,14 +585,19 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPt
578585
it->second->EventsQueue.Retry();
579586
}
580587

581-
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr& ev) {
582-
LOG_ROW_DISPATCHER_TRACE("TEvRetryQueuePrivate::TEvPing " << ev->Get()->EventQueueId);
588+
// Handles the retry queue's private heartbeat event.
// Finds the consumer session by EventQueueId; if the session's queue reports
// that a heartbeat is due, sends a TEvHeartbeat carrying the session's partition id.
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
    const auto queueId = ev->Get()->EventQueueId;
    LOG_ROW_DISPATCHER_TRACE("TEvRetryQueuePrivate::TEvEvHeartbeat " << queueId);

    const auto consumerIt = ConsumersByEventQueueId.find(queueId);
    if (consumerIt == ConsumersByEventQueueId.end()) {
        LOG_ROW_DISPATCHER_WARN("No consumer with EventQueueId = " << queueId);
        return;
    }

    auto& consumer = consumerIt->second;
    if (consumer->EventsQueue.Heartbeat()) {
        consumer->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(consumer->PartitionId));
    }
}
590602

591603
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {

ydb/library/yql/dq/actors/common/retry_queue.cpp

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
5656
}
5757
}
5858
if (KeepAlive) {
59-
SchedulePing();
59+
ScheduleHeartbeat();
6060
}
6161
}
6262
}
@@ -86,21 +86,16 @@ void TRetryEventsQueue::Retry() {
8686
}
8787
}
8888

89-
void TRetryEventsQueue::Ping() {
90-
PingScheduled = false;
89+
bool TRetryEventsQueue::Heartbeat() {
90+
HeartbeatScheduled = false;
9191

9292
if (!Connected) {
93-
return;
93+
return false;
9494
}
95-
96-
if (TInstant::Now() - LastReceivedDataTime < TDuration::Seconds(PingPeriodSeconds)) {
97-
SchedulePing();
98-
return;
99-
}
100-
101-
auto ev = MakeHolder<NActors::TEvents::TEvPing>();
102-
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), NActors::IEventHandle::FlagTrackDelivery));
103-
SchedulePing();
95+
ScheduleHeartbeat();
96+
auto now = TInstant::Now();
97+
return (now - LastReceivedDataTime >= TDuration::Seconds(PingPeriodSeconds)
98+
|| (now - LastSentDataTime >= TDuration::Seconds(PingPeriodSeconds)));
10499
}
105100

106101
void TRetryEventsQueue::Connect() {
@@ -133,6 +128,7 @@ void TRetryEventsQueue::RemoveConfirmedEvents(ui64 confirmedSeqNo) {
133128
}
134129

135130
// Records the send time, then sends a clone of the retryable event stamped
// with the current MyConfirmedSeqNo.
void TRetryEventsQueue::SendRetryable(const IRetryableEvent::TPtr& ev) {
    LastSentDataTime = TInstant::Now();

    auto cloned = ev->Clone(MyConfirmedSeqNo);
    NActors::TActivationContext::Send(cloned.Release());
}
138134

@@ -148,13 +144,13 @@ void TRetryEventsQueue::ScheduleRetry() {
148144
NActors::TActivationContext::Schedule(RetryState->GetNextDelay(), new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
149145
}
150146

151-
void TRetryEventsQueue::SchedulePing() {
152-
if (!KeepAlive || PingScheduled) {
147+
void TRetryEventsQueue::ScheduleHeartbeat() {
148+
if (!KeepAlive || HeartbeatScheduled) {
153149
return;
154150
}
155151

156-
PingScheduled = true;
157-
auto ev = MakeHolder<TEvRetryQueuePrivate::TEvPing>(EventQueueId);
152+
HeartbeatScheduled = true;
153+
auto ev = MakeHolder<TEvRetryQueuePrivate::TEvEvHeartbeat>(EventQueueId);
158154
NActors::TActivationContext::Schedule(TDuration::Seconds(PingPeriodSeconds), new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
159155
}
160156

ydb/library/yql/dq/actors/common/retry_queue.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ struct TEvRetryQueuePrivate {
1818
enum EEv : ui32 {
1919
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
2020
EvRetry = EvBegin,
21-
EvPing,
21+
EvHeartbeat,
2222
EvSessionClosed, // recipientId does not exist anymore
2323
EvEnd
2424
};
@@ -34,8 +34,8 @@ struct TEvRetryQueuePrivate {
3434
const ui64 EventQueueId;
3535
};
3636

37-
struct TEvPing : NActors::TEventLocal<TEvPing, EvPing> {
38-
explicit TEvPing(ui64 eventQueueId)
37+
struct TEvEvHeartbeat : NActors::TEventLocal<TEvEvHeartbeat, EvHeartbeat> {
38+
explicit TEvEvHeartbeat(ui64 eventQueueId)
3939
: EventQueueId(eventQueueId)
4040
{ }
4141
const ui64 EventQueueId;
@@ -91,6 +91,7 @@ class TRetryEventsQueue {
9191
template <TProtobufEventWithTransportMeta T>
9292
void Send(THolder<T> ev, ui64 cookie = 0) {
9393
if (LocalRecipient) {
94+
LastSentDataTime = TInstant::Now();
9495
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), cookie));
9596
return;
9697
}
@@ -149,7 +150,8 @@ class TRetryEventsQueue {
149150
void HandleNodeDisconnected(ui32 nodeId);
150151
bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
151152
void Retry();
152-
void Ping();
153+
bool Heartbeat();
154+
153155
void Unsubscribe();
154156
void PrintInternalState(TStringStream& stream) const;
155157

@@ -164,7 +166,7 @@ class TRetryEventsQueue {
164166
void RemoveConfirmedEvents(ui64 confirmedSeqNo);
165167
void SendRetryable(const IRetryableEvent::TPtr& ev);
166168
void ScheduleRetry();
167-
void SchedulePing();
169+
void ScheduleHeartbeat();
168170
void Connect();
169171

170172
private:
@@ -220,11 +222,12 @@ class TRetryEventsQueue {
220222
std::set<ui64> ReceivedEventsSeqNos;
221223
bool Connected = false;
222224
bool RetryScheduled = false;
223-
bool PingScheduled = false;
225+
bool HeartbeatScheduled = false;
224226
TMaybe<TRetryState> RetryState;
225227
TTxId TxId;
226228
bool KeepAlive = false;
227229
TInstant LastReceivedDataTime = TInstant::Now();
230+
TInstant LastSentDataTime = TInstant::Now();
228231
bool UseConnect = true;
229232
};
230233

ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ class ClientActor : public TActorBootstrapped<ClientActor> {
5151
EventsQueue.Retry();
5252
}
5353

54-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr& ) {
55-
EventsQueue.Ping();
54+
// Heartbeat timer handler of the test client: when the queue reports that a
// heartbeat is due, sends a TEvInjectCheckpoint through the queue.
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ) {
    const bool heartbeatDue = EventsQueue.Heartbeat();
    if (!heartbeatDue) {
        return;
    }
    EventsQueue.Send(new TEvDqCompute::TEvInjectCheckpoint());
}
5759

5860
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ) {
@@ -77,7 +79,7 @@ class ClientActor : public TActorBootstrapped<ClientActor> {
7779

7880
STRICT_STFUNC(StateFunc,
7981
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
80-
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvPing, Handle);
82+
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
8183
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
8284
hFunc(TEvPrivate::TEvSend, Handle);
8385
hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,10 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
199199
void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev);
200200
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
201201
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
202-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
202+
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
203203
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
204204
void Handle(NActors::TEvents::TEvPong::TPtr& ev);
205-
void Handle(const NActors::TEvents::TEvPing::TPtr&);
205+
void Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr&);
206206
void Handle(TEvPrivate::TEvPrintState::TPtr&);
207207
void Handle(TEvPrivate::TEvProcessState::TPtr&);
208208

@@ -220,9 +220,9 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
220220
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
221221
hFunc(NActors::TEvents::TEvUndelivered, Handle);
222222
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
223-
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvPing, Handle);
223+
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
224224
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
225-
hFunc(NActors::TEvents::TEvPing, Handle);
225+
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
226226
hFunc(TEvPrivate::TEvPrintState, Handle);
227227
hFunc(TEvPrivate::TEvProcessState, Handle);
228228
})
@@ -520,21 +520,33 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::T
520520
sessionIt->second.EventsQueue.Retry();
521521
}
522522

523-
void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr& ev) {
524-
SRC_LOG_T("TEvRetryQueuePrivate::TEvPing");
523+
// Handles the retry queue's private heartbeat event.
// The EventQueueId carried by the event is used as the key into Sessions.
// For a known session, asks its queue whether a heartbeat is due and, if so,
// sends a TEvHeartbeat with the session's partition id.
void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
    SRC_LOG_T("TEvRetryQueuePrivate::TEvEvHeartbeat");
    ui64 partitionId = ev->Get()->EventQueueId;

    auto sessionIt = Sessions.find(partitionId);
    if (sessionIt == Sessions.end()) {
        // Name the event actually being skipped (it was previously called TEvPing).
        SRC_LOG_W("Unknown partition id " << partitionId << ", skip TEvEvHeartbeat");
        return;
    }
    auto& sessionInfo = sessionIt->second;
    bool needSend = sessionInfo.EventsQueue.Heartbeat();
    if (needSend) {
        sessionInfo.EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo.PartitionId));
    }
}
534538

535-
void TDqPqRdReadActor::Handle(const NActors::TEvents::TEvPing::TPtr& ev) {
536-
SRC_LOG_T("NActors::TEvents::TEvPing");
537-
Send(ev->Sender, new NActors::TEvents::TEvPong());
539+
// Handles a TEvHeartbeat received from ev->Sender.
// The session is looked up by the partition id stored in the event record.
// Heartbeats for unknown partitions are logged (with their transport meta) and
// dropped; otherwise the event is handed to the session's EventsQueue via
// OnEventReceived.
void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
    SRC_LOG_T("TEvHeartbeat");
    const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
    const auto partitionId = ev->Get()->Record.GetPartitionId();

    auto sessionIt = Sessions.find(partitionId);
    if (sessionIt == Sessions.end()) {
        // The message names the event this handler processes (TEvHeartbeat).
        SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
            << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
        return;
    }
    sessionIt->second.EventsQueue.OnEventReceived(ev);
}
539551

540552
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {

0 commit comments

Comments
 (0)