Skip to content

Commit f24c105

Browse files
authored
Revert "Complex messages expiration in the topic (#16413)" (#16628)
1 parent 56d008b commit f24c105

File tree

7 files changed

+10
-61
lines changed

7 files changed

+10
-61
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,6 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
429429
const TDuration lifetimeLimit{TDuration::Seconds(partConfig.GetLifetimeSeconds())};
430430

431431
const bool hasStorageLimit = partConfig.HasStorageLimitBytes();
432-
const auto hasLifetime = !hasStorageLimit || (partConfig.HasLifetimeSeconds() && partConfig.GetLifetimeSeconds() > 0);
433432
const auto now = ctx.Now();
434433
const ui64 importantConsumerMinOffset = ImportantClientsMinOffset();
435434

@@ -447,11 +446,15 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont
447446
}
448447

449448
auto& firstKey = DataKeysBody.front();
450-
451-
auto expiredByLifetime = hasLifetime && now >= firstKey.Timestamp + lifetimeLimit;
452-
auto expiredByStorageLimit = hasStorageLimit && BodySize > firstKey.Size && (BodySize - firstKey.Size) >= partConfig.GetStorageLimitBytes();
453-
if (!expiredByLifetime && !expiredByStorageLimit) {
454-
break;
449+
if (hasStorageLimit) {
450+
const auto bodySize = BodySize - firstKey.Size;
451+
if (bodySize < partConfig.GetStorageLimitBytes()) {
452+
break;
453+
}
454+
} else {
455+
if (now < firstKey.Timestamp + lifetimeLimit) {
456+
break;
457+
}
455458
}
456459

457460
BodySize -= firstKey.Size;

ydb/core/persqueue/ut/utils_ut.cpp

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -76,39 +76,6 @@ Y_UNIT_TEST_SUITE(TPQUtilsTest) {
7676
UNIT_ASSERT_VALUES_EQUAL(r, 2);
7777
}
7878
}
79-
80-
Y_UNIT_TEST(Migration_Lifetime) {
81-
{
82-
NKikimrPQ::TPQTabletConfig config;
83-
config.MutablePartitionConfig()->SetLifetimeSeconds(123);
84-
NKikimr::NPQ::Migrate(config);
85-
86-
UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime());
87-
UNIT_ASSERT_VALUES_EQUAL(false, config.GetPartitionConfig().HasStorageLimitBytes());
88-
UNIT_ASSERT_VALUES_EQUAL(123, config.GetPartitionConfig().GetLifetimeSeconds());
89-
}
90-
{
91-
NKikimrPQ::TPQTabletConfig config;
92-
config.MutablePartitionConfig()->SetLifetimeSeconds(123);
93-
config.MutablePartitionConfig()->SetStorageLimitBytes(456);
94-
NKikimr::NPQ::Migrate(config);
95-
96-
UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime());
97-
UNIT_ASSERT_VALUES_EQUAL(456, config.GetPartitionConfig().GetStorageLimitBytes());
98-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Days(3650).Seconds(), config.GetPartitionConfig().GetLifetimeSeconds());
99-
}
100-
{
101-
NKikimrPQ::TPQTabletConfig config;
102-
config.MutableMigrations()->SetLifetime(true);
103-
config.MutablePartitionConfig()->SetLifetimeSeconds(123);
104-
config.MutablePartitionConfig()->SetStorageLimitBytes(456);
105-
NKikimr::NPQ::Migrate(config);
106-
107-
UNIT_ASSERT_VALUES_EQUAL(true, config.GetMigrations().GetLifetime());
108-
UNIT_ASSERT_VALUES_EQUAL(456, config.GetPartitionConfig().GetStorageLimitBytes());
109-
UNIT_ASSERT_VALUES_EQUAL(123, config.GetPartitionConfig().GetLifetimeSeconds());
110-
}
111-
}
11279
}
11380

11481
}

ydb/core/persqueue/utils.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,6 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) {
116116
config.AddAllPartitions()->CopyFrom(partition);
117117
}
118118
}
119-
120-
if (!config.GetMigrations().GetLifetime()) {
121-
if (config.GetPartitionConfig().HasStorageLimitBytes()) {
122-
config.MutablePartitionConfig()->SetLifetimeSeconds(TDuration::Days(3650).Seconds());
123-
}
124-
}
125-
config.MutableMigrations()->SetLifetime(true);
126119
}
127120

128121
bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) {

ydb/core/protos/pqconfig.proto

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ message TPQTabletConfig {
421421
optional uint32 MinPartitionCount = 1 [default = 1];
422422
// The maximum number of partitions that will be supported by the strategy. The strategy will not create partitions if the specified
423423
// amount is reached, even if the load exceeds the current capabilities of the topic.
424-
optional uint32 MaxPartitionCount = 2 [default = 1];
424+
optional uint32 MaxPartitionCount = 2 [default = 1];;
425425
optional uint32 ScaleThresholdSeconds = 3 [default = 300];
426426
optional uint32 ScaleUpPartitionWriteSpeedThresholdPercent = 4 [default = 80];
427427
optional uint32 ScaleDownPartitionWriteSpeedThresholdPercent = 5 [default = 20];
@@ -433,11 +433,6 @@ message TPQTabletConfig {
433433
repeated TPartition AllPartitions = 36; // filled by schemeshard
434434

435435
optional TOffloadConfig OffloadConfig = 38;
436-
437-
message TMigrations {
438-
optional bool Lifetime = 1 [default = false];
439-
}
440-
optional TMigrations Migrations = 39;
441436
}
442437

443438
message THeartbeat {

ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,6 @@ class TAlterPQ: public TSubOperation {
217217
TPath::Resolve(alterConfig.GetOffloadConfig().GetIncrementalBackup().GetDstPath(), context.SS).Base()->PathId.ToProto(pathId);
218218
}
219219

220-
alterConfig.MutableMigrations()->CopyFrom(tabletConfig->GetMigrations());
221-
222220
alterConfig.MutablePartitionKeySchema()->Swap(tabletConfig->MutablePartitionKeySchema());
223221
Y_PROTOBUF_SUPPRESS_NODISCARD alterConfig.SerializeToString(&params->TabletConfig);
224222
alterConfig.Swap(tabletConfig);
@@ -559,7 +557,6 @@ class TAlterPQ: public TSubOperation {
559557
}
560558

561559
NKikimrPQ::TPQTabletConfig tabletConfig = topic->GetTabletConfig();
562-
NKikimr::NPQ::Migrate(tabletConfig);
563560
NKikimrPQ::TPQTabletConfig newTabletConfig = tabletConfig;
564561

565562
TTopicInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);

ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,6 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
168168
}
169169

170170
NKikimrPQ::TPQTabletConfig tabletConfig = op.GetPQTabletConfig();
171-
172-
tabletConfig.MutableMigrations()->SetLifetime(true);
173-
174171
tabletConfig.ClearPartitionIds();
175172
tabletConfig.ClearPartitions();
176173

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5428,9 +5428,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
54285428
Version: 567
54295429
Important: true
54305430
}
5431-
Migrations {
5432-
Lifetime: true
5433-
}
54345431
}
54355432
ErrorCode: OK
54365433
}

0 commit comments

Comments
 (0)