Skip to content

Commit ef5d702

Browse files
The PQ tablet loses its TEvReadSet (#17842) (#17853)
1 parent 5cd3c53 commit ef5d702

File tree

3 files changed

+156
-18
lines changed

3 files changed

+156
-18
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3141,6 +3141,11 @@ void TPersQueue::SetTxInFlyCounter()
31413141

31423142
void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
31433143
{
3144+
if (!InitCompleted) {
3145+
AddPendingEvent(ev.Release());
3146+
return;
3147+
}
3148+
31443149
PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal");
31453150

31463151
NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record;
@@ -3157,6 +3162,11 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
31573162

31583163
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
31593164
{
3165+
if (!InitCompleted) {
3166+
AddPendingEvent(ev.Release());
3167+
return;
3168+
}
3169+
31603170
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
31613171
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());
31623172

@@ -3336,6 +3346,11 @@ void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransa
33363346

33373347
void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx)
33383348
{
3349+
if (!InitCompleted) {
3350+
AddPendingEvent(ev.Release());
3351+
return;
3352+
}
3353+
33393354
PQ_LOG_D("Handle TEvTxProcessing::TEvPlanStep " << ev->Get()->Record.ShortDebugString());
33403355

33413356
EvPlanStepQueue.emplace_back(ev->Sender, ev->Release().Release());
@@ -3345,6 +3360,11 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont
33453360

33463361
void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx)
33473362
{
3363+
if (!InitCompleted) {
3364+
AddPendingEvent(ev.Release());
3365+
return;
3366+
}
3367+
33483368
PQ_LOG_D("Handle TEvTxProcessing::TEvReadSet " << ev->Get()->Record.ShortDebugString());
33493369

33503370
NKikimrTx::TEvReadSet& event = ev->Get()->Record;
@@ -3356,7 +3376,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
33563376
}
33573377

33583378
if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) {
3359-
if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) {
3379+
if ((tx->State > NKikimrPQ::TTransaction::EXECUTED) ||
3380+
((tx->State == NKikimrPQ::TTransaction::EXECUTED) && !tx->WriteInProgress)) {
33603381
if (ack) {
33613382
PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer());
33623383
ctx.Send(ev->Sender, ack.release());
@@ -3733,15 +3754,17 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
37333754
void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
37343755
NKikimrClient::TKeyValueRequest& request)
37353756
{
3736-
Y_ABORT_UNLESS(!WriteTxsInProgress);
3757+
Y_ABORT_UNLESS(!WriteTxsInProgress, "PQ %" PRIu64, TabletID());
37373758

37383759
for (auto& [txId, state] : WriteTxs) {
3760+
// There may be cases when in one iteration of a record we change the state of a transaction and delete it
37393761
auto tx = GetTransaction(ctx, txId);
3740-
Y_ABORT_UNLESS(tx);
3741-
3742-
tx->AddCmdWrite(request, state);
3762+
if (tx) {
3763+
PQ_LOG_D("write key for TxId " << txId);
3764+
tx->AddCmdWrite(request, state);
37433765

3744-
ChangedTxs.emplace(tx->Step, txId);
3766+
ChangedTxs.emplace(tx->Step, txId);
3767+
}
37453768
}
37463769

37473770
WriteTxs.clear();
@@ -3750,9 +3773,7 @@ void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
37503773
void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx,
37513774
NKikimrClient::TKeyValueRequest& request)
37523775
{
3753-
Y_ABORT_UNLESS(!WriteTxsInProgress,
3754-
"PQ %" PRIu64,
3755-
TabletID());
3776+
Y_ABORT_UNLESS(!WriteTxsInProgress, "PQ %" PRIu64, TabletID());
37563777

37573778
for (ui64 txId : DeleteTxs) {
37583779
PQ_LOG_D("delete key for TxId " << txId);
@@ -4860,8 +4881,9 @@ void TPersQueue::DeleteSupportivePartitions(const TActorContext& ctx)
48604881
void TPersQueue::OnInitComplete(const TActorContext& ctx)
48614882
{
48624883
SignalTabletActive(ctx);
4863-
TryStartTransaction(ctx);
48644884
InitCompleted = true;
4885+
ProcessPendingEvents();
4886+
TryStartTransaction(ctx);
48654887
}
48664888

48674889
ui64 TPersQueue::GetAllowedStep() const
@@ -4897,6 +4919,11 @@ void TPersQueue::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
48974919

48984920
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx)
48994921
{
4922+
if (!InitCompleted) {
4923+
AddPendingEvent(ev.Release());
4924+
return;
4925+
}
4926+
49004927
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransactionAttach " << ev->Get()->Record.ShortDebugString());
49014928

49024929
const ui64 txId = ev->Get()->Record.GetTxId();
@@ -5107,6 +5134,21 @@ ui64 TPersQueue::GetGeneration() {
51075134
return *TabletGeneration;
51085135
}
51095136

5137+
void TPersQueue::AddPendingEvent(IEventHandle* ev)
5138+
{
5139+
PendingEvents.emplace_back(ev);
5140+
}
5141+
5142+
void TPersQueue::ProcessPendingEvents()
5143+
{
5144+
auto events = std::move(PendingEvents);
5145+
PendingEvents.clear();
5146+
5147+
for (auto& ev : events) {
5148+
HandleHook(ev);
5149+
}
5150+
}
5151+
51105152
bool TPersQueue::HandleHook(STFUNC_SIG)
51115153
{
51125154
SetActivityType(NKikimrServices::TActivity::PERSQUEUE_ACTOR);

ydb/core/persqueue/pq_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,11 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
561561
void ResendEvReadSetToReceiversForState(const TActorContext& ctx, NKikimrPQ::TTransaction::EState state);
562562

563563
void DeleteSupportivePartitions(const TActorContext& ctx);
564+
565+
TDeque<TAutoPtr<IEventHandle>> PendingEvents;
566+
567+
void AddPendingEvent(IEventHandle* ev);
568+
void ProcessPendingEvents();
564569
};
565570

566571

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 99 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
197197

198198
void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
199199
void SendReadSetAck(NHelpers::TPQTabletMock& tablet);
200+
void WaitForNoReadSetAck(NHelpers::TPQTabletMock& tablet);
200201

201202
void SendDropTablet(const TDropTabletParams& params);
202203
void WaitDropTabletReply(const TDropTabletReplyMatcher& matcher);
@@ -206,7 +207,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
206207

207208
void SendCancelTransactionProposal(const TCancelTransactionProposalParams& params);
208209

209-
void StartPQWriteTxsObserver();
210+
void StartPQWriteTxsObserver(TAutoPtr<IEventHandle>* ev = nullptr);
210211
void WaitForPQWriteTxs();
211212

212213
template <class T> void WaitForEvent(size_t count);
@@ -215,7 +216,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
215216

216217
void TestWaitingForTEvReadSet(size_t senders, size_t receivers);
217218

218-
void StartPQWriteObserver(bool& flag, unsigned cookie);
219+
void StartPQWriteObserver(bool& flag, unsigned cookie, TAutoPtr<IEventHandle>* ev = nullptr);
219220
void WaitForPQWriteComplete(bool& flag);
220221

221222
bool FoundPQWriteState = false;
@@ -241,6 +242,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
241242
void WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state);
242243
void WaitForExecStep(ui64 step);
243244

245+
void InterceptSaveTxState(TAutoPtr<IEventHandle>& event);
246+
void SendSaveTxState(TAutoPtr<IEventHandle>& event);
247+
244248
//
245249
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
246250
//
@@ -522,6 +526,17 @@ void TPQTabletFixture::WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TRe
522526
}
523527
}
524528

529+
void TPQTabletFixture::WaitForNoReadSetAck(NHelpers::TPQTabletMock& tablet)
530+
{
531+
TDispatchOptions options;
532+
options.CustomFinalCondition = [&]() {
533+
return tablet.ReadSetAck.Defined();
534+
};
535+
Ctx->Runtime->DispatchEvents(options, TDuration::Seconds(2));
536+
537+
UNIT_ASSERT(!tablet.ReadSetAck.Defined());
538+
}
539+
525540
void TPQTabletFixture::SendDropTablet(const TDropTabletParams& params)
526541
{
527542
auto event = MakeHolder<TEvPersQueue::TEvDropTablet>();
@@ -745,18 +760,21 @@ void TPQTabletFixture::WaitWriteResponse(const TWriteResponseMatcher& matcher)
745760
Ctx->Runtime->SetObserverFunc(prev);
746761
}
747762

748-
void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie)
763+
void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie, TAutoPtr<IEventHandle>* ev)
749764
{
750765
flag = false;
751766

752-
auto observer = [&flag, cookie](TAutoPtr<IEventHandle>& event) {
767+
auto observer = [&flag, cookie, ev](TAutoPtr<IEventHandle>& event) {
753768
if (auto* kvResponse = event->CastAsLocal<TEvKeyValue::TEvResponse>()) {
754-
if (kvResponse->Record.HasCookie()) {
755-
}
756769
if ((event->Sender == event->Recipient) &&
757770
kvResponse->Record.HasCookie() &&
758771
(kvResponse->Record.GetCookie() == cookie)) {
759772
flag = true;
773+
774+
if (ev) {
775+
*ev = event;
776+
return TTestActorRuntimeBase::EEventAction::DROP;
777+
}
760778
}
761779
}
762780

@@ -793,9 +811,9 @@ void TPQTabletFixture::SendCancelTransactionProposal(const TCancelTransactionPro
793811
event.Release());
794812
}
795813

796-
void TPQTabletFixture::StartPQWriteTxsObserver()
814+
void TPQTabletFixture::StartPQWriteTxsObserver(TAutoPtr<IEventHandle>* event)
797815
{
798-
StartPQWriteObserver(FoundPQWriteTxs, 5); // TPersQueue::WRITE_TX_COOKIE
816+
StartPQWriteObserver(FoundPQWriteTxs, 5, event); // TPersQueue::WRITE_TX_COOKIE
799817
}
800818

801819
void TPQTabletFixture::WaitForPQWriteTxs()
@@ -1030,6 +1048,40 @@ void TPQTabletFixture::WaitForExecStep(ui64 step)
10301048
UNIT_FAIL("expected execution step " << step);
10311049
}
10321050

1051+
void TPQTabletFixture::InterceptSaveTxState(TAutoPtr<IEventHandle>& ev)
1052+
{
1053+
bool found = false;
1054+
1055+
TTestActorRuntimeBase::TEventFilter prev;
1056+
auto filter = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) -> bool {
1057+
if (auto* msg = event->CastAsLocal<TEvKeyValue::TEvRequest>()) {
1058+
if (msg->Record.HasCookie() && (msg->Record.GetCookie() == 5)) { // WRITE_TX_COOKIE
1059+
ev = event;
1060+
found = true;
1061+
return true;
1062+
}
1063+
}
1064+
1065+
return false;
1066+
};
1067+
prev = Ctx->Runtime->SetEventFilter(filter);
1068+
1069+
TDispatchOptions options;
1070+
options.CustomFinalCondition = [&found]() {
1071+
return found;
1072+
};
1073+
1074+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1075+
UNIT_ASSERT(found);
1076+
1077+
Ctx->Runtime->SetEventFilter(prev);
1078+
}
1079+
1080+
void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
1081+
{
1082+
Ctx->Runtime->Send(event);
1083+
}
1084+
10331085
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
10341086
{
10351087
TestParallelTransactions("consumer", "consumer");
@@ -1926,6 +1978,45 @@ Y_UNIT_TEST_F(After_Restarting_The_Tablet_Sends_A_TEvReadSet_For_Transactions_In
19261978
WaitReadSetEx(*tablet, {.Step=100, .TxId=txId_1, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=2});
19271979
}
19281980

1981+
Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
1982+
{
1983+
const ui64 txId = 67890;
1984+
const ui64 mockTabletId = 22222;
1985+
1986+
NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
1987+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
1988+
1989+
SendProposeTransactionRequest({.TxId=txId,
1990+
.Senders={mockTabletId}, .Receivers={mockTabletId},
1991+
.TxOps={
1992+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
1993+
}});
1994+
WaitProposeTransactionResponse({.TxId=txId,
1995+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1996+
1997+
SendPlanStep({.Step=100, .TxIds={txId}});
1998+
1999+
WaitForCalcPredicateResult();
2000+
2001+
tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2002+
2003+
//WaitProposeTransactionResponse({.TxId=txId,
2004+
// .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
2005+
2006+
TAutoPtr<IEventHandle> kvRequest;
2007+
InterceptSaveTxState(kvRequest);
2008+
2009+
tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2010+
2011+
WaitForNoReadSetAck(*tablet);
2012+
2013+
SendSaveTxState(kvRequest);
2014+
2015+
WaitForTxState(txId, NKikimrPQ::TTransaction::EXECUTED);
2016+
2017+
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
2018+
}
2019+
19292020
}
19302021

19312022
}

0 commit comments

Comments
 (0)