@@ -85,6 +85,30 @@ struct TEvPrivate {
85
85
} // namespace
86
86
87
87
class TDqPqReadActor : public NActors ::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
88
+ struct TMetrics {
89
+ TMetrics (const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters)
90
+ : TxId(std::visit([](auto arg) { return ToString (arg); }, txId))
91
+ , Counters(counters) {
92
+ SubGroup = Counters->GetSubgroup (" sink" , " PqRead" );
93
+ auto sink = SubGroup->GetSubgroup (" tx_id" , TxId);
94
+ auto task = sink->GetSubgroup (" task_id" , ToString (taskId));
95
+ InFlyAsyncInputData = task->GetCounter (" InFlyAsyncInputData" );
96
+ InFlySubscribe = task->GetCounter (" InFlySubscribe" );
97
+ AsyncInputDataRate = task->GetCounter (" AsyncInputDataRate" , true );
98
+ }
99
+
100
+ ~TMetrics () {
101
+ SubGroup->RemoveSubgroup (" id" , TxId);
102
+ }
103
+
104
+ TString TxId;
105
+ ::NMonitoring::TDynamicCounterPtr Counters;
106
+ ::NMonitoring::TDynamicCounterPtr SubGroup;
107
+ ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
108
+ ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
109
+ ::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
110
+ };
111
+
88
112
public:
89
113
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.
90
114
using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
@@ -100,10 +124,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
100
124
NYdb::TDriver driver,
101
125
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
102
126
const NActors::TActorId& computeActorId,
127
+ const ::NMonitoring::TDynamicCounterPtr& counters,
103
128
i64 bufferSize)
104
129
: TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
105
130
, InputIndex(inputIndex)
106
131
, TxId(txId)
132
+ , Metrics(txId, taskId, counters)
107
133
, BufferSize(bufferSize)
108
134
, HolderFactory(holderFactory)
109
135
, LogPrefix(TStringBuilder() << " SelfId: " << this ->SelfId () << ", TxId: " << TxId << ", task: " << taskId << ". PQ source. ")
@@ -245,9 +271,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
245
271
hFunc (TEvPrivate::TEvSourceDataReady, Handle);
246
272
)
247
273
248
- void Handle (TEvPrivate::TEvSourceDataReady::TPtr&) {
274
+ void Handle (TEvPrivate::TEvSourceDataReady::TPtr& ev ) {
249
275
SRC_LOG_T (" SessionId: " << GetSessionId () << " Source data ready" );
250
276
SubscribedOnEvent = false ;
277
+ if (ev.Get ()->Cookie ) {
278
+ Metrics.InFlySubscribe ->Dec ();
279
+ }
280
+ Metrics.InFlyAsyncInputData ->Set (1 );
281
+ Metrics.AsyncInputDataRate ->Inc ();
251
282
Send (ComputeActorId, new TEvNewAsyncInputDataArrived (InputIndex));
252
283
}
253
284
@@ -282,6 +313,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
282
313
}
283
314
284
315
i64 GetAsyncInputData (NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool &, i64 freeSpace) override {
316
+ Metrics.InFlyAsyncInputData ->Set (0 );
285
317
SRC_LOG_T (" SessionId: " << GetSessionId () << " GetAsyncInputData freeSpace = " << freeSpace);
286
318
287
319
const auto now = TInstant::Now ();
@@ -387,9 +419,10 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
387
419
void SubscribeOnNextEvent () {
388
420
if (!SubscribedOnEvent) {
389
421
SubscribedOnEvent = true ;
422
+ Metrics.InFlySubscribe ->Inc ();
390
423
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem ();
391
424
EventFuture = GetReadSession ().WaitEvent ().Subscribe ([actorSystem, selfId = SelfId ()](const auto &){
392
- actorSystem->Send (selfId, new TEvPrivate::TEvSourceDataReady ());
425
+ actorSystem->Send (selfId, new TEvPrivate::TEvSourceDataReady (), 0 , 1 );
393
426
});
394
427
}
395
428
}
@@ -595,6 +628,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
595
628
const ui64 InputIndex;
596
629
TDqAsyncStats IngressStats;
597
630
const TTxId TxId;
631
+ TMetrics Metrics;
598
632
const i64 BufferSize;
599
633
const THolderFactory& HolderFactory;
600
634
const TString LogPrefix;
@@ -629,6 +663,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
629
663
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
630
664
const NActors::TActorId& computeActorId,
631
665
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
666
+ const ::NMonitoring::TDynamicCounterPtr& counters,
632
667
i64 bufferSize
633
668
)
634
669
{
@@ -653,15 +688,16 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
653
688
std::move (driver),
654
689
CreateCredentialsProviderFactoryForStructuredToken (credentialsFactory, token, addBearerToToken),
655
690
computeActorId,
691
+ counters,
656
692
bufferSize
657
693
);
658
694
659
695
return {actor, actor};
660
696
}
661
697
662
- void RegisterDqPqReadActorFactory (TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
698
+ void RegisterDqPqReadActorFactory (TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters ) {
663
699
factory.RegisterSource <NPq::NProto::TDqPqTopicSource>(" PqSource" ,
664
- [driver = std::move (driver), credentialsFactory = std::move (credentialsFactory)](
700
+ [driver = std::move (driver), credentialsFactory = std::move (credentialsFactory), counters ](
665
701
NPq::NProto::TDqPqTopicSource&& settings,
666
702
IDqAsyncIoFactory::TSourceArguments&& args)
667
703
{
@@ -678,6 +714,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
678
714
credentialsFactory,
679
715
args.ComputeActorId ,
680
716
args.HolderFactory ,
717
+ counters,
681
718
PQReadDefaultFreeSpace);
682
719
});
683
720
0 commit comments