Skip to content

Commit e8e7d78

Browse files
authored
Reconnect period (#11343)
1 parent f6cd127 commit e8e7d78

File tree

8 files changed

+64
-1
lines changed

8 files changed

+64
-1
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ struct TTopicSessionMetrics {
3333
InFlyAsyncInputData = SubGroup->GetCounter("InFlyAsyncInputData");
3434
RowsRead = SubGroup->GetCounter("RowsRead", true);
3535
InFlySubscribe = SubGroup->GetCounter("InFlySubscribe");
36+
ReconnectRate = SubGroup->GetCounter("ReconnectRate", true);
3637
}
3738

3839
~TTopicSessionMetrics() {
@@ -43,6 +44,7 @@ struct TTopicSessionMetrics {
4344
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
4445
::NMonitoring::TDynamicCounters::TCounterPtr RowsRead;
4546
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
47+
::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;
4648
};
4749

4850
struct TEvPrivate {
@@ -56,6 +58,7 @@ struct TEvPrivate {
5658
EvDataFiltered,
5759
EvSendStatistic,
5860
EvStartParsing,
61+
EvReconnectSession,
5962
EvEnd
6063
};
6164
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
@@ -66,6 +69,7 @@ struct TEvPrivate {
6669
struct TEvSendStatistic : public NActors::TEventLocal<TEvSendStatistic, EvSendStatistic> {};
6770
struct TEvStatus : public NActors::TEventLocal<TEvStatus, EvStatus> {};
6871
struct TEvStartParsing : public NActors::TEventLocal<TEvStartParsing, EvStartParsing> {};
72+
struct TEvReconnectSession : public NActors::TEventLocal<TEvReconnectSession, EvReconnectSession> {};
6973

7074
struct TEvDataFiltered : public NActors::TEventLocal<TEvDataFiltered, EvDataFiltered> {
7175
explicit TEvDataFiltered(ui64 offset)
@@ -136,6 +140,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
136140
TParserInputType InputType;
137141
};
138142

143+
bool InflightReconnect = false;
144+
TDuration ReconnectPeriod;
139145
const TString TopicPath;
140146
const TString Endpoint;
141147
const TString Database;
@@ -205,6 +211,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
205211

206212
void Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&);
207213
void Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&);
214+
void Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&);
208215
void Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr&);
209216
void Handle(NFq::TEvPrivate::TEvStatus::TPtr&);
210217
void Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr&);
@@ -375,6 +382,17 @@ void TTopicSession::CreateTopicSession() {
375382
ReadSession = GetTopicClient(sourceParams).CreateReadSession(GetReadSessionSettings(sourceParams));
376383
SubscribeOnNextEvent();
377384
}
385+
386+
if (!InflightReconnect) {
387+
// Use any sourceParams.
388+
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource();
389+
Y_UNUSED(TDuration::TryParse(sourceParams.GetReconnectPeriod(), ReconnectPeriod));
390+
if (ReconnectPeriod != TDuration::Zero()) {
391+
Metrics.ReconnectRate->Inc();
392+
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
393+
}
394+
InflightReconnect = true;
395+
}
378396
}
379397

380398
void TTopicSession::Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&) {
@@ -432,6 +450,17 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvStatus::TPtr&) {
432450
}
433451
}
434452

453+
void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) {
454+
Metrics.ReconnectRate->Inc();
455+
TInstant minTime = GetMinStartingMessageTimestamp();
456+
LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, Path " << TopicPath
457+
<< ", StartingMessageTimestamp " << minTime
458+
<< ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer());
459+
StopReadSession();
460+
CreateTopicSession();
461+
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
462+
}
463+
435464
void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) {
436465
LOG_ROW_DISPATCHER_TRACE("TEvDataFiltered, last offset " << ev->Get()->Offset);
437466
for (auto& [actorId, info] : Clients) {

ydb/library/yql/providers/common/proto/gateways_config.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,8 @@ message TPqClusterConfig {
326326
optional bool AddBearerToToken = 11; // whether to use prefix "Bearer " in token
327327
optional string DatabaseId = 12;
328328
repeated TAttr Settings = 100;
329-
optional bool SharedReading = 101;
329+
optional bool SharedReading = 101;
330+
optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m
330331
}
331332

332333
message TPqGatewayConfig {

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ struct TEvPrivate {
7474
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
7575

7676
EvSourceDataReady = EvBegin,
77+
EvReconnectSession,
7778

7879
EvEnd
7980
};
@@ -83,6 +84,7 @@ struct TEvPrivate {
8384
// Events
8485

8586
struct TEvSourceDataReady : public TEventLocal<TEvSourceDataReady, EvSourceDataReady> {};
87+
struct TEvReconnectSession : public TEventLocal<TEvReconnectSession, EvReconnectSession> {};
8688
};
8789

8890
} // namespace
@@ -98,6 +100,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
98100
InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData");
99101
InFlySubscribe = task->GetCounter("InFlySubscribe");
100102
AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true);
103+
ReconnectRate = task->GetCounter("ReconnectRate", true);
101104
}
102105

103106
~TMetrics() {
@@ -110,6 +113,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
110113
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
111114
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
112115
::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
116+
::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;
113117
};
114118

115119
public:
@@ -139,6 +143,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
139143
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
140144
, PqGateway(pqGateway)
141145
{
146+
Y_UNUSED(TDuration::TryParse(SourceParams.GetReconnectPeriod(), ReconnectPeriod));
142147
MetadataFields.reserve(SourceParams.MetadataFieldsSize());
143148
TPqMetaExtractor fieldsExtractor;
144149
for (const auto& fieldName : SourceParams.GetMetadataFields()) {
@@ -209,6 +214,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
209214
private:
210215
STRICT_STFUNC(StateFunc,
211216
hFunc(TEvPrivate::TEvSourceDataReady, Handle);
217+
hFunc(TEvPrivate::TEvReconnectSession, Handle);
212218
)
213219

214220
void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) {
@@ -222,6 +228,18 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
222228
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
223229
}
224230

231+
void Handle(TEvPrivate::TEvReconnectSession::TPtr&) {
232+
SRC_LOG_D("SessionId: " << GetSessionId() << ", Reconnect epoch: " << Metrics.ReconnectRate->Val());
233+
Metrics.ReconnectRate->Inc();
234+
if (ReadSession) {
235+
ReadSession->Close(TDuration::Zero());
236+
ReadSession.reset();
237+
ReadyBuffer = std::queue<TReadyBatch>{}; // clear read buffer
238+
}
239+
240+
Schedule(ReconnectPeriod, new TEvPrivate::TEvReconnectSession());
241+
}
242+
225243
// IActor & IDqComputeActorAsyncInput
226244
void PassAway() override { // Is called from Compute Actor
227245
std::queue<TReadyBatch> empty;
@@ -259,6 +277,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
259277
const auto now = TInstant::Now();
260278
MaybeScheduleNextIdleCheck(now);
261279

280+
if (!InflightReconnect && ReconnectPeriod != TDuration::Zero()) {
281+
Metrics.ReconnectRate->Inc();
282+
Schedule(ReconnectPeriod, new TEvPrivate::TEvSourceDataReady());
283+
InflightReconnect = true;
284+
}
285+
262286
i64 usedSpace = 0;
263287
if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
264288
return usedSpace;
@@ -565,6 +589,8 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
565589
};
566590

567591
private:
592+
bool InflightReconnect = false;
593+
TDuration ReconnectPeriod;
568594
TMetrics Metrics;
569595
const i64 BufferSize;
570596
const THolderFactory& HolderFactory;

ydb/library/yql/providers/pq/common/yql_names.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ constexpr TStringBuf WatermarksEnableSetting = "WatermarksEnable";
1515
constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs";
1616
constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs";
1717
constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions";
18+
constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod";
1819

1920
} // namespace NYql

ydb/library/yql/providers/pq/proto/dq_io.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ message TDqPqTopicSource {
3737
repeated string ColumnTypes = 13;
3838
string Predicate = 14;
3939
bool SharedReading = 15;
40+
string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m
4041
}
4142

4243
message TDqPqTopicSink {

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ class TPqDqIntegration: public TDqIntegrationBase {
221221
srcDesc.SetEndpoint(TString(Value(setting)));
222222
} else if (name == SharedReading) {
223223
sharedReading = FromString<bool>(Value(setting));
224+
} else if (name == ReconnectPeriod) {
225+
srcDesc.SetReconnectPeriod(TString(Value(setting)));
224226
} else if (name == Format) {
225227
format = TString(Value(setting));
226228
} else if (name == UseSslSetting) {
@@ -338,6 +340,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
338340

339341
Add(props, EndpointSetting, clusterConfiguration->Endpoint, pos, ctx);
340342
Add(props, SharedReading, ToString(clusterConfiguration->SharedReading), pos, ctx);
343+
Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx);
341344
Add(props, Format, format, pos, ctx);
342345

343346

ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ void TPqConfiguration::Init(
4343
clusterSettings.UseSsl = cluster.GetUseSsl();
4444
clusterSettings.AddBearerToToken = cluster.GetAddBearerToToken();
4545
clusterSettings.SharedReading = cluster.GetSharedReading();
46+
clusterSettings.ReconnectPeriod = cluster.GetReconnectPeriod();
4647

4748
const TString authToken = typeCtx->Credentials->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken());
4849
clusterSettings.AuthToken = authToken;

ydb/library/yql/providers/pq/provider/yql_pq_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ struct TPqClusterConfigurationSettings {
3030
TString AuthToken;
3131
bool AddBearerToToken = false;
3232
bool SharedReading = false;
33+
TString ReconnectPeriod;
3334
};
3435

3536
struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher {

0 commit comments

Comments
 (0)