Skip to content

Commit 0297d5d

Browse files
authored
Refactor building the list of partition locations (#20610)
1 parent 9800792 commit 0297d5d

File tree

1 file changed

+24
-13
lines changed

1 file changed

+24
-13
lines changed

ydb/core/persqueue/pqrb/read_balancer.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -722,11 +722,10 @@ void TPersQueueReadBalancer::HandleOnInit(TEvPersQueue::TEvGetPartitionsLocation
722722
}
723723

724724
void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx) {
725-
auto* evResponse = new TEvPersQueue::TEvGetPartitionsLocationResponse();
726725
const auto& request = ev->Get()->Record;
726+
auto evResponse = std::make_unique<TEvPersQueue::TEvGetPartitionsLocationResponse>();
727+
727728
auto addPartitionToResponse = [&](ui64 partitionId, ui64 tabletId) {
728-
auto* pResponse = evResponse->Record.AddLocations();
729-
pResponse->SetPartitionId(partitionId);
730729
if (PipesRequested.contains(tabletId)) {
731730
return false;
732731
}
@@ -735,36 +734,48 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr
735734
GetPipeClient(tabletId, ctx);
736735
return false;
737736
}
737+
738+
auto* pResponse = evResponse->Record.AddLocations();
739+
pResponse->SetPartitionId(partitionId);
738740
pResponse->SetNodeId(iter->second.NodeId.GetRef());
739741
pResponse->SetGeneration(iter->second.Generation.GetRef());
740742

741-
PQ_LOG_D("addPartitionToResponse tabletId " << tabletId << ", partitionId " << partitionId
743+
PQ_LOG_D("The partition location was added to response: TabletId " << tabletId << ", PartitionId " << partitionId
742744
<< ", NodeId " << pResponse->GetNodeId() << ", Generation " << pResponse->GetGeneration());
745+
743746
return true;
744747
};
745-
auto sendResponse = [&](bool status) {
746-
evResponse->Record.SetStatus(status);
747-
ctx.Send(ev->Sender, evResponse);
748+
749+
auto sendError = [&]() {
750+
auto response = std::make_unique<TEvPersQueue::TEvGetPartitionsLocationResponse>();
751+
response->Record.SetStatus(false);
752+
ctx.Send(ev->Sender, response.release());
748753
};
749-
bool ok = true;
754+
750755
if (request.PartitionsSize() == 0) {
751756
if (!PipesRequested.empty() || TabletPipes.size() < TabletsInfo.size()) {
752757
// Do not have all pipes connected.
753-
return sendResponse(false);
758+
return sendError();
754759
}
755760
for (const auto& [partitionId, partitionInfo] : PartitionsInfo) {
756-
ok = addPartitionToResponse(partitionId, partitionInfo.TabletId) && ok;
761+
if (!addPartitionToResponse(partitionId, partitionInfo.TabletId)) {
762+
return sendError();
763+
}
757764
}
758765
} else {
759766
for (const auto& partitionInRequest : request.GetPartitions()) {
760767
auto partitionInfoIter = PartitionsInfo.find(partitionInRequest);
761768
if (partitionInfoIter == PartitionsInfo.end()) {
762-
return sendResponse(false);
769+
return sendError();
770+
}
771+
if (!addPartitionToResponse(partitionInRequest, partitionInfoIter->second.TabletId)) {
772+
return sendError();
763773
}
764-
ok = addPartitionToResponse(partitionInRequest, partitionInfoIter->second.TabletId) && ok;
765774
}
766775
}
767-
return sendResponse(ok);
776+
777+
evResponse->Record.SetStatus(true);
778+
ctx.Send(ev->Sender, evResponse.release());
768779
}
769780

770781

0 commit comments

Comments
 (0)