Skip to content

Commit f6b4b02

Browse files
authored
ReconnectPeriod cleanup (#11415)
1 parent e2103f9 commit f6b4b02

File tree

5 files changed

+16
-8
lines changed

5 files changed

+16
-8
lines changed

ydb/core/fq/libs/config/protos/common.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ message TCommonConfig {
3131
bool DisableSslForGenericDataSources = 15;
3232
bool ShowQueryTimeline = 16;
3333
uint64 MaxQueryTimelineSize = 17; // default: 200KB
34+
string PqReconnectPeriod = 18; // default: disabled
3435
}

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ void Init(
224224
appData->FunctionRegistry
225225
);
226226
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, NYql::CreatePqNativeGateway(std::move(pqServices)),
227-
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
227+
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());
228228

229229
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
230230
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
109109
if (Settings.HasOffset()) {
110110
NextMessageOffset = Settings.GetOffset();
111111
}
112+
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
112113
}
113114
NFq::NRowDispatcherProto::TEvStartSession Settings;
114115
NActors::TActorId ReadActorId;
@@ -119,6 +120,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
119120
TMaybe<ui64> NextMessageOffset;
120121
ui64 LastSendedNextMessageOffset = 0;
121122
TVector<ui64> FieldsIds;
123+
TDuration ReconnectPeriod;
122124
};
123125

124126
struct TTopicEventProcessor {
@@ -239,6 +241,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
239241
hFunc(NFq::TEvPrivate::TEvStatus, Handle);
240242
hFunc(NFq::TEvPrivate::TEvDataFiltered, Handle);
241243
hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle);
244+
hFunc(NFq::TEvPrivate::TEvReconnectSession, Handle);
242245
hFunc(TEvRowDispatcher::TEvGetNextBatch, Handle);
243246
hFunc(NFq::TEvRowDispatcher::TEvStartSession, Handle);
244247
sFunc(NFq::TEvPrivate::TEvStartParsing, DoParsing);
@@ -387,15 +390,15 @@ void TTopicSession::CreateTopicSession() {
387390
SubscribeOnNextEvent();
388391
}
389392

390-
if (!InflightReconnect) {
393+
if (!InflightReconnect && Clients) {
391394
// Use any sourceParams.
392-
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource();
393-
Y_UNUSED(TDuration::TryParse(sourceParams.GetReconnectPeriod(), ReconnectPeriod));
395+
ReconnectPeriod = Clients.begin()->second.ReconnectPeriod;
394396
if (ReconnectPeriod != TDuration::Zero()) {
397+
LOG_ROW_DISPATCHER_INFO("ReconnectPeriod " << ReconnectPeriod.ToString());
395398
Metrics.ReconnectRate->Inc();
396399
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
400+
InflightReconnect = true;
397401
}
398-
InflightReconnect = true;
399402
}
400403
}
401404

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -655,14 +655,18 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
655655
return {actor, actor};
656656
}
657657

658-
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) {
658+
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, const TString& reconnectPeriod) {
659659
factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
660-
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway](
660+
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, reconnectPeriod](
661661
NPq::NProto::TDqPqTopicSource&& settings,
662662
IDqAsyncIoFactory::TSourceArguments&& args)
663663
{
664664
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER));
665665

666+
if (reconnectPeriod) {
667+
settings.SetReconnectPeriod(reconnectPeriod);
668+
}
669+
666670
if (!settings.GetSharedReading()) {
667671
return CreateDqPqReadActor(
668672
std::move(settings),

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
4040
i64 bufferSize = PQReadDefaultFreeSpace
4141
);
4242

43-
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
43+
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), const TString& reconnectPeriod = {});
4444

4545
} // namespace NYql::NDq

0 commit comments

Comments
 (0)