9
9
#include < ydb/core/persqueue/key.h>
10
10
#include < ydb/core/persqueue/blob.h>
11
11
#include < ydb/core/persqueue/events/global.h>
12
+ #include < ydb/core/persqueue/pq_l2_service.h>
12
13
#include < ydb/core/tx/long_tx_service/public/events.h>
13
14
14
15
#include < ydb/core/persqueue/ut/common/autoscaling_ut_common.h>
@@ -237,6 +238,8 @@ class TFixture : public NUnitTest::TBaseFixture {
237
238
virtual bool GetEnableHtapTx () const ;
238
239
virtual bool GetAllowOlapDataQuery () const ;
239
240
241
+ size_t GetPQCacheRenameKeysCount ();
242
+
240
243
private:
241
244
template <class E >
242
245
E ReadEvent (TTopicReadSessionPtr reader, NTable::TTransaction& tx);
@@ -248,6 +251,8 @@ class TFixture : public NUnitTest::TBaseFixture {
248
251
ui32 partition);
249
252
std::vector<std::string> GetTabletKeys (const TActorId& actorId,
250
253
ui64 tabletId);
254
+ std::vector<std::string> GetPQTabletDataKeys (const TActorId& actorId,
255
+ ui64 tabletId);
251
256
NPQ::TWriteId GetTransactionWriteId (const TActorId& actorId,
252
257
ui64 tabletId);
253
258
void SendLongTxLockStatus (const TActorId& actorId,
@@ -1084,6 +1089,43 @@ std::vector<std::string> TFixture::GetTabletKeys(const TActorId& actorId,
1084
1089
return keys;
1085
1090
}
1086
1091
1092
+ std::vector<std::string> TFixture::GetPQTabletDataKeys (const TActorId& actorId,
1093
+ ui64 tabletId)
1094
+ {
1095
+ using namespace NKikimr ::NPQ;
1096
+
1097
+ std::vector<std::string> keys;
1098
+
1099
+ for (const auto & key : GetTabletKeys (actorId, tabletId)) {
1100
+ if (key.empty () ||
1101
+ ((std::tolower (key.front ()) != TKeyPrefix::TypeData) &&
1102
+ (std::tolower (key.front ()) != TKeyPrefix::TypeTmpData))) {
1103
+ continue ;
1104
+ }
1105
+
1106
+ keys.push_back (key);
1107
+ }
1108
+
1109
+ return keys;
1110
+ }
1111
+
1112
+ size_t TFixture::GetPQCacheRenameKeysCount ()
1113
+ {
1114
+ using namespace NKikimr ::NPQ;
1115
+
1116
+ auto & runtime = Setup->GetRuntime ();
1117
+ TActorId edge = runtime.AllocateEdgeActor ();
1118
+
1119
+ auto request = MakeHolder<TEvPqCache::TEvCacheKeysRequest>();
1120
+
1121
+ runtime.Send (MakePersQueueL2CacheID (), edge, request.Release ());
1122
+
1123
+ TAutoPtr<IEventHandle> handle;
1124
+ auto * result = runtime.GrabEdgeEvent <TEvPqCache::TEvCacheKeysResponse>(handle);
1125
+
1126
+ return result->RenamedKeys ;
1127
+ }
1128
+
1087
1129
void TFixture::RestartLongTxService ()
1088
1130
{
1089
1131
auto & runtime = Setup->GetRuntime ();
@@ -2579,6 +2621,55 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture)
2579
2621
UNIT_ASSERT_GT (topicDescription.GetTotalPartitionsCount (), 2 );
2580
2622
}
2581
2623
2624
+ Y_UNIT_TEST_F (WriteToTopic_Demo_50, TFixture)
2625
+ {
2626
+ // We write to the topic in the transaction. When a transaction is committed, the keys in the blob
2627
+ // cache are renamed.
2628
+ CreateTopic (" topic_A" , TEST_CONSUMER);
2629
+ CreateTopic (" topic_B" , TEST_CONSUMER);
2630
+
2631
+ TString message (128_KB, ' x' );
2632
+
2633
+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message);
2634
+ WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID_1);
2635
+
2636
+ auto session = CreateTableSession ();
2637
+
2638
+ // tx #1
2639
+ // After the transaction commit, there will be no large blobs in the batches. The number of renames
2640
+ // will not change in the cache.
2641
+ auto tx = BeginTx (session);
2642
+
2643
+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, &tx);
2644
+ WriteToTopic (" topic_B" , TEST_MESSAGE_GROUP_ID_3, message, &tx);
2645
+
2646
+ UNIT_ASSERT_VALUES_EQUAL (GetPQCacheRenameKeysCount (), 0 );
2647
+
2648
+ CommitTx (tx, EStatus::SUCCESS);
2649
+
2650
+ Sleep (TDuration::Seconds (5 ));
2651
+
2652
+ UNIT_ASSERT_VALUES_EQUAL (GetPQCacheRenameKeysCount (), 0 );
2653
+
2654
+ // tx #2
2655
+ // After the commit, the party will rename one big blob
2656
+ tx = BeginTx (session);
2657
+
2658
+ for (unsigned i = 0 ; i < 80 ; ++i) {
2659
+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, &tx);
2660
+ }
2661
+
2662
+ WriteToTopic (" topic_B" , TEST_MESSAGE_GROUP_ID_3, message, &tx);
2663
+
2664
+ UNIT_ASSERT_VALUES_EQUAL (GetPQCacheRenameKeysCount (), 0 );
2665
+
2666
+ CommitTx (tx, EStatus::SUCCESS);
2667
+
2668
+ Sleep (TDuration::Seconds (5 ));
2669
+
2670
+ UNIT_ASSERT_VALUES_EQUAL (GetPQCacheRenameKeysCount (), 1 );
2671
+ }
2672
+
2582
2673
class TFixtureSinks : public TFixture {
2583
2674
protected:
2584
2675
void CreateRowTable (const TString& path);
0 commit comments