Skip to content

Commit 409b154

Browse files
authored
Fix heartbeat emitter (#9104)
1 parent f628a22 commit 409b154

File tree

2 files changed

+71
-32
lines changed

2 files changed

+71
-32
lines changed

ydb/core/persqueue/sourceid.cpp

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -530,55 +530,60 @@ void THeartbeatEmitter::Process(const TString& sourceId, THeartbeat&& heartbeat)
530530

531531
TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const {
532532
if (Storage.ExplicitSourceIds.size() != (Storage.SourceIdsWithHeartbeat.size() + NewSourceIdsWithHeartbeat.size())) {
533+
// there is no quorum
533534
return Nothing();
534535
}
535536

536537
if (SourceIdsByHeartbeat.empty()) {
538+
// there is no new heartbeats, nothing to emit
537539
return Nothing();
538540
}
539541

540-
if (!NewSourceIdsWithHeartbeat.empty()) { // just got quorum
541-
if (!Storage.SourceIdsByHeartbeat.empty() && Storage.SourceIdsByHeartbeat.begin()->first < SourceIdsByHeartbeat.begin()->first) {
542+
if (Storage.SourceIdsByHeartbeat.empty()) {
543+
// got quorum, memory state
544+
return GetFromDiff(SourceIdsByHeartbeat.begin());
545+
}
546+
547+
if (!NewSourceIdsWithHeartbeat.empty()) {
548+
// got quorum, mixed state
549+
if (Storage.SourceIdsByHeartbeat.begin()->first < SourceIdsByHeartbeat.begin()->first) {
542550
return GetFromStorage(Storage.SourceIdsByHeartbeat.begin());
543551
} else {
544552
return GetFromDiff(SourceIdsByHeartbeat.begin());
545553
}
546-
} else if (SourceIdsByHeartbeat.begin()->first > Storage.SourceIdsByHeartbeat.begin()->first) {
547-
auto storage = Storage.SourceIdsByHeartbeat.begin();
548-
auto diff = SourceIdsByHeartbeat.begin();
549-
550-
TMaybe<TRowVersion> newVersion;
551-
while (storage != Storage.SourceIdsByHeartbeat.end()) {
552-
const auto& [version, sourceIds] = *storage;
553-
554-
auto rest = sourceIds.size();
555-
for (const auto& sourceId : sourceIds) {
556-
auto it = Heartbeats.find(sourceId);
557-
if (it != Heartbeats.end() && it->second.Version > version && version <= diff->first) {
558-
--rest;
559-
} else {
560-
break;
561-
}
562-
}
554+
}
563555

564-
if (!rest) {
565-
if (++storage != Storage.SourceIdsByHeartbeat.end()) {
566-
newVersion = storage->first;
567-
} else {
568-
newVersion = diff->first;
569-
}
556+
TMaybe<TRowVersion> emitVersion;
557+
558+
for (auto it = Storage.SourceIdsByHeartbeat.begin(), end = Storage.SourceIdsByHeartbeat.end(); it != end; ++it) {
559+
const auto& [version, sourceIds] = *it;
560+
auto rest = sourceIds.size();
561+
562+
for (const auto& sourceId : sourceIds) {
563+
if (Heartbeats.contains(sourceId) && Heartbeats.at(sourceId).Version > version) {
564+
--rest;
570565
} else {
571566
break;
572567
}
573568
}
574569

575-
if (newVersion) {
576-
storage = Storage.SourceIdsByHeartbeat.find(*newVersion);
577-
if (storage != Storage.SourceIdsByHeartbeat.end()) {
578-
return GetFromStorage(storage);
579-
} else {
580-
return GetFromDiff(diff);
581-
}
570+
if (rest) {
571+
break;
572+
}
573+
574+
if (auto next = std::next(it); next != end && next->first < SourceIdsByHeartbeat.begin()->first) {
575+
emitVersion = next->first;
576+
} else {
577+
emitVersion = SourceIdsByHeartbeat.begin()->first;
578+
break;
579+
}
580+
}
581+
582+
if (emitVersion) {
583+
if (auto it = Storage.SourceIdsByHeartbeat.find(*emitVersion); it != Storage.SourceIdsByHeartbeat.end()) {
584+
return GetFromStorage(it);
585+
} else {
586+
return GetFromDiff(SourceIdsByHeartbeat.begin());
582587
}
583588
}
584589

ydb/core/persqueue/ut/sourceid_ut.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,40 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
458458
emitter.Process(TestSourceId(2), MakeHeartbeat(4));
459459
UNIT_ASSERT(!emitter.CanEmit().Defined());
460460
}
461+
462+
// gaps
463+
TSourceIdStorage storage2;
464+
storage2.RegisterSourceId(TestSourceId(1), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(1)));
465+
storage2.RegisterSourceId(TestSourceId(2), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(3)));
466+
{
467+
THeartbeatEmitter emitter(storage2);
468+
UNIT_ASSERT(!emitter.CanEmit().Defined());
469+
470+
emitter.Process(TestSourceId(1), MakeHeartbeat(2));
471+
emitter.Process(TestSourceId(2), MakeHeartbeat(4));
472+
{
473+
const auto heartbeat = emitter.CanEmit();
474+
UNIT_ASSERT(heartbeat.Defined());
475+
UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(2).Version);
476+
}
477+
}
478+
479+
// full update
480+
TSourceIdStorage storage3;
481+
storage3.RegisterSourceId(TestSourceId(1), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(1)));
482+
storage3.RegisterSourceId(TestSourceId(2), MakeExplicitSourceIdInfo(++offset, MakeHeartbeat(2)));
483+
{
484+
THeartbeatEmitter emitter(storage3);
485+
UNIT_ASSERT(!emitter.CanEmit().Defined());
486+
487+
emitter.Process(TestSourceId(1), MakeHeartbeat(3));
488+
emitter.Process(TestSourceId(2), MakeHeartbeat(4));
489+
{
490+
const auto heartbeat = emitter.CanEmit();
491+
UNIT_ASSERT(heartbeat.Defined());
492+
UNIT_ASSERT_VALUES_EQUAL(heartbeat->Version, MakeHeartbeat(3).Version);
493+
}
494+
}
461495
}
462496

463497
Y_UNIT_TEST(SourceIdMinSeqNo) {

0 commit comments

Comments
 (0)