Skip to content

Commit 356f091

Browse files
The commit of a transaction in the SDK freezes (#17120)
1 parent b3e1db3 commit 356f091

File tree

4 files changed

+156
-10
lines changed

4 files changed

+156
-10
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -498,24 +498,32 @@ void TPartition::DestroyActor(const TActorContext& ctx)
498498
{
499499
// Reply to all outstanding requests in order to destroy corresponding actors
500500

501+
NPersQueue::NErrorCode::EErrorCode errorCode;
501502
TStringBuilder ss;
502-
ss << "Tablet is restarting, topic '" << TopicName() << "'";
503+
504+
if (IsSupportive()) {
505+
errorCode = NPersQueue::NErrorCode::ERROR;
506+
ss << "The transaction is completed";
507+
} else {
508+
errorCode = NPersQueue::NErrorCode::INITIALIZING;
509+
ss << "Tablet is restarting, topic '" << TopicName() << "'";
510+
}
503511

504512
for (const auto& ev : WaitToChangeOwner) {
505-
ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::INITIALIZING, ss);
513+
ReplyError(ctx, ev->Cookie, errorCode, ss);
506514
}
507515

508516
for (auto& w : PendingRequests) {
509-
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, ss);
517+
ReplyError(ctx, w.GetCookie(), errorCode, ss);
510518
w.Span.EndError(static_cast<const TString&>(ss));
511519
}
512520

513521
for (const auto& w : Responses) {
514-
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, TStringBuilder() << ss << " (WriteResponses)");
522+
ReplyError(ctx, w.GetCookie(), errorCode, TStringBuilder() << ss << " (WriteResponses)");
515523
}
516524

517525
for (const auto& ri : ReadInfo) {
518-
ReplyError(ctx, ri.second.Destination, NPersQueue::NErrorCode::INITIALIZING,
526+
ReplyError(ctx, ri.second.Destination, errorCode,
519527
TStringBuilder() << ss << " (ReadInfo) cookie " << ri.first);
520528
}
521529

@@ -3553,7 +3561,13 @@ void TPartition::Handle(TEvPQ::TEvApproveWriteQuota::TPtr& ev, const TActorConte
35533561
TopicWriteQuotaWaitCounter->IncFor(TopicQuotaWaitTimeForCurrentBlob.MilliSeconds());
35543562
}
35553563

3556-
RequestBlobQuota();
3564+
if (NeedDeletePartition) {
3565+
// deferred TEvPQ::TEvDeletePartition
3566+
DeletePartitionState = DELETION_INITED;
3567+
} else {
3568+
RequestBlobQuota();
3569+
}
3570+
35573571
ProcessTxsAndUserActs(ctx);
35583572
}
35593573

@@ -3657,6 +3671,13 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition>
36573671
Y_ABORT_UNLESS(IsSupportive());
36583672
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
36593673

3674+
NeedDeletePartition = true;
3675+
3676+
if (TopicQuotaRequestCookie != 0) {
3677+
// wait for TEvPQ::TEvApproveWriteQuota
3678+
return;
3679+
}
3680+
36603681
DeletePartitionState = DELETION_INITED;
36613682

36623683
ProcessTxsAndUserActs(ctx);

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -927,6 +927,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
927927
ui64 TopicQuotaRequestCookie = 0;
928928
ui64 NextTopicWriteQuotaRequestCookie = 1;
929929
ui64 BlobQuotaSize = 0;
930+
bool NeedDeletePartition = false;
930931

931932
// Wait topic quota metrics
932933
ui64 TotalPartitionWriteSpeed = 0;

ydb/core/persqueue/partition_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1633,7 +1633,7 @@ bool TPartition::RequestBlobQuota()
16331633

16341634
void TPartition::HandlePendingRequests(const TActorContext& ctx)
16351635
{
1636-
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)) {
1636+
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx) || NeedDeletePartition) {
16371637
return;
16381638
}
16391639
if (RequestBlobQuota()) {

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
286286
void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false);
287287
void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true);
288288
void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
289-
bool ignoreQuotaDeadline = false, ui64 seqNo = 0);
289+
bool ignoreQuotaDeadline = false, ui64 seqNo = 0, bool isDirectWrite = false);
290290
void SendGetWriteInfo();
291291
void ShadowPartitionCountersTest(bool isFirstClass);
292292

@@ -302,6 +302,14 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
302302
void SendEvent(IEventBase* event);
303303
void SendEvent(IEventBase* event, const TActorId& from, const TActorId& to);
304304

305+
THolder<TEvPQ::TEvApproveWriteQuota> WaitForRequestQuotaAndHoldApproveWriteQuota();
306+
void SendDeletePartition();
307+
void WaitForDeletePartitionDoneTimeout();
308+
void SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event);
309+
void WaitForQuotaConsumed();
310+
void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode);
311+
void WaitForDeletePartitionDone();
312+
305313
TMaybe<TTestContext> Ctx;
306314
TMaybe<TFinalizer> Finalizer;
307315

@@ -639,7 +647,7 @@ void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, con
639647

640648
void TPartitionFixture::SendWrite
641649
(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
642-
bool ignoreQuotaDeadline, ui64 seqNo
650+
bool ignoreQuotaDeadline, ui64 seqNo, bool isDirectWrite
643651
) {
644652
TEvPQ::TEvWrite::TMsg msg;
645653
msg.SourceId = "SourceId";
@@ -661,7 +669,7 @@ void TPartitionFixture::SendWrite
661669
TVector<TEvPQ::TEvWrite::TMsg> msgs;
662670
msgs.push_back(msg);
663671

664-
auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false, std::nullopt);
672+
auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), isDirectWrite, std::nullopt);
665673
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
666674
}
667675

@@ -1365,6 +1373,92 @@ void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration
13651373
}
13661374
}
13671375

1376+
THolder<TEvPQ::TEvApproveWriteQuota> TPartitionFixture::WaitForRequestQuotaAndHoldApproveWriteQuota()
1377+
{
1378+
THolder<TEvPQ::TEvApproveWriteQuota> approveWriteQuota;
1379+
1380+
auto observer = [&approveWriteQuota](TAutoPtr<IEventHandle>& ev) mutable {
1381+
if (auto* event = ev->CastAsLocal<TEvPQ::TEvApproveWriteQuota>()) {
1382+
approveWriteQuota = MakeHolder<TEvPQ::TEvApproveWriteQuota>(event->Cookie,
1383+
event->AccountQuotaWaitTime,
1384+
event->PartitionQuotaWaitTime);
1385+
return TTestActorRuntimeBase::EEventAction::DROP;
1386+
}
1387+
return TTestActorRuntimeBase::EEventAction::PROCESS;
1388+
};
1389+
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);
1390+
1391+
TDispatchOptions options;
1392+
options.CustomFinalCondition = [&]() {
1393+
return approveWriteQuota != nullptr;
1394+
};
1395+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1396+
1397+
Ctx->Runtime->SetObserverFunc(prevObserver);
1398+
1399+
UNIT_ASSERT(approveWriteQuota != nullptr);
1400+
1401+
return approveWriteQuota;
1402+
}
1403+
1404+
void TPartitionFixture::SendDeletePartition()
1405+
{
1406+
auto event = MakeHolder<TEvPQ::TEvDeletePartition>();
1407+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
1408+
}
1409+
1410+
void TPartitionFixture::WaitForDeletePartitionDoneTimeout()
1411+
{
1412+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>(TDuration::Seconds(3));
1413+
UNIT_ASSERT_VALUES_EQUAL(event, nullptr);
1414+
}
1415+
1416+
void TPartitionFixture::SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event)
1417+
{
1418+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
1419+
event = nullptr;
1420+
}
1421+
1422+
void TPartitionFixture::WaitForQuotaConsumed()
1423+
{
1424+
bool hasQuotaConsumed = false;
1425+
1426+
auto observer = [&hasQuotaConsumed](TAutoPtr<IEventHandle>& ev) mutable {
1427+
if (auto* event = ev->CastAsLocal<TEvPQ::TEvConsumed>()) {
1428+
hasQuotaConsumed = true;
1429+
}
1430+
return TTestActorRuntimeBase::EEventAction::PROCESS;
1431+
};
1432+
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);
1433+
1434+
TDispatchOptions options;
1435+
options.CustomFinalCondition = [&]() {
1436+
return hasQuotaConsumed;
1437+
};
1438+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1439+
1440+
Ctx->Runtime->SetObserverFunc(prevObserver);
1441+
1442+
UNIT_ASSERT(hasQuotaConsumed);
1443+
}
1444+
1445+
void TPartitionFixture::WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode)
1446+
{
1447+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvError>();
1448+
1449+
UNIT_ASSERT(event != nullptr);
1450+
1451+
UNIT_ASSERT_VALUES_EQUAL(cookie, event->Cookie);
1452+
UNIT_ASSERT_C(errorCode == event->ErrorCode, "extected: " << (int)errorCode << ", accepted: " << (int)event->ErrorCode);
1453+
}
1454+
1455+
void TPartitionFixture::WaitForDeletePartitionDone()
1456+
{
1457+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>();
1458+
1459+
UNIT_ASSERT(event != nullptr);
1460+
}
1461+
13681462
struct TTestUserAct {
13691463
TSrcIdMap SourceIds = {};
13701464
TString ClientId = {};
@@ -3478,6 +3572,36 @@ Y_UNIT_TEST_F(EndWriteTimestamp_HeadKeys, TPartitionFixture) {
34783572
UNIT_ASSERT_C(now - TDuration::Seconds(2) < endWriteTimestamp && endWriteTimestamp < now, "" << (now - TDuration::Seconds(2)) << " < " << endWriteTimestamp << " < " << now );
34793573
} // EndWriteTimestamp_FromMeta
34803574

3575+
Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_Message, TPartitionFixture)
3576+
{
3577+
// create a supportive partition
3578+
const TPartitionId partitionId{1, TWriteId{2, 3}, 4};
3579+
CreatePartition({.Partition=partitionId});
3580+
3581+
// write 2 messages in it
3582+
SendWrite(1, 0, "owner", 0, "message #1", false, 1, true);
3583+
SendWrite(2, 1, "owner", 1, "message #2", false, 2, true);
3584+
3585+
// delay the response from the quoter
3586+
auto approveWriteQuota = WaitForRequestQuotaAndHoldApproveWriteQuota();
3587+
3588+
// Send a `TEvDeletePartition`. The partition will wait for the response from the quoter to arrive.
3589+
SendDeletePartition();
3590+
WaitForDeletePartitionDoneTimeout();
3591+
3592+
// The answer is from the quoter
3593+
SendApproveWriteQuota(std::move(approveWriteQuota));
3594+
WaitForQuotaConsumed();
3595+
3596+
WaitCmdWrite();
3597+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
3598+
3599+
// Write operations fail with an error
3600+
WaitForWriteError(1, NPersQueue::NErrorCode::ERROR);
3601+
WaitForDeletePartitionDone();
3602+
WaitForWriteError(2, NPersQueue::NErrorCode::ERROR);
3603+
}
3604+
34813605
} // End of suite
34823606

34833607
} // namespace

0 commit comments

Comments
 (0)