Skip to content

Commit 239c220

Browse files
The PQ tablet does not receive a TEvTxCalcPredicateResult (#17497) (#17520)
1 parent a175611 commit 239c220

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2222,7 +2222,7 @@ void TPartition::AnswerCurrentReplies(const TActorContext& ctx)
22222222
TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t)
22232223
{
22242224
auto result = EProcessResult::Continue;
2225-
if (t->SupportivePartitionActor && !t->WriteInfo) { // Pending for write info
2225+
if (t->SupportivePartitionActor && !t->WriteInfo && !t->WriteInfoApplied) { // Pending for write info
22262226
return EProcessResult::NotReady;
22272227
}
22282228
if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,15 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
310310
void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode);
311311
void WaitForDeletePartitionDone();
312312

313+
void SendCalcPredicate(ui64 step,
314+
ui64 txId,
315+
const TActorId& suppPartitionId);
316+
void WaitForGetWriteInfoRequest();
317+
void SendGetWriteInfoError(ui32 internalPartitionId,
318+
TString message,
319+
const TActorId& suppPartitionId);
320+
void WaitForCalcPredicateResult(ui64 txId, bool predicate);
321+
313322
TMaybe<TTestContext> Ctx;
314323
TMaybe<TFinalizer> Finalizer;
315324

@@ -2500,6 +2509,47 @@ void TPartitionFixture::CmdChangeOwner(ui64 cookie, const TString& sourceId, TDu
25002509
ownerCookie = event->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
25012510
}
25022511

2512+
void TPartitionFixture::SendCalcPredicate(ui64 step,
2513+
ui64 txId,
2514+
const TActorId& suppPartitionId)
2515+
{
2516+
SendCalcPredicate(step, txId, "", 0, 0, suppPartitionId);
2517+
}
2518+
2519+
void TPartitionFixture::WaitForGetWriteInfoRequest()
2520+
{
2521+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvGetWriteInfoRequest>();
2522+
UNIT_ASSERT(event != nullptr);
2523+
//UNIT_ASSERT_VALUES_EQUAL(event->OriginalPartition, ActorId);
2524+
}
2525+
2526+
void TPartitionFixture::SendGetWriteInfoError(ui32 internalPartitionId,
2527+
TString message,
2528+
const TActorId& suppPartitionId)
2529+
{
2530+
auto event = MakeHolder<TEvPQ::TEvGetWriteInfoError>(internalPartitionId,
2531+
std::move(message));
2532+
//event->SupportivePartition = suppPartitionId;
2533+
2534+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, suppPartitionId, event.Release()));
2535+
}
2536+
2537+
void TPartitionFixture::WaitForCalcPredicateResult(ui64 txId, bool predicate)
2538+
{
2539+
while (true) {
2540+
TAutoPtr<IEventHandle> handle;
2541+
auto events =
2542+
Ctx->Runtime->GrabEdgeEvents<TEvPQ::TEvTxCalcPredicateResult, TEvKeyValue::TEvRequest>(handle,
2543+
TDuration::Seconds(1));
2544+
if (std::get<TEvKeyValue::TEvRequest*>(events)) {
2545+
SendDiskStatusResponse(nullptr);
2546+
} else if (auto* event = std::get<TEvPQ::TEvTxCalcPredicateResult*>(events)) {
2547+
UNIT_ASSERT_VALUES_EQUAL(event->TxId, txId);
2548+
UNIT_ASSERT_VALUES_EQUAL(event->Predicate, predicate);
2549+
break;
2550+
}
2551+
}
2552+
}
25032553

25042554
Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture)
25052555
{
@@ -3602,6 +3652,20 @@ Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_M
36023652
WaitForWriteError(2, NPersQueue::NErrorCode::ERROR);
36033653
}
36043654

3655+
Y_UNIT_TEST_F(After_TEvGetWriteInfoError_Comes_TEvTxCalcPredicateResult, TPartitionFixture)
3656+
{
3657+
const TPartitionId partitionId{1};
3658+
const ui64 step = 12345;
3659+
const ui64 txId = 67890;
3660+
3661+
CreatePartition({.Partition=partitionId});
3662+
3663+
SendCalcPredicate(step, txId, Ctx->Edge);
3664+
WaitForGetWriteInfoRequest();
3665+
SendGetWriteInfoError(31415, "error", Ctx->Edge);
3666+
WaitForCalcPredicateResult(txId, false);
3667+
}
3668+
36053669
} // End of suite
36063670

36073671
} // namespace

0 commit comments

Comments
 (0)