Skip to content

Commit e47d635

Browse files
authored
[24-3] Fix topic read crash (#11137)
1 parent b2ea93c commit e47d635

File tree

4 files changed

+128
-8
lines changed

4 files changed

+128
-8
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,18 +1226,25 @@ void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorCont
12261226

12271227
void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
12281228
const ui64 cookie = ev->Get()->GetCookie();
1229-
Y_ABORT_UNLESS(ReadInfo.contains(cookie));
1230-
12311229
auto it = ReadInfo.find(cookie);
1232-
Y_ABORT_UNLESS(it != ReadInfo.end());
1230+
1231+
// If there is no such cookie, then the read was canceled.
1232+
// For example, this can happen after the consumer has been deleted.
1233+
if (it == ReadInfo.end()) {
1234+
return;
1235+
}
12331236

12341237
TReadInfo info = std::move(it->second);
12351238
ReadInfo.erase(it);
12361239

1237-
//make readinfo class
1238-
auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx);
1240+
auto* userInfo = UsersInfoStorage->GetIfExists(info.User);
1241+
if (!userInfo) {
1242+
ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(info.User));
1243+
OnReadRequestFinished(info.Destination, 0, info.User, ctx);
1244+
}
1245+
12391246
TReadAnswer answer(info.FormAnswer(
1240-
ctx, *ev->Get(), EndOffset, Partition, &userInfo,
1247+
ctx, *ev->Get(), EndOffset, Partition, userInfo,
12411248
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode()
12421249
));
12431250
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
@@ -2428,6 +2435,20 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
24282435
}
24292436

24302437
UsersInfoStorage->Remove(user, ctx);
2438+
2439+
// Finish all ongoing reads
2440+
std::unordered_set<ui64> readCookies;
2441+
for (auto& [cookie, info] : ReadInfo) {
2442+
if (info.User == user) {
2443+
readCookies.insert(cookie);
2444+
ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(user));
2445+
OnReadRequestFinished(info.Destination, 0, user, ctx);
2446+
}
2447+
}
2448+
for (ui64 cookie : readCookies) {
2449+
ReadInfo.erase(cookie);
2450+
}
2451+
24312452
Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
24322453
}
24332454
}

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
623623
static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);
624624
void RemovePendingRequests(TMessageQueue& requests);
625625
void RemoveMessagesToQueue(TMessageQueue& requests);
626+
static TString GetConsumerDeletedMessage(TStringBuf consumerName);
626627

627628
private:
628629
ui64 TabletID;

ydb/core/persqueue/partition_read.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,8 +748,8 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
748748
auto* read = readEvent->Get();
749749
const TString& user = read->ClientId;
750750
auto userInfo = UsersInfoStorage->GetIfExists(user);
751-
if(!userInfo) {
752-
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "cannot finish read request. Consumer " << read->ClientId << " is gone from partition");
751+
if (!userInfo) {
752+
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(read->ClientId));
753753
Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
754754
OnReadRequestFinished(read->Cookie, 0, user, ctx);
755755
return;
@@ -1026,4 +1026,8 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10261026
ctx.Send(BlobCache, request.Release());
10271027
}
10281028

1029+
// Builds the error text used when a read request cannot be completed because
// its consumer is no longer present on the partition. The result has the form
// "cannot finish read request. Consumer <consumerName> is gone from partition".
TString TPartition::GetConsumerDeletedMessage(TStringBuf consumerName) {
1030+
return TStringBuilder() << "cannot finish read request. Consumer " << consumerName << " is gone from partition";
1031+
}
1032+
10291033
} // namespace NKikimr::NPQ

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2326,6 +2326,100 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) {
23262326
});
23272327
}
23282328

2329+
// Regression test for deleting a consumer while one of its reads is in flight.
// Scenario: write data, restart the tablet, send a read for "user1", hold back
// every TEvPQ::TEvBlobResponse, apply a config that lists only "user2", then
// re-send the held blob responses. The read is expected to end with
// BAD_REQUEST and a "Consumer user1 is gone from partition" error reason.
Y_UNIT_TEST(TestReadAndDeleteConsumer) {
2330+
TTestContext tc;
2331+
RunTestWithReboots(tc.TabletIds, [&]() {
2332+
return tc.InitialEventsFilter.Prepare();
2333+
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
2334+
TFinalizer finalizer(tc);
2335+
tc.Prepare(dispatchName, setup, activeZone);
2336+
activeZone = false;
2337+
tc.Runtime->SetScheduledLimit(2000);
2338+
tc.Runtime->SetScheduledEventFilter(&tc.ImmediateLogFlushAndRequestTimeoutFilter);
2339+
2340+
// Payload: 1000 messages (seqNo 1..1000), each 102400 bytes of 'a'.
TVector<std::pair<ui64, TString>> data;
2341+
TString msg;
2342+
msg.resize(102400, 'a');
2343+
for (ui64 i = 1; i <= 1000; ++i) {
2344+
data.emplace_back(i, msg);
2345+
}
2346+
2347+
// The tablet starts with one partition and two consumers: user1 and user2.
PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=1},
2348+
{{"user1", true}, {"user2", true}}, tc);
2349+
CmdWrite(0, "sourceid1", data, tc, false, {}, true);
2350+
2351+
// Reset tablet cache
// NOTE(review): presumably this forces the read below to go through a blob
// request so that a TEvBlobResponse is produced — confirm.
2352+
PQTabletRestart(tc);
2353+
2354+
TAutoPtr<IEventHandle> handle;
2355+
TEvPersQueue::TEvResponse* readResult = nullptr;
2356+
THolder<TEvPersQueue::TEvRequest> readRequest;
2357+
TEvPersQueue::TEvUpdateConfigResponse* consumerDeleteResult = nullptr;
2358+
THolder<TEvPersQueue::TEvUpdateConfig> consumerDeleteRequest;
2359+
2360+
// Read request
// Reads one message from partition 0 at offset 1 on behalf of user1.
2361+
{
2362+
readRequest.Reset(new TEvPersQueue::TEvRequest);
2363+
auto req = readRequest->Record.MutablePartitionRequest();
2364+
req->SetPartition(0);
2365+
auto read = req->MutableCmdRead();
2366+
read->SetOffset(1);
2367+
read->SetClientId("user1");
2368+
read->SetCount(1);
2369+
read->SetBytes(1'000'000);
2370+
read->SetTimeoutMs(5000);
2371+
}
2372+
2373+
// Consumer delete request
// The new config lists only user2; user1 (created by PQTabletPrepare above)
// is omitted from the consumer list.
2374+
{
2375+
consumerDeleteRequest.Reset(new TEvPersQueue::TEvUpdateConfig());
2376+
consumerDeleteRequest->MutableRecord()->SetTxId(42);
2377+
auto& cfg = *consumerDeleteRequest->MutableRecord()->MutableTabletConfig();
2378+
cfg.SetVersion(42);
2379+
cfg.AddPartitionIds(0);
2380+
cfg.AddPartitions()->SetPartitionId(0);
2381+
cfg.SetLocalDC(true);
2382+
cfg.SetTopic("topic");
2383+
auto& cons = *cfg.AddConsumers();
2384+
cons.SetName("user2");
2385+
cons.SetImportant(true);
2386+
}
2387+
2388+
// A separate edge actor is used as the sender of the config update;
// the read request below is sent from tc.Edge.
TActorId edge = tc.Runtime->AllocateEdgeActor();
2389+
2390+
// Send the read request first; the consumer is deleted below while it is still in flight
2391+
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, readRequest.Release(), 0, GetPipeConfigWithRetries());
2392+
2393+
// Intercept TEvPQ::TEvBlobResponse event
2394+
std::vector<TEvPQ::TEvBlobResponse::TPtr> capturedBlobResponses;
2395+
auto captureBlobResponsesObserver = tc.Runtime->AddObserver<TEvPQ::TEvBlobResponse>([&](TEvPQ::TEvBlobResponse::TPtr& ev) {
2396+
// Move the event out of the observer argument into the capture list.
// NOTE(review): presumably an emptied `ev` makes the runtime drop the
// event instead of delivering it — confirm against TTestActorRuntime.
capturedBlobResponses.emplace_back().Swap(ev);
2397+
});
2398+
2399+
// Delete consumer while read request is still in progress
2400+
tc.Runtime->SendToPipe(tc.TabletId, edge, consumerDeleteRequest.Release(), 0, GetPipeConfigWithRetries());
2401+
consumerDeleteResult = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvUpdateConfigResponse>(handle);
2402+
{
2403+
//Cerr << "Got consumer delete response: " << consumerDeleteResult->Record << Endl;
2404+
UNIT_ASSERT(consumerDeleteResult->Record.HasStatus());
2405+
UNIT_ASSERT_EQUAL(consumerDeleteResult->Record.GetStatus(), NKikimrPQ::EStatus::OK);
2406+
}
2407+
2408+
// Resend intercepted blob responses and wait for read result
// The observer is removed first so the re-sent events are not captured again.
2409+
captureBlobResponsesObserver.Remove();
2410+
for (auto& ev : capturedBlobResponses) {
2411+
tc.Runtime->Send(ev.Release(), 0, true);
2412+
}
2413+
2414+
// The read must complete with an error that names the removed consumer.
readResult = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
2415+
{
2416+
//Cerr << "Got read response: " << readResult->Record << Endl;
2417+
UNIT_ASSERT(readResult->Record.HasStatus());
2418+
UNIT_ASSERT_EQUAL(readResult->Record.GetErrorCode(), NPersQueue::NErrorCode::BAD_REQUEST);
2419+
UNIT_ASSERT_STRING_CONTAINS_C(readResult->Record.GetErrorReason(), "Consumer user1 is gone from partition", readResult->Record.Utf8DebugString());
2420+
}
2421+
});
2422+
}
23292423

23302424

23312425
} // Y_UNIT_TEST_SUITE(TPQTest)

0 commit comments

Comments
 (0)