Skip to content

Commit 159be17

Browse files
Y_VERIFY in the RenameFormedBlobs function (#8916)
1 parent 6ee3d1c commit 159be17

File tree

7 files changed

+140
-21
lines changed

7 files changed

+140
-21
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const {
8282
} else {
8383
state = "Unknown";
8484
}
85-
return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] ";
85+
return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] ";
8686
}
8787

8888
bool TPartition::IsActive() const {
@@ -2149,6 +2149,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
21492149

21502150
void TPartition::CommitWriteOperations(TTransaction& t)
21512151
{
2152+
PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId());
2153+
21522154
Y_ABORT_UNLESS(PersistRequest);
21532155
Y_ABORT_UNLESS(!PartitionedBlob.IsInited());
21542156

@@ -2166,6 +2168,10 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21662168
HaveWriteMsg = true;
21672169
}
21682170

2171+
PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() <<
2172+
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
2173+
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
2174+
21692175
if (!t.WriteInfo->BodyKeys.empty()) {
21702176
PartitionedBlob = TPartitionedBlob(Partition,
21712177
NewHead.Offset,
@@ -2180,6 +2186,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21802186
MaxBlobSize);
21812187

21822188
for (auto& k : t.WriteInfo->BodyKeys) {
2189+
PQ_LOG_D("add key " << k.Key.ToString());
21832190
auto write = PartitionedBlob.Add(k.Key, k.Size);
21842191
if (write && !write->Value.empty()) {
21852192
AddCmdWrite(write, PersistRequest.Get(), ctx);
@@ -2188,18 +2195,17 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21882195
}
21892196
}
21902197

2191-
}
21922198

2193-
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
2194-
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
2195-
RenameFormedBlobs(formedBlobs,
2196-
*Parameters,
2197-
curWrites,
2198-
PersistRequest.Get(),
2199-
ctx);
2200-
}
2199+
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
2200+
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
2201+
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
2202+
RenameFormedBlobs(formedBlobs,
2203+
*Parameters,
2204+
curWrites,
2205+
PersistRequest.Get(),
2206+
ctx);
2207+
}
22012208

2202-
if (!t.WriteInfo->BodyKeys.empty()) {
22032209
const auto& last = t.WriteInfo->BodyKeys.back();
22042210

22052211
NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());

ydb/core/persqueue/partition_id.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <util/system/types.h>
88
#include <util/digest/multi.h>
99
#include <util/str_stl.h>
10+
#include <util/string/builder.h>
1011

1112
#include <functional>
1213

@@ -51,6 +52,13 @@ class TPartitionId {
5152
}
5253
}
5354

55+
TString ToString() const
56+
{
57+
TStringBuilder s;
58+
s << *this;
59+
return s;
60+
}
61+
5462
bool IsSupportivePartition() const
5563
{
5664
return WriteId.Defined();

ydb/core/persqueue/partition_write.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,16 +1066,16 @@ void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFor
10661066
}
10671067
if (!DataKeysBody.empty() && CompactedKeys.empty()) {
10681068
Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(),
1069+
"PQ: %" PRIu64 ", Partition: %s, "
10691070
"LAST KEY %s, HeadOffset %lu, NEWKEY %s",
1071+
TabletID, Partition.ToString().c_str(),
10701072
DataKeysBody.back().Key.ToString().c_str(),
10711073
Head.Offset,
10721074
x.NewKey.ToString().c_str());
10731075
}
1074-
LOG_DEBUG_S(
1075-
ctx, NKikimrServices::PERSQUEUE,
1076-
"writing blob: topic '" << TopicName() << "' partition " << Partition
1077-
<< " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds()
1078-
);
1076+
PQ_LOG_D("writing blob: topic '" << TopicName() << "' partition " << Partition <<
1077+
" old key " << x.OldKey.ToString() << " new key " << x.NewKey.ToString() <<
1078+
" size " << x.Size << " WTime " << ctx.Now().MilliSeconds());
10791079

10801080
CompactedKeys.emplace_back(x.NewKey, x.Size);
10811081
}

ydb/core/persqueue/pq_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4661,6 +4661,8 @@ void TPersQueue::TryStartTransaction(const TActorContext& ctx)
46614661
Y_ABORT_UNLESS(next);
46624662

46634663
CheckTxState(ctx, *next);
4664+
4665+
TryWriteTxs(ctx);
46644666
}
46654667

46664668
void TPersQueue::OnInitComplete(const TActorContext& ctx)

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
12481248

12491249
// get records
12501250
{
1251+
WaitForDataRecords(client, shardIt);
1252+
12511253
auto res = client.GetRecords(shardIt).ExtractValueSync();
12521254
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
12531255
UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size());
@@ -1269,6 +1271,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
12691271
}
12701272
}
12711273

1274+
static void WaitForDataRecords(TDataStreamsClient& client, const TString& shardIt) {
1275+
int n = 0;
1276+
for (; n < 100; ++n) {
1277+
auto res = client.GetRecords(shardIt).ExtractValueSync();
1278+
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
1279+
if (res.GetResult().records().size()) {
1280+
break;
1281+
}
1282+
Sleep(TDuration::MilliSeconds(100));
1283+
}
1284+
UNIT_ASSERT_VALUES_UNEQUAL(n, 100);
1285+
}
1286+
12721287
static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
12731288
TTestYdsEnv env(tableDesc, streamDesc);
12741289

ydb/core/tx/schemeshard/ut_base/ut_base.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6331,6 +6331,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
63316331
"PartitionPerTablet: 10 "
63326332
"PQTabletConfig: {PartitionConfig { LifetimeSeconds : 10}}"
63336333
);
6334+
env.TestWaitNotification(runtime, txId);
63346335

63356336
TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/PQGroup_1", true),
63366337
{NLs::CheckPartCount("PQGroup_1", 100, 10, 10, 100),
@@ -6853,7 +6854,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
68536854
AsyncForceDropUnsafe(runtime, ++txId, pVer.PathId.LocalPathId);
68546855

68556856
TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted);
6856-
TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted);
6857+
TestModificationResults(runtime, txId-1, {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
68576858
TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted);
68586859

68596860
TActorId sender = runtime.AllocateEdgeActor();

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ class TFixture : public NUnitTest::TBaseFixture {
152152

153153
void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params);
154154

155+
void WriteMessagesInTx(size_t big, size_t small);
156+
155157
const TDriver& GetDriver() const;
156158

157159
void CheckTabletKeys(const TString& topicName);
@@ -1611,21 +1613,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
16111613

16121614
for (size_t i = 0; i < params.OldHeadCount; ++i) {
16131615
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'));
1616+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
16141617
++oldHeadMsgCount;
16151618
}
16161619

16171620
for (size_t i = 0; i < params.BigBlobsCount; ++i) {
1618-
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx);
1621+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx);
1622+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
16191623
++bigBlobMsgCount;
16201624
}
16211625

16221626
for (size_t i = 0; i < params.NewHeadCount; ++i) {
16231627
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx);
1628+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
16241629
++newHeadMsgCount;
16251630
}
16261631

1627-
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
1628-
16291632
if (params.RestartMode == ERestartBeforeCommit) {
16301633
RestartPQTablet("topic_A", 0);
16311634
}
@@ -1654,7 +1657,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
16541657
start += oldHeadMsgCount;
16551658

16561659
for (size_t i = 0; i < bigBlobMsgCount; ++i) {
1657-
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000);
1660+
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000);
16581661
}
16591662
start += bigBlobMsgCount;
16601663

@@ -1921,6 +1924,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
19211924
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
19221925
}
19231926

1927+
void TFixture::WriteMessagesInTx(size_t big, size_t small)
1928+
{
1929+
CreateTopic("topic_A", TEST_CONSUMER);
1930+
1931+
NTable::TSession tableSession = CreateTableSession();
1932+
NTable::TTransaction tx = BeginTx(tableSession);
1933+
1934+
for (size_t i = 0; i < big; ++i) {
1935+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0);
1936+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
1937+
}
1938+
1939+
for (size_t i = 0; i < small; ++i) {
1940+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0);
1941+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
1942+
}
1943+
1944+
CommitTx(tx, EStatus::SUCCESS);
1945+
}
1946+
1947+
Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture)
1948+
{
1949+
WriteMessagesInTx(1, 0);
1950+
WriteMessagesInTx(1, 0);
1951+
}
1952+
1953+
Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture)
1954+
{
1955+
WriteMessagesInTx(1, 0);
1956+
WriteMessagesInTx(0, 1);
1957+
}
1958+
1959+
Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture)
1960+
{
1961+
WriteMessagesInTx(1, 0);
1962+
WriteMessagesInTx(1, 1);
1963+
}
1964+
1965+
Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture)
1966+
{
1967+
WriteMessagesInTx(0, 1);
1968+
WriteMessagesInTx(1, 0);
1969+
}
1970+
1971+
Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture)
1972+
{
1973+
WriteMessagesInTx(0, 1);
1974+
WriteMessagesInTx(0, 1);
1975+
}
1976+
1977+
Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture)
1978+
{
1979+
WriteMessagesInTx(0, 1);
1980+
WriteMessagesInTx(1, 1);
1981+
}
1982+
1983+
Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture)
1984+
{
1985+
WriteMessagesInTx(1, 1);
1986+
WriteMessagesInTx(1, 0);
1987+
}
1988+
1989+
Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture)
1990+
{
1991+
WriteMessagesInTx(1, 1);
1992+
WriteMessagesInTx(0, 1);
1993+
}
1994+
1995+
Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture)
1996+
{
1997+
WriteMessagesInTx(1, 1);
1998+
WriteMessagesInTx(1, 1);
1999+
}
2000+
2001+
2002+
Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
2003+
{
2004+
WriteMessagesInTx(2, 202);
2005+
WriteMessagesInTx(2, 200);
2006+
WriteMessagesInTx(0, 1);
2007+
WriteMessagesInTx(4, 0);
2008+
WriteMessagesInTx(0, 1);
2009+
}
2010+
19242011
}
19252012

19262013
}

0 commit comments

Comments
 (0)