Skip to content

Commit a4b9fee

Browse files
authored
YQ-3779 Remove FlagSubscribeOnSession (use TEvSubscribe) / to stable (#11088)
1 parent e08e74c commit a4b9fee

File tree

2 files changed

+37
-6
lines changed

2 files changed

+37
-6
lines changed

ydb/core/fq/libs/row_dispatcher/coordinator.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
8181
THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
8282
TCoordinatorMetrics Metrics;
8383
ui64 LocationRandomCounter = 0;
84+
THashSet<TActorId> InterconnectSessions;
8485

8586
public:
8687
TActorCoordinator(
@@ -116,6 +117,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
116117
void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
117118
void PrintInternalState();
118119
NActors::TActorId GetAndUpdateLocation(const TPartitionKey& key);
120+
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
119121
};
120122

121123
TActorCoordinator::TActorCoordinator(
@@ -165,11 +167,24 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
165167
RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal});
166168
}
167169

170+
void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
171+
if (!interconnectSession) {
172+
return;
173+
}
174+
auto sessionsIt = InterconnectSessions.find(interconnectSession);
175+
if (sessionsIt != InterconnectSessions.end()) {
176+
return;
177+
}
178+
Send(interconnectSession, new NActors::TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
179+
InterconnectSessions.insert(interconnectSession);
180+
}
181+
168182
void TActorCoordinator::Handle(NActors::TEvents::TEvPing::TPtr& ev) {
169183
LOG_ROW_DISPATCHER_TRACE("TEvPing received, " << ev->Sender);
184+
UpdateInterconnectSessions(ev->InterconnectSession);
170185
AddRowDispatcher(ev->Sender, false);
171186
LOG_ROW_DISPATCHER_TRACE("Send TEvPong to " << ev->Sender);
172-
Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
187+
Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery);
173188
}
174189

175190
void TActorCoordinator::PrintInternalState() {
@@ -247,6 +262,7 @@ NActors::TActorId TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& k
247262

248263
void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
249264
const auto source = ev->Get()->Record.GetSource();
265+
UpdateInterconnectSessions(ev->InterconnectSession);
250266

251267
TStringStream str;
252268
str << "TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: ";
@@ -281,7 +297,7 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
281297
}
282298

283299
LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << ev->Sender);
284-
Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie);
300+
Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
285301
PrintInternalState();
286302
}
287303

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
127127
const ::NMonitoring::TDynamicCounterPtr Counters;
128128
TRowDispatcherMetrics Metrics;
129129
NYql::IPqGateway::TPtr PqGateway;
130+
THashSet<TActorId> InterconnectSessions;
130131

131132
struct ConsumerCounters {
132133
ui64 NewDataArrived = 0;
@@ -216,6 +217,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
216217
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
217218

218219
void DeleteConsumer(const ConsumerSessionKey& key);
220+
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
219221
void UpdateMetrics();
220222

221223
STRICT_STFUNC(
@@ -279,12 +281,12 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr&
279281
LOG_ROW_DISPATCHER_DEBUG("Coordinator changed, old leader " << CoordinatorActorId << ", new " << ev->Get()->CoordinatorActorId);
280282

281283
CoordinatorActorId = ev->Get()->CoordinatorActorId;
282-
Send(*CoordinatorActorId, new NActors::TEvents::TEvPing(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
284+
Send(*CoordinatorActorId, new NActors::TEvents::TEvPing(), IEventHandle::FlagTrackDelivery);
283285
for (auto actorId : CoordinatorChangedSubscribers) {
284286
Send(
285287
actorId,
286288
new NFq::TEvRowDispatcher::TEvCoordinatorChanged(ev->Get()->CoordinatorActorId),
287-
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
289+
IEventHandle::FlagTrackDelivery);
288290
}
289291
}
290292

@@ -324,11 +326,12 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) {
324326

325327
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe::TPtr& ev) {
326328
LOG_ROW_DISPATCHER_DEBUG("TEvCoordinatorChangesSubscribe from " << ev->Sender);
329+
UpdateInterconnectSessions(ev->InterconnectSession);
327330
CoordinatorChangedSubscribers.insert(ev->Sender);
328331
if (!CoordinatorActorId) {
329332
return;
330333
}
331-
Send(ev->Sender, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(*CoordinatorActorId), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
334+
Send(ev->Sender, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(*CoordinatorActorId), IEventHandle::FlagTrackDelivery);
332335
}
333336

334337
void TRowDispatcher::UpdateMetrics() {
@@ -372,7 +375,7 @@ void TRowDispatcher::UpdateMetrics() {
372375
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
373376
LOG_ROW_DISPATCHER_DEBUG("TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
374377
" partitionId " << ev->Get()->Record.GetPartitionId());
375-
378+
UpdateInterconnectSessions(ev->InterconnectSession);
376379
TMaybe<ui64> readOffset;
377380
if (ev->Get()->Record.HasOffset()) {
378381
readOffset = ev->Get()->Record.GetOffset();
@@ -634,6 +637,18 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
634637
}
635638
}
636639

640+
void TRowDispatcher::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
641+
if (!interconnectSession) {
642+
return;
643+
}
644+
auto sessionsIt = InterconnectSessions.find(interconnectSession);
645+
if (sessionsIt != InterconnectSessions.end()) {
646+
return;
647+
}
648+
Send(interconnectSession, new NActors::TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
649+
InterconnectSessions.insert(interconnectSession);
650+
}
651+
637652
} // namespace
638653

639654
////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)