Skip to content

Commit fff1a83

Browse files
authored
disable partition spliting and partition merging by load for mirrored… (#20727)
1 parent f799985 commit fff1a83

File tree

7 files changed

+12
-7
lines changed

7 files changed

+12
-7
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
624624

625625
ScheduleUpdateAvailableSize(ctx);
626626

627-
if (Config.GetPartitionConfig().HasMirrorFrom()) {
627+
if (MirroringEnabled(Config)) {
628628
CreateMirrorerActor();
629629
}
630630

@@ -2892,7 +2892,7 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
28922892
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
28932893
TotalPartitionWriteSpeed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
28942894

2895-
if (Config.GetPartitionConfig().HasMirrorFrom()) {
2895+
if (MirroringEnabled(Config)) {
28962896
if (Mirrorer) {
28972897
ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter,
28982898
Config));

ydb/core/persqueue/partition_init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -933,7 +933,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
933933
}
934934

935935
void TPartition::Initialize(const TActorContext& ctx) {
936-
if (Config.GetPartitionConfig().HasMirrorFrom()) {
936+
if (MirroringEnabled(Config)) {
937937
ManageWriteTimestampEstimate = !Config.GetPartitionConfig().GetMirrorFrom().GetSyncWriteTime();
938938
} else {
939939
ManageWriteTimestampEstimate = IsLocalDC;

ydb/core/persqueue/partition_write.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
599599
AnswerCurrentWrites(ctx);
600600
SyncMemoryStateWithKVState(ctx);
601601

602-
if (SplitMergeEnabled(Config) && !IsSupportive()) {
602+
if (SplitMergeEnabled(Config) && !IsSupportive() && !MirroringEnabled(Config)) {
603603
SplitMergeAvgWriteBytes->Update(writeNewSizeFull, now);
604604
auto needScaling = CheckScaleStatus(ctx);
605605
ChangeScaleStatusIfNeeded(needScaling);
@@ -683,7 +683,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
683683
return sum + msg.Data.size();
684684
});
685685

686-
bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom();
686+
bool mirroredPartition = MirroringEnabled(Config);
687687

688688
if (mirroredPartition && !ev->Get()->OwnerCookie.empty()) {
689689
ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::BAD_REQUEST,

ydb/core/persqueue/pq_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2029,7 +2029,7 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, NWilson::TTraceId
20292029

20302030
TVector<TEvPQ::TEvWrite::TMsg> msgs;
20312031

2032-
bool mirroredPartition = Config.GetPartitionConfig().HasMirrorFrom();
2032+
bool mirroredPartition = MirroringEnabled(Config);
20332033

20342034
if (!req.GetIsDirectWrite()) {
20352035
if (!req.HasMessageNo()) {

ydb/core/persqueue/utils.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config) {
3333
return config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
3434
}
3535

36+
bool MirroringEnabled(const NKikimrPQ::TPQTabletConfig& config) {
37+
return config.GetPartitionConfig().HasMirrorFrom();
38+
}
39+
3640
bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config) {
3741
return config.has_partitionstrategy() && config.partitionstrategy().has_partitionstrategytype() && config.partitionstrategy().partitionstrategytype() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
3842
}

ydb/core/persqueue/utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace NKikimr::NPQ {
1111
ui64 TopicPartitionReserveSize(const NKikimrPQ::TPQTabletConfig& config);
1212
ui64 TopicPartitionReserveThroughput(const NKikimrPQ::TPQTabletConfig& config);
1313

14+
bool MirroringEnabled(const NKikimrPQ::TPQTabletConfig& config);
1415
bool SplitMergeEnabled(const NKikimrPQ::TPQTabletConfig& config);
1516

1617
size_t CountActivePartitions(const ::google::protobuf::RepeatedPtrField< ::NKikimrPQ::TPQTabletConfig_TPartition >& partitions);

ydb/services/persqueue_v1/actors/schema_actors.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T
163163
rr->set_service_type(pqConfig.GetDefaultClientServiceType().GetName());
164164
}
165165
}
166-
if (partConfig.HasMirrorFrom()) {
166+
if (NPQ::MirroringEnabled(config)) {
167167
auto rmr = settings->mutable_remote_mirror_rule();
168168
TStringBuilder endpoint;
169169
if (partConfig.GetMirrorFrom().GetUseSecureConnection()) {

0 commit comments

Comments
 (0)