4
4
#include < ydb/core/base/appdata.h>
5
5
#include < ydb/core/discovery/discovery.h>
6
6
#include < ydb/core/engine/minikql/flat_local_tx_factory.h>
7
+ #include < ydb/core/tablet/tablet_counters_protobuf.h>
7
8
8
9
namespace NKikimr ::NReplication {
9
10
@@ -13,6 +14,13 @@ TController::TController(const TActorId& tablet, TTabletStorageInfo* info)
13
14
: TActor(&TThis::StateInit)
14
15
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
15
16
, LogPrefix(this )
17
+ , TabletCountersPtr(new TProtobufTabletCounters<
18
+ ESimpleCounters_descriptor,
19
+ ECumulativeCounters_descriptor,
20
+ EPercentileCounters_descriptor,
21
+ ETxTypes_descriptor
22
+ >())
23
+ , TabletCounters(TabletCountersPtr.Get())
16
24
{
17
25
}
18
26
@@ -30,6 +38,7 @@ void TController::OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorCont
30
38
31
39
void TController::OnActivateExecutor (const TActorContext& ctx) {
32
40
CLOG_T (ctx, " OnActivateExecutor" );
41
+ Executor ()->RegisterExternalTabletCounters (TabletCountersPtr.Release ());
33
42
RunTxInitSchema (ctx);
34
43
}
35
44
@@ -292,9 +301,11 @@ void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext&
292
301
void TController::CreateSession (ui32 nodeId, const TActorContext& ctx) {
293
302
CLOG_D (ctx, " Create session"
294
303
<< " : nodeId# " << nodeId);
304
+ TabletCounters->Cumulative ()[COUNTER_CREATE_SESSION] += 1 ;
295
305
296
306
Y_ABORT_UNLESS (!Sessions.contains (nodeId));
297
307
Sessions.emplace (nodeId, TSessionInfo ());
308
+ TabletCounters->Simple ()[COUNTER_SESSIONS] = Sessions.size ();
298
309
299
310
auto ev = MakeHolder<TEvService::TEvHandshake>(TabletID (), Executor ()->Generation ());
300
311
ui32 flags = 0 ;
@@ -308,6 +319,7 @@ void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) {
308
319
void TController::DeleteSession (ui32 nodeId, const TActorContext& ctx) {
309
320
CLOG_D (ctx, " Delete session"
310
321
<< " : nodeId# " << nodeId);
322
+ TabletCounters->Cumulative ()[COUNTER_DELETE_SESSION] += 1 ;
311
323
312
324
Y_ABORT_UNLESS (Sessions.contains (nodeId));
313
325
auto & session = Sessions[nodeId];
@@ -327,6 +339,8 @@ void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) {
327
339
}
328
340
329
341
Sessions.erase (nodeId);
342
+ TabletCounters->Simple ()[COUNTER_SESSIONS] = Sessions.size ();
343
+
330
344
CloseSession (nodeId, ctx);
331
345
ScheduleProcessQueues ();
332
346
}
@@ -431,6 +445,9 @@ void TController::UpdateLag(const TWorkerId& id, TDuration lag) {
431
445
}
432
446
433
447
target->UpdateLag (id.WorkerId (), lag);
448
+ if (const auto lag = replication->GetLag ()) {
449
+ TabletCounters->Simple ()[COUNTER_DATA_LAG] = lag->MilliSeconds ();
450
+ }
434
451
}
435
452
436
453
void TController::Handle (TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx) {
@@ -486,6 +503,7 @@ TWorkerInfo* TController::GetOrCreateWorker(const TWorkerId& id, NKikimrReplicat
486
503
auto it = Workers.find (id);
487
504
if (it == Workers.end ()) {
488
505
it = Workers.emplace (id, cmd).first ;
506
+ TabletCounters->Simple ()[COUNTER_WORKERS] = Workers.size ();
489
507
}
490
508
491
509
auto replication = Find (id.ReplicationId ());
@@ -499,6 +517,9 @@ TWorkerInfo* TController::GetOrCreateWorker(const TWorkerId& id, NKikimrReplicat
499
517
}
500
518
501
519
void TController::ScheduleProcessQueues () {
520
+ TabletCounters->Simple ()[COUNTER_BOOT_QUEUE] = BootQueue.size ();
521
+ TabletCounters->Simple ()[COUNTER_STOP_QUEUE] = StopQueue.size ();
522
+
502
523
if (ProcessQueuesScheduled || (!BootQueue && !StopQueue)) {
503
524
return ;
504
525
}
@@ -652,6 +673,7 @@ void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) {
652
673
653
674
RemoveQueue.erase (id);
654
675
Workers.erase (id);
676
+ TabletCounters->Simple ()[COUNTER_WORKERS] = Workers.size ();
655
677
656
678
auto replication = Find (id.ReplicationId ());
657
679
if (!replication) {
0 commit comments