Skip to content

Commit 6b10651

Browse files
Alek5andr-KotovGazizonoki
authored andcommitted
Moved commit "The consumer's generation number is not stored in the transaction" from ydb repo
1 parent c929e9e commit 6b10651

File tree

1 file changed

+116
-1
lines changed

1 file changed

+116
-1
lines changed

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb-cpp-sdk/client/topic/client.h>
44
#include <ydb-cpp-sdk/client/table/table.h>
55
#include <src/client/persqueue_public/ut/ut_utils/ut_utils.h>
6+
#include <ydb/core/cms/console/console.h>
67
#include <ydb/core/keyvalue/keyvalue_events.h>
78
#include <ydb/core/persqueue/key.h>
89
#include <ydb/core/persqueue/blob.h>
@@ -42,8 +43,14 @@ class TFixture : public NUnitTest::TBaseFixture {
4243
void WaitForEvent();
4344
};
4445

46+
struct TFeatureFlags {
47+
bool EnablePQConfigTransactionsAtSchemeShard = true;
48+
};
49+
4550
void SetUp(NUnitTest::TTestContext&) override;
4651

52+
void NotifySchemeShard(const TFeatureFlags& flags);
53+
4754
NTable::TSession CreateTableSession();
4855
NTable::TTransaction BeginTx(NTable::TSession& session);
4956
void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
@@ -65,9 +72,10 @@ class TFixture : public NUnitTest::TBaseFixture {
6572
const TString& consumer = TEST_CONSUMER,
6673
size_t partitionCount = 1,
6774
std::optional<size_t> maxPartitionCount = std::nullopt);
68-
6975
void DescribeTopic(const TString& path);
7076

77+
void AddConsumer(const TString& topic, const TVector<TString>& consumers);
78+
7179
void WriteToTopicWithInvalidTxId(bool invalidTxId);
7280

7381
TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
@@ -102,6 +110,8 @@ class TFixture : public NUnitTest::TBaseFixture {
102110
NYdb::EStatus status);
103111
void CloseTopicWriteSession(const TString& topicPath,
104112
const TString& messageGroupId);
113+
void CloseTopicReadSession(const TString& topicPath,
114+
const TString& consumerName);
105115

106116
enum EEndOfTransaction {
107117
Commit,
@@ -182,6 +192,8 @@ class TFixture : public NUnitTest::TBaseFixture {
182192
ui64 tabletId,
183193
const NPQ::TWriteId& writeId);
184194

195+
ui64 GetSchemeShardTabletId(const TActorId& actorId);
196+
185197
std::unique_ptr<TTopicSdkTestSetup> Setup;
186198
std::unique_ptr<TDriver> Driver;
187199

@@ -199,11 +211,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
199211
{
200212
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
201213
settings.SetEnableTopicServiceTx(true);
214+
202215
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
203216

204217
Driver = std::make_unique<TDriver>(Setup->MakeDriver());
205218
}
206219

220+
void TFixture::NotifySchemeShard(const TFeatureFlags& flags)
221+
{
222+
auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
223+
*request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig;
224+
request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard);
225+
226+
auto& runtime = Setup->GetRuntime();
227+
auto actorId = runtime.AllocateEdgeActor();
228+
229+
ui64 ssId = GetSchemeShardTabletId(actorId);
230+
231+
runtime.SendToPipe(ssId, actorId, request.release());
232+
runtime.GrabEdgeEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>();
233+
}
234+
207235
NTable::TSession TFixture::CreateTableSession()
208236
{
209237
NTable::TTableClient client(GetDriver());
@@ -330,6 +358,20 @@ void TFixture::CreateTopic(const TString& path,
330358
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
331359
}
332360

361+
void TFixture::AddConsumer(const TString& path,
362+
const TVector<TString>& consumers)
363+
{
364+
NTopic::TTopicClient client(GetDriver());
365+
NTopic::TAlterTopicSettings settings;
366+
367+
for (const auto& consumer : consumers) {
368+
settings.BeginAddConsumer(consumer);
369+
}
370+
371+
auto result = client.AlterTopic(path, settings).GetValueSync();
372+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
373+
}
374+
333375
void TFixture::DescribeTopic(const TString& path)
334376
{
335377
Setup->DescribeTopic(path);
@@ -664,6 +706,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
664706
TopicWriteSessions.erase(key);
665707
}
666708

709+
void TFixture::CloseTopicReadSession(const TString& topicPath,
710+
const TString& consumerName)
711+
{
712+
Y_UNUSED(consumerName);
713+
TopicReadSessions.erase(topicPath);
714+
}
715+
667716
void TFixture::WriteToTopic(const TString& topicPath,
668717
const TString& messageGroupId,
669718
const TString& message,
@@ -780,6 +829,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
780829
UNIT_ASSERT(context.AckCount() <= context.WriteCount);
781830
}
782831

832+
ui64 TFixture::GetSchemeShardTabletId(const TActorId& actorId)
833+
{
834+
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
835+
navigate->DatabaseName = "/Root";
836+
837+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
838+
entry.Path = SplitPath("/Root");
839+
entry.SyncVersion = true;
840+
entry.ShowPrivatePath = true;
841+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
842+
843+
navigate->ResultSet.push_back(std::move(entry));
844+
//navigate->UserToken = "root@builtin";
845+
navigate->Cookie = 12345;
846+
847+
auto& runtime = Setup->GetRuntime();
848+
849+
runtime.Send(MakeSchemeCacheID(), actorId,
850+
new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()),
851+
0,
852+
true);
853+
auto response = runtime.GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
854+
855+
UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345);
856+
UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0);
857+
858+
auto& front = response->Request->ResultSet.front();
859+
860+
return front.Self->Info.GetSchemeshardId();
861+
}
862+
783863
ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
784864
{
785865
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
@@ -2017,6 +2097,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
20172097
WriteMessagesInTx(0, 1);
20182098
}
20192099

2100+
Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
2101+
{
2102+
// There was a server
2103+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false});
2104+
2105+
// Users have created their own topic on it
2106+
CreateTopic(TEST_TOPIC);
2107+
2108+
// And they wrote their messages into it
2109+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1");
2110+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2");
2111+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3");
2112+
2113+
// And he had a consumer
2114+
AddConsumer(TEST_TOPIC, {"consumer-1"});
2115+
2116+
// We read messages from the topic and committed offsets
2117+
auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2118+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
2119+
CloseTopicReadSession(TEST_TOPIC, "consumer-1");
2120+
2121+
// And then the Logbroker team turned on the feature flag
2122+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true});
2123+
2124+
// Users continued to write to the topic
2125+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4");
2126+
2127+
// Users have added new consumers
2128+
AddConsumer(TEST_TOPIC, {"consumer-2"});
2129+
2130+
// And they wanted to continue reading their messages
2131+
messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2132+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
2133+
}
2134+
20202135
}
20212136

20222137
}

0 commit comments

Comments
 (0)