Skip to content

Commit 5f4cbde

Browse files
The consumer's generation number is not stored in the transaction (ydb-platform#9555)
1 parent 4bd5031 commit 5f4cbde

File tree

3 files changed

+61
-22
lines changed

3 files changed

+61
-22
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,8 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
715715
{
716716
Config = newConfig;
717717

718+
PQ_LOG_D("Apply new config " << Config.ShortDebugString());
719+
718720
ui32 cacheSize = CACHE_SIZE;
719721
if (Config.HasCacheSize()) {
720722
cacheSize = Config.GetCacheSize();
@@ -1630,6 +1632,32 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
16301632
Y_ABORT_UNLESS(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str());
16311633
}
16321634

1635+
void TPersQueue::UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const
1636+
{
1637+
Y_ABORT_UNLESS(cfg.HasVersion());
1638+
const int curConfigVersion = cfg.GetVersion();
1639+
1640+
// set rr generation for provided read rules
1641+
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1642+
for (const auto& c : Config.GetConsumers()) {
1643+
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1644+
}
1645+
1646+
for (auto& c : *cfg.MutableConsumers()) {
1647+
auto it = existed.find(c.GetName());
1648+
ui64 generation = 0;
1649+
if (it != existed.end() && it->second.first == c.GetVersion()) {
1650+
generation = it->second.second;
1651+
} else {
1652+
generation = curConfigVersion;
1653+
}
1654+
c.SetGeneration(generation);
1655+
if (ReadRuleCompatible()) {
1656+
cfg.AddReadRuleGenerations(generation);
1657+
}
1658+
}
1659+
}
1660+
16331661
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
16341662
{
16351663
const auto& record = ev->GetRecord();
@@ -1642,7 +1670,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
16421670
NKikimrPQ::TPQTabletConfig cfg = record.GetTabletConfig();
16431671

16441672
Y_ABORT_UNLESS(cfg.HasVersion());
1645-
int curConfigVersion = cfg.GetVersion();
1673+
const int curConfigVersion = cfg.GetVersion();
16461674

16471675
if (curConfigVersion == oldConfigVersion) { //already applied
16481676
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
@@ -1741,27 +1769,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
17411769

17421770
Migrate(cfg);
17431771

1744-
// set rr generation for provided read rules
1745-
{
1746-
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1747-
for (const auto& c : Config.GetConsumers()) {
1748-
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1749-
}
1750-
1751-
for (auto& c : *cfg.MutableConsumers()) {
1752-
auto it = existed.find(c.GetName());
1753-
ui64 generation = 0;
1754-
if (it != existed.end() && it->second.first == c.GetVersion()) {
1755-
generation = it->second.second;
1756-
} else {
1757-
generation = curConfigVersion;
1758-
}
1759-
c.SetGeneration(generation);
1760-
if (ReadRuleCompatible()) {
1761-
cfg.AddReadRuleGenerations(generation);
1762-
}
1763-
}
1764-
}
1772+
UpdateReadRuleGenerations(cfg);
17651773

17661774
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
17671775
<< " Config update version " << cfg.GetVersion() << "(current " << Config.GetVersion() << ") received from actor " << sender
@@ -3727,6 +3735,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
37273735
tx.OnProposeTransaction(event, GetAllowedStep(),
37283736
TabletID());
37293737

3738+
if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) {
3739+
UpdateReadRuleGenerations(tx.TabletConfig);
3740+
}
3741+
37303742
if (tx.WriteId.Defined()) {
37313743
const TWriteId& writeId = *tx.WriteId;
37323744
Y_ABORT_UNLESS(TxWrites.contains(writeId),

ydb/core/persqueue/pq_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
530530

531531
bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
532532
void DeleteWriteId(const TMaybe<TWriteId>& writeId);
533+
534+
void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;
533535
};
534536

535537

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class TFixture : public NUnitTest::TBaseFixture {
3939

4040
void SetUp(NUnitTest::TTestContext&) override;
4141

42+
virtual bool GetEnablePQConfigTransactionsAtSchemeShard() const;
43+
4244
NTable::TSession CreateTableSession();
4345
NTable::TTransaction BeginTx(NTable::TSession& session);
4446
void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
@@ -62,6 +64,8 @@ class TFixture : public NUnitTest::TBaseFixture {
6264
std::optional<size_t> maxPartitionCount = std::nullopt);
6365
void DescribeTopic(const TString& path);
6466

67+
void AddConsumer(const TString& topic, const TVector<TString>& consumers);
68+
6569
void WriteToTopicWithInvalidTxId(bool invalidTxId);
6670

6771
TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
@@ -192,11 +196,18 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
192196
{
193197
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
194198
settings.SetEnableTopicServiceTx(true);
199+
settings.SetEnablePQConfigTransactionsAtSchemeShard(GetEnablePQConfigTransactionsAtSchemeShard());
200+
195201
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
196202

197203
Driver = std::make_unique<TDriver>(Setup->MakeDriver());
198204
}
199205

206+
bool TFixture::GetEnablePQConfigTransactionsAtSchemeShard() const
207+
{
208+
return true;
209+
}
210+
200211
NTable::TSession TFixture::CreateTableSession()
201212
{
202213
NTable::TTableClient client(GetDriver());
@@ -323,6 +334,20 @@ void TFixture::CreateTopic(const TString& path,
323334
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
324335
}
325336

337+
void TFixture::AddConsumer(const TString& path,
338+
const TVector<TString>& consumers)
339+
{
340+
NTopic::TTopicClient client(GetDriver());
341+
NTopic::TAlterTopicSettings settings;
342+
343+
for (const auto& consumer : consumers) {
344+
settings.BeginAddConsumer(consumer);
345+
}
346+
347+
auto result = client.AlterTopic(path, settings).GetValueSync();
348+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
349+
}
350+
326351
void TFixture::DescribeTopic(const TString& path)
327352
{
328353
Setup->DescribeTopic(path);

0 commit comments

Comments
 (0)