Skip to content

Commit df61055

Browse files
authored
YQ-3850 Shared reading: fix interconnect subscribing / to stable (#11522)
1 parent 2e37a1c commit df61055

File tree

2 files changed

+111
-31
lines changed

2 files changed

+111
-31
lines changed

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 105 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,19 @@ struct TEvPrivate {
5454
EvCoordinatorPing = EvBegin + 20,
5555
EvUpdateMetrics,
5656
EvPrintStateToLog,
57+
EvTryConnect,
5758
EvEnd
5859
};
5960

6061
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
6162
struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {};
6263
struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {};
6364
struct TEvPrintStateToLog : public NActors::TEventLocal<TEvPrintStateToLog, EvPrintStateToLog> {};
65+
struct TEvTryConnect : public NActors::TEventLocal<TEvTryConnect, EvTryConnect> {
66+
TEvTryConnect(ui32 nodeId = 0)
67+
: NodeId(nodeId) {}
68+
ui32 NodeId = 0;
69+
};
6470
};
6571

6672
struct TQueryStat {
@@ -119,6 +125,92 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
119125
}
120126
};
121127

128+
struct TNodesTracker{
129+
class TRetryState {
130+
public:
131+
TDuration GetNextDelay() {
132+
constexpr TDuration MaxDelay = TDuration::Seconds(10);
133+
constexpr TDuration MinDelay = TDuration::MilliSeconds(100); // from second retry
134+
TDuration ret = Delay; // The first delay is zero
135+
Delay = ClampVal(Delay * 2, MinDelay, MaxDelay);
136+
return ret ? RandomizeDelay(ret) : ret;
137+
}
138+
private:
139+
static TDuration RandomizeDelay(TDuration baseDelay) {
140+
const TDuration::TValue half = baseDelay.GetValue() / 2;
141+
return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
142+
}
143+
private:
144+
TDuration Delay; // The first time retry will be done instantly.
145+
};
146+
147+
struct TNodeState {
148+
bool Connected = false;
149+
bool RetryScheduled = false;
150+
TMaybe<TRetryState> RetryState;
151+
};
152+
public:
153+
void Init(const NActors::TActorId& selfId) {
154+
SelfId = selfId;
155+
}
156+
157+
void AddNode(ui32 nodeId) {
158+
if (Nodes.contains(nodeId)) {
159+
return;
160+
}
161+
HandleNodeDisconnected(nodeId);
162+
}
163+
164+
void TryConnect(ui32 nodeId) {
165+
auto& state = Nodes[nodeId];
166+
state.RetryScheduled = false;
167+
if (state.Connected) {
168+
return;
169+
}
170+
auto connectEvent = MakeHolder<NActors::TEvInterconnect::TEvConnectNode>();
171+
auto proxyId = NActors::TActivationContext::InterconnectProxy(nodeId);
172+
NActors::TActivationContext::Send(
173+
new NActors::IEventHandle(proxyId, SelfId, connectEvent.Release(), 0, 0));
174+
}
175+
176+
bool GetNodeConnected(ui32 nodeId) {
177+
return Nodes[nodeId].Connected;
178+
}
179+
180+
void HandleNodeConnected(ui32 nodeId) {
181+
auto& state = Nodes[nodeId];
182+
state.Connected = true;
183+
state.RetryState = Nothing();
184+
}
185+
186+
void HandleNodeDisconnected(ui32 nodeId) {
187+
auto& state = Nodes[nodeId];
188+
state.Connected = false;
189+
if (state.RetryScheduled) {
190+
return;
191+
}
192+
state.RetryScheduled = true;
193+
if (!state.RetryState) {
194+
state.RetryState.ConstructInPlace();
195+
}
196+
auto ev = MakeHolder<TEvPrivate::TEvTryConnect>(nodeId);
197+
auto delay = state.RetryState->GetNextDelay();
198+
NActors::TActivationContext::Schedule(delay, new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
199+
}
200+
201+
void PrintInternalState(TStringStream& stream) const {
202+
stream << "Nodes states: \n";
203+
for (const auto& [nodeId, state] : Nodes) {
204+
stream << " id " << nodeId << " connected " << state.Connected << " retry scheduled " << state.RetryScheduled << "\n";
205+
}
206+
}
207+
208+
private:
209+
TMap<ui32, TNodeState> Nodes;
210+
NActors::TActorId SelfId;
211+
TString LogPrefix = "RowDispatcher: ";
212+
};
213+
122214

123215
NConfig::TRowDispatcherConfig Config;
124216
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
@@ -134,8 +226,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
134226
const ::NMonitoring::TDynamicCounterPtr Counters;
135227
TRowDispatcherMetrics Metrics;
136228
NYql::IPqGateway::TPtr PqGateway;
137-
THashSet<TActorId> InterconnectSessions;
138-
TMap<ui32, bool> NodeConnected;
229+
TNodesTracker NodesTracker;
139230

140231
struct ConsumerCounters {
141232
ui64 NewDataArrived = 0;
@@ -222,15 +313,14 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
222313
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
223314

224315
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
225-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
316+
void Handle(const TEvPrivate::TEvTryConnect::TPtr&);
226317
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
227318
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
228319
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
229320
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
230321
void Handle(const NMon::TEvHttpInfo::TPtr&);
231322

232323
void DeleteConsumer(const ConsumerSessionKey& key);
233-
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
234324
void UpdateMetrics();
235325
TString GetInternalState();
236326

@@ -250,7 +340,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
250340
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
251341
hFunc(NFq::TEvRowDispatcher::TEvStatus, Handle);
252342
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
253-
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
343+
hFunc(TEvPrivate::TEvTryConnect, Handle);
254344
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
255345
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
256346
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
@@ -300,6 +390,7 @@ void TRowDispatcher::Bootstrap() {
300390
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
301391
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
302392
}
393+
NodesTracker.Init(SelfId());
303394
}
304395

305396
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -317,15 +408,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr&
317408

318409
void TRowDispatcher::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
319410
LOG_ROW_DISPATCHER_DEBUG("EvNodeConnected, node id " << ev->Get()->NodeId);
320-
NodeConnected[ev->Get()->NodeId] = true;
411+
NodesTracker.HandleNodeConnected(ev->Get()->NodeId);
321412
for (auto& [actorId, consumer] : Consumers) {
322413
consumer->EventsQueue.HandleNodeConnected(ev->Get()->NodeId);
323414
}
324415
}
325416

326417
void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
327418
LOG_ROW_DISPATCHER_DEBUG("TEvNodeDisconnected, node id " << ev->Get()->NodeId);
328-
NodeConnected[ev->Get()->NodeId] = false;
419+
NodesTracker.HandleNodeDisconnected(ev->Get()->NodeId);
329420
for (auto& [actorId, consumer] : Consumers) {
330421
consumer->EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId);
331422
}
@@ -353,7 +444,7 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) {
353444

354445
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe::TPtr& ev) {
355446
LOG_ROW_DISPATCHER_DEBUG("TEvCoordinatorChangesSubscribe from " << ev->Sender);
356-
UpdateInterconnectSessions(ev->InterconnectSession);
447+
NodesTracker.AddNode(ev->Sender.NodeId());
357448
CoordinatorChangedSubscribers.insert(ev->Sender);
358449
if (!CoordinatorActorId) {
359450
return;
@@ -387,6 +478,7 @@ void TRowDispatcher::UpdateMetrics() {
387478

388479
TString TRowDispatcher::GetInternalState() {
389480
TStringStream str;
481+
NodesTracker.PrintInternalState(str);
390482
str << "Statistics:\n";
391483
for (auto& [key, sessionsInfo] : TopicSessions) {
392484
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
@@ -410,7 +502,7 @@ TString TRowDispatcher::GetInternalState() {
410502
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
411503
LOG_ROW_DISPATCHER_DEBUG("TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
412504
" partitionId " << ev->Get()->Record.GetPartitionId());
413-
UpdateInterconnectSessions(ev->InterconnectSession);
505+
NodesTracker.AddNode(ev->Sender.NodeId());
414506
TMaybe<ui64> readOffset;
415507
if (ev->Get()->Record.HasOffset()) {
416508
readOffset = ev->Get()->Record.GetOffset();
@@ -430,7 +522,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
430522
LOG_ROW_DISPATCHER_DEBUG("Topic session count " << topicSessionInfo.Sessions.size());
431523
Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);
432524

433-
auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodeConnected[ev->Sender.NodeId()]);
525+
auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId()));
434526
Consumers[key] = consumerInfo;
435527
ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo;
436528
if (!consumerInfo->EventsQueue.OnEventReceived(ev)) {
@@ -575,14 +667,9 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClo
575667
}
576668
}
577669

578-
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
579-
LOG_ROW_DISPATCHER_TRACE("TEvRetry " << ev->Get()->EventQueueId);
580-
auto it = ConsumersByEventQueueId.find(ev->Get()->EventQueueId);
581-
if (it == ConsumersByEventQueueId.end()) {
582-
LOG_ROW_DISPATCHER_WARN("No consumer with EventQueueId = " << ev->Get()->EventQueueId);
583-
return;
584-
}
585-
it->second->EventsQueue.Retry();
670+
void TRowDispatcher::Handle(const TEvPrivate::TEvTryConnect::TPtr& ev) {
671+
LOG_ROW_DISPATCHER_TRACE("TEvTryConnect to node id " << ev->Get()->NodeId);
672+
NodesTracker.TryConnect(ev->Get()->NodeId);
586673
}
587674

588675
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
@@ -705,18 +792,6 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
705792
}
706793
}
707794

708-
void TRowDispatcher::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
709-
if (!interconnectSession) {
710-
return;
711-
}
712-
auto sessionsIt = InterconnectSessions.find(interconnectSession);
713-
if (sessionsIt != InterconnectSessions.end()) {
714-
return;
715-
}
716-
Send(interconnectSession, new NActors::TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
717-
InterconnectSessions.insert(interconnectSession);
718-
}
719-
720795
} // namespace
721796

722797
////////////////////////////////////////////////////////////////////////////////

ydb/library/yql/dq/actors/common/retry_queue.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,12 @@ TDuration TRetryEventsQueue::TRetryState::RandomizeDelay(TDuration baseDelay) {
168168
}
169169

170170
void TRetryEventsQueue::PrintInternalState(TStringStream& stream) const {
171-
stream << "id " << EventQueueId << ", NextSeqNo "
171+
stream << "id " << EventQueueId;
172+
if (LocalRecipient) {
173+
stream << ", LocalRecipient\n";
174+
return;
175+
}
176+
stream << ", NextSeqNo "
172177
<< NextSeqNo << ", MyConfSeqNo " << MyConfirmedSeqNo << ", SeqNos " << ReceivedEventsSeqNos.size() << ", events size " << Events.size() << ", connected " << Connected << "\n";
173178
}
174179

0 commit comments

Comments
 (0)