Skip to content

Commit 2c99cf0

Browse files
authored
MonPage/ReconnectPeriod to q-stable-2024-07-08 (#11451)
1 parent 02234c6 commit 2c99cf0

File tree

7 files changed

+45
-12
lines changed

7 files changed

+45
-12
lines changed

ydb/core/fq/libs/config/protos/common.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ message TCommonConfig {
3131
bool DisableSslForGenericDataSources = 15;
3232
bool ShowQueryTimeline = 16;
3333
uint64 MaxQueryTimelineSize = 17; // default: 200KB
34+
string PqReconnectPeriod = 18; // default: disabled
3435
}

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ void Init(
236236
appData->FunctionRegistry
237237
);
238238
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, NYql::CreatePqNativeGateway(std::move(pqServices)),
239-
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
239+
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());
240240

241241
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
242242
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
#include <ydb/library/yql/providers/dq/counters/counters.h>
1010
#include <ydb/library/yql/public/purecalc/common/interface.h>
1111

12+
#include <ydb/core/base/appdata_fwd.h>
1213
#include <ydb/core/fq/libs/actors/logging/log.h>
1314
#include <ydb/core/fq/libs/events/events.h>
15+
#include <ydb/core/mon/mon.h>
1416

1517
#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h>
1618
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
@@ -223,11 +225,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
223225
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
224226
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
225227
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
228+
void Handle(const NMon::TEvHttpInfo::TPtr&);
226229

227230
void DeleteConsumer(const ConsumerSessionKey& key);
228231
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
229232
void UpdateMetrics();
230-
void PrintInternalState();
233+
TString GetInternalState();
231234

232235
STRICT_STFUNC(
233236
StateFunc, {
@@ -252,6 +255,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
252255
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
253256
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
254257
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
258+
hFunc(NMon::TEvHttpInfo, Handle);
255259
})
256260
};
257261

@@ -287,6 +291,13 @@ void TRowDispatcher::Bootstrap() {
287291
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
288292
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
289293
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
294+
295+
NActors::TMon* mon = NKikimr::AppData()->Mon;
296+
if (mon) {
297+
::NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
298+
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
299+
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
300+
}
290301
}
291302

292303
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -372,7 +383,7 @@ void TRowDispatcher::UpdateMetrics() {
372383
}
373384
}
374385

375-
void TRowDispatcher::PrintInternalState() {
386+
TString TRowDispatcher::GetInternalState() {
376387
TStringStream str;
377388
str << "Statistics:\n";
378389
for (auto& [key, sessionsInfo] : TopicSessions) {
@@ -390,7 +401,7 @@ void TRowDispatcher::PrintInternalState() {
390401
}
391402
}
392403
}
393-
LOG_ROW_DISPATCHER_DEBUG(str.Str());
404+
return str.Str();
394405
}
395406

396407
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -632,10 +643,22 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
632643
}
633644

634645
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
635-
PrintInternalState();
646+
LOG_ROW_DISPATCHER_DEBUG(GetInternalState());
636647
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
637648
}
638649

650+
void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) {
651+
TStringStream str;
652+
HTML(str) {
653+
PRE() {
654+
str << "Current state:" << Endl;
655+
str << GetInternalState() << Endl;
656+
str << Endl;
657+
}
658+
}
659+
Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
660+
}
661+
639662
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
640663
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
641664
const auto& key = ev->Get()->Stat.SessionKey;

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
109109
if (Settings.HasOffset()) {
110110
NextMessageOffset = Settings.GetOffset();
111111
}
112+
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
112113
}
113114
NFq::NRowDispatcherProto::TEvStartSession Settings;
114115
NActors::TActorId ReadActorId;
@@ -119,6 +120,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
119120
TMaybe<ui64> NextMessageOffset;
120121
ui64 LastSendedNextMessageOffset = 0;
121122
TVector<ui64> FieldsIds;
123+
TDuration ReconnectPeriod;
122124
};
123125

124126
struct TTopicEventProcessor {
@@ -239,6 +241,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
239241
hFunc(NFq::TEvPrivate::TEvStatus, Handle);
240242
hFunc(NFq::TEvPrivate::TEvDataFiltered, Handle);
241243
hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle);
244+
hFunc(NFq::TEvPrivate::TEvReconnectSession, Handle);
242245
hFunc(TEvRowDispatcher::TEvGetNextBatch, Handle);
243246
hFunc(NFq::TEvRowDispatcher::TEvStartSession, Handle);
244247
sFunc(NFq::TEvPrivate::TEvStartParsing, DoParsing);
@@ -387,15 +390,15 @@ void TTopicSession::CreateTopicSession() {
387390
SubscribeOnNextEvent();
388391
}
389392

390-
if (!InflightReconnect) {
393+
if (!InflightReconnect && Clients) {
391394
// Use any sourceParams.
392-
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource();
393-
Y_UNUSED(TDuration::TryParse(sourceParams.GetReconnectPeriod(), ReconnectPeriod));
395+
ReconnectPeriod = Clients.begin()->second.ReconnectPeriod;
394396
if (ReconnectPeriod != TDuration::Zero()) {
397+
LOG_ROW_DISPATCHER_INFO("ReconnectPeriod " << ReconnectPeriod.ToString());
395398
Metrics.ReconnectRate->Inc();
396399
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
400+
InflightReconnect = true;
397401
}
398-
InflightReconnect = true;
399402
}
400403
}
401404

ydb/core/fq/libs/row_dispatcher/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ SRCS(
1414
PEERDIR(
1515
contrib/libs/fmt
1616
contrib/libs/simdjson
17+
ydb/core/base
1718
ydb/core/fq/libs/actors/logging
1819
ydb/core/fq/libs/config/protos
1920
ydb/core/fq/libs/control_plane_storage
2021
ydb/core/fq/libs/row_dispatcher/events
2122
ydb/core/fq/libs/shared_resources
2223
ydb/core/fq/libs/ydb
24+
ydb/core/mon
2325
ydb/library/actors/core
2426
ydb/library/security
2527
ydb/library/yql/dq/actors/common

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -655,14 +655,18 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
655655
return {actor, actor};
656656
}
657657

658-
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) {
658+
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, const TString& reconnectPeriod) {
659659
factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
660-
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway](
660+
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, reconnectPeriod](
661661
NPq::NProto::TDqPqTopicSource&& settings,
662662
IDqAsyncIoFactory::TSourceArguments&& args)
663663
{
664664
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER));
665665

666+
if (reconnectPeriod) {
667+
settings.SetReconnectPeriod(reconnectPeriod);
668+
}
669+
666670
if (!settings.GetSharedReading()) {
667671
return CreateDqPqReadActor(
668672
std::move(settings),

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
4040
i64 bufferSize = PQReadDefaultFreeSpace
4141
);
4242

43-
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
43+
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), const TString& reconnectPeriod = {});
4444

4545
} // namespace NYql::NDq

0 commit comments

Comments
 (0)