Skip to content

Commit 630decb

Browse files
The keys in the block cache (#13553)
1 parent 3635567 commit 630decb

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <ydb/core/persqueue/key.h>
1010
#include <ydb/core/persqueue/blob.h>
1111
#include <ydb/core/persqueue/events/global.h>
12+
#include <ydb/core/persqueue/pq_l2_service.h>
1213
#include <ydb/core/tx/long_tx_service/public/events.h>
1314

1415
#include <ydb/core/persqueue/ut/common/autoscaling_ut_common.h>
@@ -237,6 +238,8 @@ class TFixture : public NUnitTest::TBaseFixture {
237238
virtual bool GetEnableHtapTx() const;
238239
virtual bool GetAllowOlapDataQuery() const;
239240

241+
size_t GetPQCacheRenameKeysCount();
242+
240243
private:
241244
template<class E>
242245
E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx);
@@ -248,6 +251,8 @@ class TFixture : public NUnitTest::TBaseFixture {
248251
ui32 partition);
249252
std::vector<std::string> GetTabletKeys(const TActorId& actorId,
250253
ui64 tabletId);
254+
std::vector<std::string> GetPQTabletDataKeys(const TActorId& actorId,
255+
ui64 tabletId);
251256
NPQ::TWriteId GetTransactionWriteId(const TActorId& actorId,
252257
ui64 tabletId);
253258
void SendLongTxLockStatus(const TActorId& actorId,
@@ -1084,6 +1089,43 @@ std::vector<std::string> TFixture::GetTabletKeys(const TActorId& actorId,
10841089
return keys;
10851090
}
10861091

1092+
std::vector<std::string> TFixture::GetPQTabletDataKeys(const TActorId& actorId,
1093+
ui64 tabletId)
1094+
{
1095+
using namespace NKikimr::NPQ;
1096+
1097+
std::vector<std::string> keys;
1098+
1099+
for (const auto& key : GetTabletKeys(actorId, tabletId)) {
1100+
if (key.empty() ||
1101+
((std::tolower(key.front()) != TKeyPrefix::TypeData) &&
1102+
(std::tolower(key.front()) != TKeyPrefix::TypeTmpData))) {
1103+
continue;
1104+
}
1105+
1106+
keys.push_back(key);
1107+
}
1108+
1109+
return keys;
1110+
}
1111+
1112+
size_t TFixture::GetPQCacheRenameKeysCount()
1113+
{
1114+
using namespace NKikimr::NPQ;
1115+
1116+
auto& runtime = Setup->GetRuntime();
1117+
TActorId edge = runtime.AllocateEdgeActor();
1118+
1119+
auto request = MakeHolder<TEvPqCache::TEvCacheKeysRequest>();
1120+
1121+
runtime.Send(MakePersQueueL2CacheID(), edge, request.Release());
1122+
1123+
TAutoPtr<IEventHandle> handle;
1124+
auto* result = runtime.GrabEdgeEvent<TEvPqCache::TEvCacheKeysResponse>(handle);
1125+
1126+
return result->RenamedKeys;
1127+
}
1128+
10871129
void TFixture::RestartLongTxService()
10881130
{
10891131
auto& runtime = Setup->GetRuntime();
@@ -2579,6 +2621,55 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture)
25792621
UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2);
25802622
}
25812623

2624+
Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture)
2625+
{
2626+
// We write to the topic in the transaction. When a transaction is committed, the keys in the blob
2627+
// cache are renamed.
2628+
CreateTopic("topic_A", TEST_CONSUMER);
2629+
CreateTopic("topic_B", TEST_CONSUMER);
2630+
2631+
TString message(128_KB, 'x');
2632+
2633+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message);
2634+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
2635+
2636+
auto session = CreateTableSession();
2637+
2638+
// tx #1
2639+
// After the transaction commit, there will be no large blobs in the batches. The number of renames
2640+
// will not change in the cache.
2641+
auto tx = BeginTx(session);
2642+
2643+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx);
2644+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx);
2645+
2646+
UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);
2647+
2648+
CommitTx(tx, EStatus::SUCCESS);
2649+
2650+
Sleep(TDuration::Seconds(5));
2651+
2652+
UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);
2653+
2654+
// tx #2
2655+
// After the commit, the party will rename one big blob
2656+
tx = BeginTx(session);
2657+
2658+
for (unsigned i = 0; i < 80; ++i) {
2659+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx);
2660+
}
2661+
2662+
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx);
2663+
2664+
UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);
2665+
2666+
CommitTx(tx, EStatus::SUCCESS);
2667+
2668+
Sleep(TDuration::Seconds(5));
2669+
2670+
UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 1);
2671+
}
2672+
25822673
class TFixtureSinks : public TFixture {
25832674
protected:
25842675
void CreateRowTable(const TString& path);

0 commit comments

Comments
 (0)