Skip to content

Commit 248c41c

Browse files
authored
YQ: Sync to stable (#11503)
1 parent 2c99cf0 commit 248c41c

File tree

7 files changed

+99
-53
lines changed

7 files changed

+99
-53
lines changed

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct TEvRowDispatcher {
2828
EvCoordinatorRequest,
2929
EvCoordinatorResult,
3030
EvSessionStatistic,
31+
EvHeartbeat,
3132
EvEnd,
3233
};
3334

@@ -128,6 +129,13 @@ struct TEvRowDispatcher {
128129
: Stat(stat) {}
129130
TopicSessionStatistic Stat;
130131
};
132+
133+
struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> {
134+
TEvHeartbeat() = default;
135+
TEvHeartbeat(ui32 partitionId) {
136+
Record.SetPartitionId(partitionId);
137+
}
138+
};
131139
};
132140

133141
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/protos/events.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,7 @@ message TEvSessionError {
7676
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
7777
}
7878

79+
message TEvHeartbeat {
80+
uint32 PartitionId = 1;
81+
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
82+
}

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
171171
TActorId TopicSessionId;
172172
const TString QueryId;
173173
ConsumerCounters Counters;
174+
bool PendingGetNextBatch = false;
175+
bool PendingNewDataArrived = false;
174176
TopicSessionClientStatistic Stat;
175177
};
176178

@@ -219,9 +221,9 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
219221
void Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev);
220222
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
221223

222-
void Handle(NActors::TEvents::TEvPing::TPtr& ev);
224+
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
223225
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
224-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
226+
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
225227
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
226228
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
227229
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
@@ -249,9 +251,9 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
249251
hFunc(NFq::TEvRowDispatcher::TEvStatus, Handle);
250252
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
251253
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
252-
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvPing, Handle);
254+
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
253255
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
254-
hFunc(NActors::TEvents::TEvPing, Handle);
256+
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
255257
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
256258
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
257259
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
@@ -346,7 +348,7 @@ void TRowDispatcher::Handle(TEvPrivate::TEvCoordinatorPing::TPtr&) {
346348
}
347349

348350
void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) {
349-
LOG_ROW_DISPATCHER_TRACE("NActors::TEvents::TEvPong ");
351+
LOG_ROW_DISPATCHER_TRACE("NActors::TEvents::TEvPong");
350352
}
351353

352354
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe::TPtr& ev) {
@@ -395,7 +397,8 @@ TString TRowDispatcher::GetInternalState() {
395397
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
396398
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
397399
<< " get " << consumer->Counters.GetNextBatch
398-
<< " arrived " << consumer->Counters.NewDataArrived << " batch " << consumer->Counters.MessageBatch << " ";
400+
<< " arr " << consumer->Counters.NewDataArrived << " btc " << consumer->Counters.MessageBatch
401+
<< " pend get " << consumer->PendingGetNextBatch << " pend new " << consumer->PendingNewDataArrived << " ";
399402
str << " retry queue: ";
400403
consumer->EventsQueue.PrintInternalState(str);
401404
}
@@ -486,19 +489,27 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
486489
LOG_ROW_DISPATCHER_ERROR("TEvGetNextBatch: wrong seq num from " << ev->Sender.ToString() << ", seqNo " << seqNo << ", ignore message");
487490
return;
488491
}
492+
it->second->PendingNewDataArrived = false;
493+
it->second->PendingGetNextBatch = true;
489494
it->second->Counters.GetNextBatch++;
490495
Forward(ev, it->second->TopicSessionId);
491496
}
492497

493-
void TRowDispatcher::Handle(NActors::TEvents::TEvPing::TPtr& ev) {
494-
LOG_ROW_DISPATCHER_TRACE("TEvPing from " << ev->Sender);
495-
Send(ev->Sender, new NActors::TEvents::TEvPong());
498+
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
499+
LOG_ROW_DISPATCHER_TRACE("TEvHeartbeat from " << ev->Sender);
500+
501+
ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
502+
auto it = Consumers.find(key);
503+
if (it == Consumers.end()) {
504+
LOG_ROW_DISPATCHER_WARN("Wrong consumer, sender " << ev->Sender << ", part id " << ev->Cookie);
505+
return;
506+
}
507+
it->second->EventsQueue.OnEventReceived(ev);
496508
}
497509

498510
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
499511
LOG_ROW_DISPATCHER_DEBUG("TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
500512
" partitionId " << ev->Get()->Record.GetPartitionId());
501-
502513
ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
503514
auto it = Consumers.find(key);
504515
if (it == Consumers.end()) {
@@ -574,14 +585,19 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPt
574585
it->second->EventsQueue.Retry();
575586
}
576587

577-
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr& ev) {
578-
LOG_ROW_DISPATCHER_TRACE("TEvRetryQueuePrivate::TEvPing " << ev->Get()->EventQueueId);
588+
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
589+
LOG_ROW_DISPATCHER_TRACE("TEvRetryQueuePrivate::TEvEvHeartbeat " << ev->Get()->EventQueueId);
579590
auto it = ConsumersByEventQueueId.find(ev->Get()->EventQueueId);
580591
if (it == ConsumersByEventQueueId.end()) {
581592
LOG_ROW_DISPATCHER_WARN("No consumer with EventQueueId = " << ev->Get()->EventQueueId);
582593
return;
583594
}
584-
it->second->EventsQueue.Ping();
595+
auto& sessionInfo = it->second;
596+
597+
bool needSend = sessionInfo->EventsQueue.Heartbeat();
598+
if (needSend) {
599+
sessionInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo->PartitionId));
600+
}
585601
}
586602

587603
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
@@ -593,6 +609,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev)
593609
return;
594610
}
595611
LOG_ROW_DISPATCHER_TRACE("Forward TEvNewDataArrived to " << ev->Get()->ReadActorId);
612+
it->second->PendingNewDataArrived = true;
596613
it->second->Counters.NewDataArrived++;
597614
it->second->EventsQueue.Send(ev->Release().Release());
598615
}
@@ -607,6 +624,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
607624
}
608625
Metrics.RowsSent->Add(ev->Get()->Record.MessagesSize());
609626
LOG_ROW_DISPATCHER_TRACE("Forward TEvMessageBatch to " << ev->Get()->ReadActorId);
627+
it->second->PendingGetNextBatch = false;
610628
it->second->Counters.MessageBatch++;
611629
it->second->EventsQueue.Send(ev->Release().Release());
612630
}

ydb/library/yql/dq/actors/common/retry_queue.cpp

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
5656
}
5757
}
5858
if (KeepAlive) {
59-
SchedulePing();
59+
ScheduleHeartbeat();
6060
}
6161
}
6262
}
@@ -86,21 +86,16 @@ void TRetryEventsQueue::Retry() {
8686
}
8787
}
8888

89-
void TRetryEventsQueue::Ping() {
90-
PingScheduled = false;
89+
bool TRetryEventsQueue::Heartbeat() {
90+
HeartbeatScheduled = false;
9191

9292
if (!Connected) {
93-
return;
93+
return false;
9494
}
95-
96-
if (TInstant::Now() - LastReceivedDataTime < TDuration::Seconds(PingPeriodSeconds)) {
97-
SchedulePing();
98-
return;
99-
}
100-
101-
auto ev = MakeHolder<NActors::TEvents::TEvPing>();
102-
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), NActors::IEventHandle::FlagTrackDelivery));
103-
SchedulePing();
95+
ScheduleHeartbeat();
96+
auto now = TInstant::Now();
97+
return (now - LastReceivedDataTime >= TDuration::Seconds(PingPeriodSeconds)
98+
|| (now - LastSentDataTime >= TDuration::Seconds(PingPeriodSeconds)));
10499
}
105100

106101
void TRetryEventsQueue::Connect() {
@@ -133,6 +128,7 @@ void TRetryEventsQueue::RemoveConfirmedEvents(ui64 confirmedSeqNo) {
133128
}
134129

135130
void TRetryEventsQueue::SendRetryable(const IRetryableEvent::TPtr& ev) {
131+
LastSentDataTime = TInstant::Now();
136132
NActors::TActivationContext::Send(ev->Clone(MyConfirmedSeqNo).Release());
137133
}
138134

@@ -148,13 +144,13 @@ void TRetryEventsQueue::ScheduleRetry() {
148144
NActors::TActivationContext::Schedule(RetryState->GetNextDelay(), new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
149145
}
150146

151-
void TRetryEventsQueue::SchedulePing() {
152-
if (!KeepAlive || PingScheduled) {
147+
void TRetryEventsQueue::ScheduleHeartbeat() {
148+
if (!KeepAlive || HeartbeatScheduled) {
153149
return;
154150
}
155151

156-
PingScheduled = true;
157-
auto ev = MakeHolder<TEvRetryQueuePrivate::TEvPing>(EventQueueId);
152+
HeartbeatScheduled = true;
153+
auto ev = MakeHolder<TEvRetryQueuePrivate::TEvEvHeartbeat>(EventQueueId);
158154
NActors::TActivationContext::Schedule(TDuration::Seconds(PingPeriodSeconds), new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
159155
}
160156

@@ -173,7 +169,7 @@ TDuration TRetryEventsQueue::TRetryState::RandomizeDelay(TDuration baseDelay) {
173169

174170
void TRetryEventsQueue::PrintInternalState(TStringStream& stream) const {
175171
stream << "id " << EventQueueId << ", NextSeqNo "
176-
<< NextSeqNo << ", MyConfSeqNo " << MyConfirmedSeqNo << ", SeqNos " << ReceivedEventsSeqNos.size() << ", events size " << Events.size() << "\n";
172+
<< NextSeqNo << ", MyConfSeqNo " << MyConfirmedSeqNo << ", SeqNos " << ReceivedEventsSeqNos.size() << ", events size " << Events.size() << ", connected " << Connected << "\n";
177173
}
178174

179175

ydb/library/yql/dq/actors/common/retry_queue.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ struct TEvRetryQueuePrivate {
1818
enum EEv : ui32 {
1919
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
2020
EvRetry = EvBegin,
21-
EvPing,
21+
EvHeartbeat,
2222
EvSessionClosed, // recipientId does not exist anymore
2323
EvEnd
2424
};
@@ -34,8 +34,8 @@ struct TEvRetryQueuePrivate {
3434
const ui64 EventQueueId;
3535
};
3636

37-
struct TEvPing : NActors::TEventLocal<TEvPing, EvPing> {
38-
explicit TEvPing(ui64 eventQueueId)
37+
struct TEvEvHeartbeat : NActors::TEventLocal<TEvEvHeartbeat, EvHeartbeat> {
38+
explicit TEvEvHeartbeat(ui64 eventQueueId)
3939
: EventQueueId(eventQueueId)
4040
{ }
4141
const ui64 EventQueueId;
@@ -91,6 +91,7 @@ class TRetryEventsQueue {
9191
template <TProtobufEventWithTransportMeta T>
9292
void Send(THolder<T> ev, ui64 cookie = 0) {
9393
if (LocalRecipient) {
94+
LastSentDataTime = TInstant::Now();
9495
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), cookie));
9596
return;
9697
}
@@ -149,7 +150,8 @@ class TRetryEventsQueue {
149150
void HandleNodeDisconnected(ui32 nodeId);
150151
bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
151152
void Retry();
152-
void Ping();
153+
bool Heartbeat();
154+
153155
void Unsubscribe();
154156
void PrintInternalState(TStringStream& stream) const;
155157

@@ -164,7 +166,7 @@ class TRetryEventsQueue {
164166
void RemoveConfirmedEvents(ui64 confirmedSeqNo);
165167
void SendRetryable(const IRetryableEvent::TPtr& ev);
166168
void ScheduleRetry();
167-
void SchedulePing();
169+
void ScheduleHeartbeat();
168170
void Connect();
169171

170172
private:
@@ -220,11 +222,12 @@ class TRetryEventsQueue {
220222
std::set<ui64> ReceivedEventsSeqNos;
221223
bool Connected = false;
222224
bool RetryScheduled = false;
223-
bool PingScheduled = false;
225+
bool HeartbeatScheduled = false;
224226
TMaybe<TRetryState> RetryState;
225227
TTxId TxId;
226228
bool KeepAlive = false;
227229
TInstant LastReceivedDataTime = TInstant::Now();
230+
TInstant LastSentDataTime = TInstant::Now();
228231
bool UseConnect = true;
229232
};
230233

ydb/library/yql/dq/actors/common/ut/retry_events_queue_ut.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ class ClientActor : public TActorBootstrapped<ClientActor> {
5151
EventsQueue.Retry();
5252
}
5353

54-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr& ) {
55-
EventsQueue.Ping();
54+
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ) {
55+
if (EventsQueue.Heartbeat()) {
56+
EventsQueue.Send(new TEvDqCompute::TEvInjectCheckpoint());
57+
}
5658
}
5759

5860
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ) {
@@ -77,7 +79,7 @@ class ClientActor : public TActorBootstrapped<ClientActor> {
7779

7880
STRICT_STFUNC(StateFunc,
7981
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
80-
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvPing, Handle);
82+
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
8183
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
8284
hFunc(TEvPrivate::TEvSend, Handle);
8385
hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ struct TEvPrivate {
104104

105105
class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {
106106

107-
const ui64 PrintStatePeriodSec = 60;
107+
const ui64 PrintStatePeriodSec = 300;
108108
const ui64 ProcessStatePeriodSec = 2;
109109

110110
using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
@@ -199,10 +199,10 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
199199
void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev);
200200
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
201201
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
202-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
202+
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
203203
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
204204
void Handle(NActors::TEvents::TEvPong::TPtr& ev);
205-
void Handle(const NActors::TEvents::TEvPing::TPtr&);
205+
void Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr&);
206206
void Handle(TEvPrivate::TEvPrintState::TPtr&);
207207
void Handle(TEvPrivate::TEvProcessState::TPtr&);
208208

@@ -220,9 +220,9 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
220220
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
221221
hFunc(NActors::TEvents::TEvUndelivered, Handle);
222222
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
223-
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvPing, Handle);
223+
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
224224
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
225-
hFunc(NActors::TEvents::TEvPing, Handle);
225+
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
226226
hFunc(TEvPrivate::TEvPrintState, Handle);
227227
hFunc(TEvPrivate::TEvProcessState, Handle);
228228
})
@@ -520,21 +520,33 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::T
520520
sessionIt->second.EventsQueue.Retry();
521521
}
522522

523-
void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr& ev) {
524-
SRC_LOG_T("TEvRetryQueuePrivate::TEvPing");
523+
void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
524+
SRC_LOG_T("TEvRetryQueuePrivate::TEvEvHeartbeat");
525525
ui64 partitionId = ev->Get()->EventQueueId;
526526

527527
auto sessionIt = Sessions.find(partitionId);
528528
if (sessionIt == Sessions.end()) {
529529
SRC_LOG_W("Unknown partition id " << partitionId << ", skip TEvPing");
530530
return;
531531
}
532-
sessionIt->second.EventsQueue.Ping();
532+
auto& sessionInfo = sessionIt->second;
533+
bool needSend = sessionInfo.EventsQueue.Heartbeat();
534+
if (needSend) {
535+
sessionInfo.EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo.PartitionId));
536+
}
533537
}
534538

535-
void TDqPqRdReadActor::Handle(const NActors::TEvents::TEvPing::TPtr& ev) {
536-
SRC_LOG_T("NActors::TEvents::TEvPing");
537-
Send(ev->Sender, new NActors::TEvents::TEvPong());
539+
void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
540+
SRC_LOG_T("TEvHeartbeat");
541+
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
542+
543+
auto sessionIt = Sessions.find(ev->Get()->Record.GetPartitionId());
544+
if (sessionIt == Sessions.end()) {
545+
SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
546+
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << ev->Get()->Record.GetPartitionId());
547+
return;
548+
}
549+
sessionIt->second.EventsQueue.OnEventReceived(ev);
538550
}
539551

540552
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -699,10 +711,13 @@ void TDqPqRdReadActor::PrintInternalState() {
699711
TStringStream str;
700712
str << "State:\n";
701713
for (auto& [partitionId, sessionInfo] : Sessions) {
702-
str << " partId " << partitionId << " ";
714+
715+
str << " partId " << partitionId << " status " << static_cast<ui64>(sessionInfo.Status)
716+
<< " next offset " << sessionInfo.NextOffset << " is waiting " << sessionInfo.IsWaitingRowDispatcherResponse
717+
<< " has pending data " << sessionInfo.HasPendingData << " ";
703718
sessionInfo.EventsQueue.PrintInternalState(str);
704719
}
705-
SRC_LOG_D(str.Str());
720+
SRC_LOG_I(str.Str());
706721
}
707722

708723
void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) {

0 commit comments

Comments
 (0)