Skip to content

Commit 4256145

Browse files
An empty list of consumers for the background partition (#15889)
1 parent 99de01f commit 4256145

File tree

3 files changed

+63
-0
lines changed

3 files changed

+63
-0
lines changed
Binary file not shown.
Binary file not shown.

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <library/cpp/logger/stream.h>
1818
#include <library/cpp/testing/unittest/registar.h>
19+
#include <library/cpp/streams/bzip2/bzip2.h>
1920

2021
namespace NYdb::NTopic::NTests {
2122

@@ -150,6 +151,9 @@ class TFixture : public NUnitTest::TBaseFixture {
150151
void RestartLongTxService();
151152
void RestartPQTablet(const TString& topicPath, ui32 partition);
152153
void DumpPQTabletKeys(const TString& topicName, ui32 partition);
154+
void PQTabletPrepareFromResource(const TString& topicPath,
155+
ui32 partitionId,
156+
const TString& resourceName);
153157

154158
void DeleteSupportivePartition(const TString& topicName,
155159
ui32 partition);
@@ -1857,6 +1861,51 @@ void TFixture::DumpPQTabletKeys(const TString& topicName)
18571861
}
18581862
}
18591863

1864+
void TFixture::PQTabletPrepareFromResource(const TString& topicPath,
1865+
ui32 partitionId,
1866+
const TString& resourceName)
1867+
{
1868+
auto& runtime = Setup->GetRuntime();
1869+
TActorId edge = runtime.AllocateEdgeActor();
1870+
ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicPath, partitionId);
1871+
1872+
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
1873+
size_t count = 0;
1874+
1875+
for (TStringStream stream(NResource::Find(resourceName)); true; ++count) {
1876+
TString key, encoded;
1877+
1878+
if (!stream.ReadTo(key, ' ')) {
1879+
break;
1880+
}
1881+
encoded = stream.ReadLine();
1882+
1883+
auto decoded = Base64Decode(encoded);
1884+
TStringInput decodedStream(decoded);
1885+
TBZipDecompress decompressor(&decodedStream);
1886+
1887+
auto* cmd = request->Record.AddCmdWrite();
1888+
cmd->SetKey(key);
1889+
cmd->SetValue(decompressor.ReadAll());
1890+
}
1891+
1892+
runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries());
1893+
1894+
TAutoPtr<IEventHandle> handle;
1895+
auto* response = runtime.GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
1896+
UNIT_ASSERT(response);
1897+
UNIT_ASSERT(response->Record.HasStatus());
1898+
UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
1899+
1900+
UNIT_ASSERT_VALUES_EQUAL(response->Record.WriteResultSize(), count);
1901+
1902+
for (size_t i = 0; i < response->Record.WriteResultSize(); ++i) {
1903+
const auto &result = response->Record.GetWriteResult(i);
1904+
UNIT_ASSERT(result.HasStatus());
1905+
UNIT_ASSERT_EQUAL(result.GetStatus(), NKikimrProto::OK);
1906+
}
1907+
}
1908+
18601909
void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestDescription& d)
18611910
{
18621911
for (auto& topic : d.Topics) {
@@ -3207,6 +3256,20 @@ Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo, TFixture)
32073256
UNIT_ASSERT_VALUES_UNEQUAL(successCount, TXS_COUNT);
32083257
}
32093258

3259+
Y_UNIT_TEST_F(The_Transaction_Starts_On_One_Version_And_Ends_On_The_Other, TFixture)
3260+
{
3261+
// In the test, we check the compatibility between versions `24-4-2` and `24-4-*/25-1-*`. To do this, the data
3262+
// obtained on the `24-4-2` version is loaded into the PQ tablets.
3263+
3264+
CreateTopic("topic_A", TEST_CONSUMER, 2);
3265+
3266+
PQTabletPrepareFromResource("topic_A", 0, "topic_A_partition_0_v24-4-2.dat");
3267+
PQTabletPrepareFromResource("topic_A", 1, "topic_A_partition_1_v24-4-2.dat");
3268+
3269+
RestartPQTablet("topic_A", 0);
3270+
RestartPQTablet("topic_A", 1);
3271+
}
3272+
32103273
}
32113274

32123275
}

0 commit comments

Comments
 (0)