Skip to content

Commit 2713098

Browse files
An empty list of consumers for the background partition (#15960)
1 parent 093e4ee commit 2713098

File tree

8 files changed

+128
-3
lines changed

8 files changed

+128
-3
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2757,7 +2757,7 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
27572757

27582758
TString TPartition::GetKeyConfig() const
27592759
{
2760-
return Sprintf("_config_%u", Partition.OriginalPartitionId);
2760+
return Sprintf("_config_%u", Partition.InternalPartitionId);
27612761
}
27622762

27632763
void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)

ydb/core/persqueue/partition_init.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
200200
Y_ABORT("bad status");
201201
};
202202

203+
// There should be no consumers in the configuration of the background partition. When creating a partition,
204+
// the PQ tablet specifically removes all consumer settings from the config.
205+
Y_ABORT_UNLESS(!Partition()->IsSupportive() ||
206+
(Partition()->Config.GetConsumers().empty() && Partition()->TabletConfig.GetConsumers().empty()));
207+
203208
Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition.OriginalPartitionId);
204209
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
205210

@@ -395,6 +400,7 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
395400
case NKikimrProto::OVERRUN: {
396401
auto& sourceIdStorage = Partition()->SourceIdStorage;
397402
auto& usersInfoStorage = Partition()->UsersInfoStorage;
403+
const bool isSupportive = Partition()->IsSupportive();
398404

399405
for (ui32 i = 0; i < range.PairSize(); ++i) {
400406
const auto& pair = range.GetPair(i);
@@ -416,9 +422,9 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
416422
sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now);
417423
} else if (type == TKeyPrefix::MarkProtoSourceId) {
418424
sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now);
419-
} else if (type == TKeyPrefix::MarkUser) {
425+
} else if ((type == TKeyPrefix::MarkUser) && !isSupportive) {
420426
usersInfoStorage->Parse(*key, pair.GetValue(), ctx);
421-
} else if (type == TKeyPrefix::MarkUserDeprecated) {
427+
} else if ((type == TKeyPrefix::MarkUserDeprecated) && !isSupportive) {
422428
usersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx);
423429
}
424430
}

ydb/core/persqueue/partition_read.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ void TPartition::SendReadingFinished(const TString& consumer) {
5151
}
5252

5353
void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
54+
if (IsSupportive()) {
55+
return;
56+
}
57+
5458
TSet<TString> hasReadRule;
5559

5660
for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {

ydb/core/persqueue/pq_impl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,7 @@ NKikimrPQ::TPQTabletConfig TPersQueue::MakeSupportivePartitionConfig() const
865865
partitionConfig.MutableReadRuleServiceTypes()->Clear();
866866
partitionConfig.MutableReadRuleVersions()->Clear();
867867
partitionConfig.MutableReadRuleGenerations()->Clear();
868+
partitionConfig.MutableConsumers()->Clear();
868869

869870
return partitionConfig;
870871
}
Binary file not shown.
Binary file not shown.

ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <library/cpp/logger/stream.h>
1818
#include <library/cpp/testing/unittest/registar.h>
19+
#include <library/cpp/streams/bzip2/bzip2.h>
1920

2021
namespace NYdb::NTopic::NTests {
2122

@@ -87,6 +88,7 @@ class TFixture : public NUnitTest::TBaseFixture {
8788
TTopicDescription DescribeTopic(const TString& path);
8889

8990
void AddConsumer(const TString& topicPath, const TVector<TString>& consumers);
91+
void DropConsumer(const TString& topicPath, const TVector<TString>& consumers);
9092
void AlterAutoPartitioning(const TString& topicPath,
9193
ui64 minActivePartitions,
9294
ui64 maxActivePartitions,
@@ -148,6 +150,9 @@ class TFixture : public NUnitTest::TBaseFixture {
148150
void RestartLongTxService();
149151
void RestartPQTablet(const TString& topicPath, ui32 partition);
150152
void DumpPQTabletKeys(const TString& topicName, ui32 partition);
153+
void PQTabletPrepareFromResource(const TString& topicPath,
154+
ui32 partitionId,
155+
const TString& resourceName);
151156

152157
void DeleteSupportivePartition(const TString& topicName,
153158
ui32 partition);
@@ -483,6 +488,20 @@ void TFixture::AddConsumer(const TString& topicPath,
483488
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
484489
}
485490

491+
void TFixture::DropConsumer(const TString& topicPath,
492+
const TVector<TString>& consumers)
493+
{
494+
NTopic::TTopicClient client(GetDriver());
495+
NTopic::TAlterTopicSettings settings;
496+
497+
for (const auto& consumer : consumers) {
498+
settings.AppendDropConsumers(consumer);
499+
}
500+
501+
auto result = client.AlterTopic(topicPath, settings).GetValueSync();
502+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
503+
}
504+
486505
void TFixture::AlterAutoPartitioning(const TString& topicPath,
487506
ui64 minActivePartitions,
488507
ui64 maxActivePartitions,
@@ -1843,6 +1862,51 @@ void TFixture::DumpPQTabletKeys(const TString& topicName)
18431862
}
18441863
}
18451864

1865+
void TFixture::PQTabletPrepareFromResource(const TString& topicPath,
1866+
ui32 partitionId,
1867+
const TString& resourceName)
1868+
{
1869+
auto& runtime = Setup->GetRuntime();
1870+
TActorId edge = runtime.AllocateEdgeActor();
1871+
ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicPath, partitionId);
1872+
1873+
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
1874+
size_t count = 0;
1875+
1876+
for (TStringStream stream(NResource::Find(resourceName)); true; ++count) {
1877+
TString key, encoded;
1878+
1879+
if (!stream.ReadTo(key, ' ')) {
1880+
break;
1881+
}
1882+
encoded = stream.ReadLine();
1883+
1884+
auto decoded = Base64Decode(encoded);
1885+
TStringInput decodedStream(decoded);
1886+
TBZipDecompress decompressor(&decodedStream);
1887+
1888+
auto* cmd = request->Record.AddCmdWrite();
1889+
cmd->SetKey(key);
1890+
cmd->SetValue(decompressor.ReadAll());
1891+
}
1892+
1893+
runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries());
1894+
1895+
TAutoPtr<IEventHandle> handle;
1896+
auto* response = runtime.GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
1897+
UNIT_ASSERT(response);
1898+
UNIT_ASSERT(response->Record.HasStatus());
1899+
UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
1900+
1901+
UNIT_ASSERT_VALUES_EQUAL(response->Record.WriteResultSize(), count);
1902+
1903+
for (size_t i = 0; i < response->Record.WriteResultSize(); ++i) {
1904+
const auto &result = response->Record.GetWriteResult(i);
1905+
UNIT_ASSERT(result.HasStatus());
1906+
UNIT_ASSERT_EQUAL(result.GetStatus(), NKikimrProto::OK);
1907+
}
1908+
}
1909+
18461910
void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestDescription& d)
18471911
{
18481912
for (auto& topic : d.Topics) {
@@ -3059,6 +3123,51 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
30593123
}
30603124
}
30613125

3126+
Y_UNIT_TEST_F(The_Configuration_Is_Changing_As_We_Write_To_The_Topic, TFixture)
3127+
{
3128+
// To test that you can change the topic configuration while writing to the partition
3129+
3130+
CreateTopic("topic_A", TEST_CONSUMER, 2);
3131+
3132+
AddConsumer("topic_A", {"consumer1", "consumer2"});
3133+
3134+
auto session = CreateTableSession();
3135+
auto tx = BeginTx(session);
3136+
3137+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, TString(100, 'x'), &tx, 0);
3138+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, TString(100, 'x'), &tx, 1);
3139+
3140+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
3141+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);
3142+
3143+
RestartPQTablet("topic_A", 0);
3144+
3145+
DropConsumer("topic_A", {"consumer1"});
3146+
3147+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_3, TString(100, 'x'), &tx, 0);
3148+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_3);
3149+
3150+
RestartPQTablet("topic_A", 0);
3151+
3152+
CommitTx(tx);
3153+
3154+
CheckTabletKeys("topic_A");
3155+
}
3156+
3157+
Y_UNIT_TEST_F(The_Transaction_Starts_On_One_Version_And_Ends_On_The_Other, TFixture)
3158+
{
3159+
// In the test, we check the compatibility between versions `24-4-2` and `24-4-*/25-1-*`. To do this, the data
3160+
// obtained on the `24-4-2` version is loaded into the PQ tablets.
3161+
3162+
CreateTopic("topic_A", TEST_CONSUMER, 2);
3163+
3164+
PQTabletPrepareFromResource("topic_A", 0, "topic_A_partition_0_v24-4-2.dat");
3165+
PQTabletPrepareFromResource("topic_A", 1, "topic_A_partition_1_v24-4-2.dat");
3166+
3167+
RestartPQTablet("topic_A", 0);
3168+
RestartPQTablet("topic_A", 1);
3169+
}
3170+
30623171
}
30633172

30643173
}

ydb/public/sdk/cpp/src/client/topic/ut/ya.make

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,9 @@ SRCS(
4040
trace_ut.cpp
4141
)
4242

43+
RESOURCE(
44+
ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_0_v24-4-2.dat topic_A_partition_0_v24-4-2.dat
45+
ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_1_v24-4-2.dat topic_A_partition_1_v24-4-2.dat
46+
)
47+
4348
END()

0 commit comments

Comments
 (0)