Skip to content

Commit 3fde25c

Browse files
authored
YQ-3871 Restart session on query stopping (if query read historical data) (#12135)
1 parent 4c4b273 commit 3fde25c

File tree

8 files changed

+494
-130
lines changed

8 files changed

+494
-130
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
310310
void SendStatisticToRowDispatcher();
311311
void SendSessionError(TActorId readActorId, TStatus status);
312312
bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev);
313+
void RestartSessionIfOldestClient(const TClientsInfo& info);
313314

314315
private:
315316

@@ -750,10 +751,12 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
750751

751752
auto it = Clients.find(ev->Sender);
752753
if (it == Clients.end()) {
753-
LOG_ROW_DISPATCHER_DEBUG("Wrong ClientSettings");
754+
LOG_ROW_DISPATCHER_WARN("Ignore TEvStopSession from " << ev->Sender << ", no client");
754755
return;
755756
}
756757
auto& info = *it->second;
758+
RestartSessionIfOldestClient(info);
759+
757760
UnreadBytes -= info.UnreadBytes;
758761
Metrics.UnreadBytes->Sub(info.UnreadBytes);
759762
if (const auto formatIt = FormatHandlers.find(info.HandlerSettings); formatIt != FormatHandlers.end()) {
@@ -769,6 +772,41 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
769772
SubscribeOnNextEvent();
770773
}
771774

775+
void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) {
776+
// if we read historical data (because of this client), then we restart the session.
777+
778+
if (!ReadSession || !info.NextMessageOffset) {
779+
return;
780+
}
781+
TMaybe<ui64> minMessageOffset;
782+
for (auto& [readActorId, clientPtr] : Clients) {
783+
if (info.ReadActorId == readActorId || !clientPtr->NextMessageOffset) {
784+
continue;
785+
}
786+
if (!minMessageOffset) {
787+
minMessageOffset = clientPtr->NextMessageOffset;
788+
continue;
789+
}
790+
minMessageOffset = std::min(minMessageOffset, clientPtr->NextMessageOffset);
791+
}
792+
if (!minMessageOffset) {
793+
return;
794+
}
795+
796+
if (info.NextMessageOffset >= minMessageOffset) {
797+
return;
798+
}
799+
LOG_ROW_DISPATCHER_INFO("Client (on StopSession) has less offset (" << info.NextMessageOffset << ") than others clients (" << minMessageOffset << "), stop (restart) topic session");
800+
Metrics.RestartSessionByOffsets->Inc();
801+
++RestartSessionByOffsets;
802+
info.RestartSessionByOffsetsByQuery->Inc();
803+
StopReadSession();
804+
805+
if (!ReadSession) {
806+
Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession());
807+
}
808+
}
809+
772810
void TTopicSession::FatalError(TStatus status) {
773811
LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.GetErrorMessage());
774812

0 commit comments

Comments
 (0)