24
24
#include < ydb/public/sdk/cpp/client/ydb_topic/topic.h>
25
25
#include < ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
26
26
27
- #include < ydb/library/actors/core/actor_bootstrapped .h>
27
+ #include < ydb/library/actors/core/actor .h>
28
28
#include < ydb/library/actors/core/event_local.h>
29
29
#include < ydb/library/actors/core/events.h>
30
30
#include < ydb/library/actors/core/hfunc.h>
@@ -94,16 +94,19 @@ struct TEvPrivate {
94
94
enum EEv : ui32 {
95
95
EvBegin = EventSpaceBegin (NActors::TEvents::ES_PRIVATE),
96
96
EvPrintState = EvBegin + 20 ,
97
+ EvProcessState = EvBegin + 21 ,
97
98
EvEnd
98
99
};
99
100
static_assert (EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), " expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)" );
100
101
struct TEvPrintState : public NActors ::TEventLocal<TEvPrintState, EvPrintState> {};
102
+ struct TEvProcessState : public NActors ::TEventLocal<TEvProcessState, EvProcessState> {};
101
103
};
102
104
103
- ui64 PrintStatePeriodSec = 60 ;
104
-
105
105
class TDqPqRdReadActor : public NActors ::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {
106
- public:
106
+
107
+ const ui64 PrintStatePeriodSec = 60 ;
108
+ const ui64 ProcessStatePeriodSec = 2 ;
109
+
107
110
using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
108
111
109
112
struct TReadyBatch {
@@ -136,6 +139,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
136
139
ui64 CoordinatorRequestCookie = 0 ;
137
140
TRowDispatcherReadActorMetrics Metrics;
138
141
bool SchedulePrintStatePeriod = false ;
142
+ bool ProcessStateScheduled = false ;
139
143
140
144
struct SessionInfo {
141
145
enum class ESessionStatus {
@@ -193,6 +197,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
193
197
void Handle (NActors::TEvents::TEvPong::TPtr& ev);
194
198
void Handle (const NActors::TEvents::TEvPing::TPtr&);
195
199
void Handle (TEvPrivate::TEvPrintState::TPtr&);
200
+ void Handle (TEvPrivate::TEvProcessState::TPtr&);
196
201
197
202
STRICT_STFUNC (StateFunc, {
198
203
hFunc (NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle);
@@ -212,6 +217,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
212
217
hFunc (NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
213
218
hFunc (NActors::TEvents::TEvPing, Handle);
214
219
hFunc (TEvPrivate::TEvPrintState, Handle);
220
+ hFunc (TEvPrivate::TEvProcessState, Handle);
215
221
})
216
222
217
223
static constexpr char ActorName[] = " DQ_PQ_READ_ACTOR" ;
@@ -224,7 +230,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
224
230
void ProcessState ();
225
231
void Stop (const TString& message);
226
232
void StopSessions ();
227
- void ReInit ();
233
+ void ReInit (const TString& reason );
228
234
void PrintInternalState ();
229
235
};
230
236
@@ -262,6 +268,10 @@ void TDqPqRdReadActor::ProcessState() {
262
268
if (!ReadyBuffer.empty ()) {
263
269
return ;
264
270
}
271
+ if (!ProcessStateScheduled) {
272
+ ProcessStateScheduled = true ;
273
+ Schedule (TDuration::Seconds (ProcessStatePeriodSec), new TEvPrivate::TEvProcessState ());
274
+ }
265
275
if (!CoordinatorActorId) {
266
276
SRC_LOG_D (" Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId ());
267
277
Send (LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe ());
@@ -401,7 +411,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e
401
411
402
412
ui64 partitionId = ev->Get ()->Record .GetConsumer ().GetPartitionId ();
403
413
auto sessionIt = Sessions.find (partitionId);
404
- YQL_ENSURE (sessionIt != Sessions.end (), " Unknown partition id" );
414
+ if (sessionIt == Sessions.end ()) {
415
+ SRC_LOG_W (" Ignore TEvStartSessionAck from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
416
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
417
+ YQL_ENSURE (State != EState::STARTED);
418
+ return ;
419
+ }
405
420
auto & sessionInfo = sessionIt->second ;
406
421
if (!sessionInfo.EventsQueue .OnEventReceived (ev)) {
407
422
SRC_LOG_W (" Wrong seq num ignore message, seqNo " << meta.GetSeqNo ());
@@ -415,7 +430,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
415
430
416
431
ui64 partitionId = ev->Get ()->Record .GetPartitionId ();
417
432
auto sessionIt = Sessions.find (partitionId);
418
- YQL_ENSURE (sessionIt != Sessions.end (), " Unknown partition id" );
433
+ if (sessionIt == Sessions.end ()) {
434
+ SRC_LOG_W (" Ignore TEvSessionError from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
435
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
436
+ YQL_ENSURE (State != EState::STARTED);
437
+ return ;
438
+ }
419
439
420
440
auto & sessionInfo = sessionIt->second ;
421
441
if (!sessionInfo.EventsQueue .OnEventReceived (ev)) {
@@ -431,7 +451,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev) {
431
451
432
452
ui64 partitionId = ev->Get ()->Record .GetPartitionId ();
433
453
auto sessionIt = Sessions.find (partitionId);
434
- YQL_ENSURE (sessionIt != Sessions.end (), " Unknown partition id" );
454
+ if (sessionIt == Sessions.end ()) {
455
+ SRC_LOG_W (" Ignore TEvStatus from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
456
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
457
+ YQL_ENSURE (State != EState::STARTED);
458
+ return ;
459
+ }
435
460
auto & sessionInfo = sessionIt->second ;
436
461
437
462
if (!sessionInfo.EventsQueue .OnEventReceived (ev)) {
@@ -452,7 +477,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
452
477
ui64 partitionId = ev->Get ()->Record .GetPartitionId ();
453
478
auto sessionIt = Sessions.find (partitionId);
454
479
if (sessionIt == Sessions.end ()) {
455
- Stop (" Internal error: unknown partition id " + ToString (partitionId));
480
+ SRC_LOG_W (" Ignore TEvNewDataArrived from " << ev->Sender << " , seqNo " << meta.GetSeqNo ()
481
+ << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo () << " , PartitionId " << partitionId);
482
+ YQL_ENSURE (State != EState::STARTED);
456
483
return ;
457
484
}
458
485
@@ -512,20 +539,18 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr
512
539
}
513
540
514
541
CoordinatorActorId = ev->Get ()->CoordinatorActorId ;
515
- SRC_LOG_I (" Coordinator is changed, reinit all sessions" );
516
- ReInit ();
542
+ ReInit (" Coordinator is changed" );
517
543
ProcessState ();
518
544
}
519
545
520
- void TDqPqRdReadActor::ReInit () {
521
- SRC_LOG_I (" ReInit state" );
546
+ void TDqPqRdReadActor::ReInit (const TString& reason ) {
547
+ SRC_LOG_I (" ReInit state, reason " << reason );
522
548
StopSessions ();
523
549
Sessions.clear ();
524
550
State = EState::INIT;
525
551
if (!ReadyBuffer.empty ()) {
526
552
Send (ComputeActorId, new TEvNewAsyncInputDataArrived (InputIndex));
527
553
}
528
- ProcessState ();
529
554
}
530
555
531
556
void TDqPqRdReadActor::Stop (const TString& message) {
@@ -582,24 +607,23 @@ void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
582
607
}
583
608
584
609
if (CoordinatorActorId && *CoordinatorActorId == ev->Sender ) {
585
- SRC_LOG_D (" TEvUndelivered to coordinator, reinit" );
586
- ReInit ();
610
+ ReInit (" TEvUndelivered to coordinator" );
587
611
}
588
612
}
589
613
590
614
void TDqPqRdReadActor::Handle (NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
591
615
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get ()->Record .GetTransportMeta ();
592
616
SRC_LOG_T (" TEvMessageBatch from " << ev->Sender << " , seqNo " << meta.GetSeqNo () << " , ConfirmedSeqNo " << meta.GetConfirmedSeqNo ());
593
617
ui64 partitionId = ev->Get ()->Record .GetPartitionId ();
594
- YQL_ENSURE (Sessions. count (partitionId), " Unknown partition id " << partitionId);
595
- auto it = Sessions.find (partitionId);
596
- if (it == Sessions. end ()) {
597
- Stop ( " Wrong session data " );
598
- return ;
618
+ auto sessionIt = Sessions. find ( partitionId);
619
+ if (sessionIt == Sessions.end ()) {
620
+ SRC_LOG_W ( " Ignore TEvMessageBatch from " << ev-> Sender << " , seqNo " << meta. GetSeqNo ()
621
+ << " , ConfirmedSeqNo " << meta. GetConfirmedSeqNo () << " , PartitionId " << partitionId );
622
+ YQL_ENSURE (State != EState::STARTED) ;
599
623
}
600
624
601
625
Metrics.InFlyGetNextBatch ->Set (0 );
602
- auto & sessionInfo = it ->second ;
626
+ auto & sessionInfo = sessionIt ->second ;
603
627
if (!sessionInfo.EventsQueue .OnEventReceived (ev)) {
604
628
SRC_LOG_W (" Wrong seq num ignore message, seqNo " << meta.GetSeqNo ());
605
629
return ;
@@ -631,8 +655,7 @@ std::pair<NUdf::TUnboxedValuePod, i64> TDqPqRdReadActor::CreateItem(const TStrin
631
655
}
632
656
633
657
void TDqPqRdReadActor::Handle (const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) {
634
- SRC_LOG_D (" Session closed, event queue id " << ev->Get ()->EventQueueId );
635
- ReInit ();
658
+ ReInit (TStringBuilder () << " Session closed, event queue id " << ev->Get ()->EventQueueId );
636
659
}
637
660
638
661
void TDqPqRdReadActor::Handle (NActors::TEvents::TEvPong::TPtr& ev) {
@@ -654,6 +677,11 @@ void TDqPqRdReadActor::PrintInternalState() {
654
677
SRC_LOG_D (str.Str ());
655
678
}
656
679
680
+ void TDqPqRdReadActor::Handle (TEvPrivate::TEvProcessState::TPtr&) {
681
+ Schedule (TDuration::Seconds (ProcessStatePeriodSec), new TEvPrivate::TEvProcessState ());
682
+ ProcessState ();
683
+ }
684
+
657
685
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor (
658
686
NPq::NProto::TDqPqTopicSource&& settings,
659
687
ui64 inputIndex,
0 commit comments