Skip to content

Commit 63599f6

Browse files
authored
Fix direct write status code (#7735)
1 parent 8f1e398 commit 63599f6

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ void TPartitionFamily::AfterRelease() {
285285
Partitions.clear();
286286
Partitions.insert(Partitions.end(), RootPartitions.begin(), RootPartitions.end());
287287

288+
LockedPartitions.clear();
289+
288290
ClassifyPartitions();
289291
UpdatePartitionMapping(Partitions);
290292
// After reducing the number of partitions in the family, the list of reading sessions that can read this family may expand.
@@ -464,7 +466,7 @@ bool TPartitionFamily::PossibleForBalance(TSession* session) {
464466

465467
void TPartitionFamily::ClassifyPartitions() {
466468
auto [activePartitionCount, inactivePartitionCount] = ClassifyPartitions(Partitions);
467-
ChangePartitionCounters(activePartitionCount, inactivePartitionCount);
469+
ChangePartitionCounters(activePartitionCount - ActivePartitionCount, inactivePartitionCount - InactivePartitionCount);
468470
}
469471

470472
template<typename TPartitions>
@@ -1792,10 +1794,9 @@ void TBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr& ev, const TAc
17921794
pi->SetPartition(partitionId);
17931795

17941796
auto* family = consumer->FindFamily(partitionId);
1795-
if (family && family->LockedPartitions.contains(partitionId)) {
1797+
if (family && family->Session && family->LockedPartitions.contains(partitionId)) {
17961798
auto* session = family->Session;
17971799

1798-
Y_ABORT_UNLESS(session != nullptr);
17991800
pi->SetClientNode(session->ClientNode);
18001801
pi->SetProxyNodeId(session->ProxyNodeId);
18011802
pi->SetSession(session->SessionName);

ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ class TAbstractPartitionChooserActor: public TActorBootstrapped<TDerived> {
217217
TThis::Become(&TThis::StateCheckPartition);
218218

219219
if (!Partition) {
220-
return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx);
220+
return ReplyError(TThis::PreferedPartition ? ErrorCode::WRITE_ERROR_PARTITION_INACTIVE : ErrorCode::INITIALIZING, "Partition not choosed", ctx);
221221
}
222222

223223
PartitionHelper.Open(Partition->TabletId, ctx);

ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartiti
7171

7272
const auto* node = Graph.GetPartition(TThis::TableHelper.PartitionId().value());
7373
if (!node) {
74-
// The partition where the writting was performed earlier has already been deleted.
74+
// The partition where the writting was performed earlier has already been deleted.
7575
// We can write without taking into account the hierarchy of the partition.
7676
TThis::Partition = BoundaryPartition;
7777
return OnPartitionChosen(ctx);
@@ -109,7 +109,7 @@ class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartiti
109109
void GetOwnershipFast(const TActorContext &ctx) {
110110
TThis::Become(&TThis::StateOwnershipFast);
111111
if (!BoundaryPartition) {
112-
return TThis::ReplyError(ErrorCode::INITIALIZING, "A partition not choosed", ctx);
112+
return TThis::ReplyError(TThis::PreferedPartition ? ErrorCode::WRITE_ERROR_PARTITION_INACTIVE : ErrorCode::INITIALIZING, "A partition not choosed", ctx);
113113
}
114114

115115
DEBUG("GetOwnershipFast Partition=" << BoundaryPartition->PartitionId << " TabletId=" << BoundaryPartition->TabletId);
@@ -120,7 +120,7 @@ class TSMPartitionChooserActor: public TAbstractPartitionChooserActor<TSMPartiti
120120

121121
void HandleFast(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) {
122122
TThis::PartitionHelper.Close(ctx);
123-
if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()
123+
if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()
124124
&& ev->Get()->Record.HasSeqNo()
125125
&& ev->Get()->Record.GetSeqNo() > 0) {
126126
// Fast path: the partition ative and already written

0 commit comments

Comments
 (0)