9
9
#include < ydb/library/yql/providers/dq/counters/counters.h>
10
10
#include < ydb/library/yql/public/purecalc/common/interface.h>
11
11
12
+ #include < ydb/core/base/appdata_fwd.h>
12
13
#include < ydb/core/fq/libs/actors/logging/log.h>
13
14
#include < ydb/core/fq/libs/events/events.h>
15
+ #include < ydb/core/mon/mon.h>
14
16
15
17
#include < ydb/core/fq/libs/row_dispatcher/actors_factory.h>
16
18
#include < ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
@@ -223,11 +225,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
223
225
void Handle (const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
224
226
void Handle (NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
225
227
void Handle (NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
228
+ void Handle (const NMon::TEvHttpInfo::TPtr&);
226
229
227
230
void DeleteConsumer (const ConsumerSessionKey& key);
228
231
void UpdateInterconnectSessions (const NActors::TActorId& interconnectSession);
229
232
void UpdateMetrics ();
230
- void PrintInternalState ();
233
+ TString GetInternalState ();
231
234
232
235
STRICT_STFUNC (
233
236
StateFunc, {
@@ -252,6 +255,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
252
255
hFunc (NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
253
256
hFunc (NFq::TEvPrivate::TEvUpdateMetrics, Handle);
254
257
hFunc (NFq::TEvPrivate::TEvPrintStateToLog, Handle);
258
+ hFunc (NMon::TEvHttpInfo, Handle);
255
259
})
256
260
};
257
261
@@ -287,6 +291,13 @@ void TRowDispatcher::Bootstrap() {
287
291
Schedule (TDuration::Seconds (CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing ());
288
292
Schedule (TDuration::Seconds (UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics ());
289
293
Schedule (TDuration::Seconds (PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog ());
294
+
295
+ NActors::TMon* mon = NKikimr::AppData ()->Mon ;
296
+ if (mon) {
297
+ ::NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage (" actors" , " Actors" );
298
+ mon->RegisterActorPage (actorsMonPage, " row_dispatcher" , " Row Dispatcher" , false ,
299
+ TlsActivationContext->ExecutorThread .ActorSystem , SelfId ());
300
+ }
290
301
}
291
302
292
303
void TRowDispatcher::Handle (NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -372,7 +383,7 @@ void TRowDispatcher::UpdateMetrics() {
372
383
}
373
384
}
374
385
375
- void TRowDispatcher::PrintInternalState () {
386
+ TString TRowDispatcher::GetInternalState () {
376
387
TStringStream str;
377
388
str << " Statistics:\n " ;
378
389
for (auto & [key, sessionsInfo] : TopicSessions) {
@@ -390,7 +401,7 @@ void TRowDispatcher::PrintInternalState() {
390
401
}
391
402
}
392
403
}
393
- LOG_ROW_DISPATCHER_DEBUG ( str.Str () );
404
+ return str.Str ();
394
405
}
395
406
396
407
void TRowDispatcher::Handle (NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -632,10 +643,22 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
632
643
}
633
644
634
645
void TRowDispatcher::Handle (NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
635
- PrintInternalState ( );
646
+ LOG_ROW_DISPATCHER_DEBUG ( GetInternalState () );
636
647
Schedule (TDuration::Seconds (PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog ());
637
648
}
638
649
650
+ void TRowDispatcher::Handle (const NMon::TEvHttpInfo::TPtr& ev) {
651
+ TStringStream str;
652
+ HTML (str) {
653
+ PRE () {
654
+ str << " Current state:" << Endl;
655
+ str << GetInternalState () << Endl;
656
+ str << Endl;
657
+ }
658
+ }
659
+ Send (ev->Sender , new NMon::TEvHttpInfoRes (str.Str ()));
660
+ }
661
+
639
662
void TRowDispatcher::Handle (NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
640
663
LOG_ROW_DISPATCHER_TRACE (" TEvSessionStatistic from " << ev->Sender );
641
664
const auto & key = ev->Get ()->Stat .SessionKey ;
0 commit comments