Skip to content

Commit 8a91dae

Browse files
FloatingCrowbarqyryq
authored andcommitted
direct read fix (#15479)
1 parent 0311ff3 commit 8a91dae

File tree

4 files changed

+44
-5
lines changed

4 files changed

+44
-5
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
5656
hFunc(TEvPQ::TEvGetFullDirectReadData, HandleGetData)
5757
hFunc(TEvPQProxy::TEvDirectReadDataSessionConnected, HandleCreateClientSession)
5858
hFunc(TEvPQProxy::TEvDirectReadDataSessionDead, HandleDestroyClientSession)
59+
hFunc(TEvPQProxy::TEvDirectReadDestroyPartitionSession, HandlePartitionSessionReleased)
5960
)
6061

6162
private:
@@ -113,6 +114,17 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
113114
AssignByProxy.erase(assignIter);
114115
}
115116

117+
void HandlePartitionSessionReleased(TEvPQProxy::TEvDirectReadDestroyPartitionSession::TPtr& ev) {
118+
auto assignIter = AssignByProxy.find(ev->Sender);
119+
if (assignIter.IsEnd())
120+
return;
121+
if (!assignIter->second.contains(ev->Get()->ReadKey.PartitionSessionId))
122+
return;
123+
124+
assignIter->second.erase(ev->Get()->ReadKey.PartitionSessionId);
125+
ServerSessions.erase(ev->Get()->ReadKey);
126+
}
127+
116128
void HandleRegister(TEvPQ::TEvRegisterDirectReadSession::TPtr& ev) {
117129
const auto& key = ev->Get()->Session;
118130
RegisterServerSession(key, ev->Get()->Generation);

ydb/services/persqueue_v1/actors/events.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,11 @@ struct TEvPQProxy {
600600
};
601601

602602
struct TEvDirectReadDestroyPartitionSession : public TEventLocal<TEvDirectReadDestroyPartitionSession, EvDirectReadDestroyPartitionSession> {
603+
604+
TEvDirectReadDestroyPartitionSession(const TString& sessionId, ui64 partitionSessionId)
605+
: ReadKey(sessionId, partitionSessionId)
606+
{}
607+
603608
TEvDirectReadDestroyPartitionSession(const NKikimr::NPQ::TReadSessionKey& sessionKey,
604609
Ydb::PersQueue::ErrorCode::ErrorCode code, const TString& reason)
605610
: ReadKey(sessionKey)

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,10 +324,14 @@ void TPartitionActor::Handle(TEvPQProxy::TEvDirectReadAck::TPtr& ev, const TActo
324324
if (DirectReadRestoreStage != EDirectReadRestoreStage::None) {
325325
if (RestoredDirectReadId == ev->Get()->DirectReadId) {
326326
// This direct read is already being restored. Have to forget it later.
327+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Got ack for direct read " << ev->Get()->DirectReadId
328+
<< " while restoring, store it to forget further");
327329
DirectReadsToForget.insert(ev->Get()->DirectReadId);
328330
return;
329331
}
330332
if (DirectReadsToRestore.contains(ev->Get()->DirectReadId)) {
333+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Got ack for direct read " << ev->Get()->DirectReadId
334+
<< " while restoring, remove it from restore list");
331335
// This direct read is pending for restore. No need to foreget - not yet prepared, just erase it;
332336
DirectReadsToRestore.erase(ev->Get()->DirectReadId);
333337
DirectReadsToPublish.erase(ev->Get()->DirectReadId);
@@ -367,6 +371,7 @@ void TPartitionActor::Handle(const TEvPQProxy::TEvRestartPipe::TPtr&, const TAct
367371
DirectReadsToRestore = DirectReadResults;
368372
DirectReadsToPublish = PublishedDirectReads;
369373
Y_ABORT_UNLESS(!DirectReadsToPublish.contains(DirectReadId));
374+
RestoredDirectReadId = 0;
370375
RestartDirectReadSession();
371376
return;
372377
}
@@ -700,6 +705,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
700705
break;
701706
case EDirectReadRestoreStage::Session:
702707
Y_ABORT_UNLESS(result.HasCmdRestoreDirectReadResult());
708+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Direct read - session restarted for partition " << Partition);
703709
if (!SendNextRestorePrepareOrForget()) {
704710
OnDirectReadsRestored();
705711
}
@@ -725,6 +731,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
725731
return;
726732
case EDirectReadRestoreStage::Publish:
727733
Y_ABORT_UNLESS(RestoredDirectReadId != 0);
734+
728735
Y_ABORT_UNLESS(result.HasCmdPublishReadResult());
729736
Y_ABORT_UNLESS(*DirectReadsToPublish.begin() == result.GetCmdPublishReadResult().GetDirectReadId());
730737
DirectReadsToPublish.erase(DirectReadsToPublish.begin());
@@ -762,6 +769,8 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
762769

763770
Y_ABORT_UNLESS(DirectRead);
764771
Y_ABORT_UNLESS(res.GetDirectReadId() == DirectReadId);
772+
if (!PipeClient)
773+
return; // Pipe was already destroyed, direct read session is being restored. Will resend this request afterwards;
765774

766775
EndOffset = res.GetEndOffset();
767776
SizeLag = res.GetSizeLag();
@@ -1157,13 +1166,18 @@ bool TPartitionActor::SendNextRestorePrepareOrForget() {
11571166
if (shouldForget) {
11581167
// We have something to forget from what was already restored; Do NOT change RestoredDirectReadId
11591168
DirectReadRestoreStage = EDirectReadRestoreStage::Forget;
1169+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Restore direct read, forget id "
1170+
<< *DirectReadsToForget.begin() << " for partition " << Partition);
11601171
SendForgetDirectRead(*DirectReadsToForget.begin(), ctx);
11611172
return true;
11621173
} else {
1163-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Resend prepare direct read id " << prepareId << " for partition " << Partition);
1174+
auto& dr = DirectReadsToRestore.begin()->second;
1175+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Resend prepare direct read id " << prepareId
1176+
<< " (internal id: " << dr.GetDirectReadId() << ") for partition " << Partition);
11641177
Y_ABORT_UNLESS(prepareId != 0);
1178+
11651179
//Restore;
1166-
auto& dr = DirectReadsToRestore.begin()->second;
1180+
Y_ABORT_UNLESS(prepareId == dr.GetDirectReadId());
11671181

11681182
Y_ABORT_UNLESS(RestoredDirectReadId < dr.GetDirectReadId());
11691183
RestoredDirectReadId = dr.GetDirectReadId();
@@ -1190,8 +1204,8 @@ bool TPartitionActor::SendNextRestorePublishRequest() {
11901204
return false;
11911205
}
11921206
auto id = *DirectReadsToPublish.begin();
1193-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
1194-
<< "Resend publish direct read on restore, id: " << id);
1207+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Resend publish direct read on restore, id: "
1208+
<< id << " for partition " << Partition);
11951209

11961210
Y_ABORT_UNLESS(RestoredDirectReadId == id);
11971211
DirectReadRestoreStage = EDirectReadRestoreStage::Publish;
@@ -1373,7 +1387,6 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
13731387

13741388
Y_ABORT_UNLESS(ReadGuid.empty());
13751389
Y_ABORT_UNLESS(!RequestInfly);
1376-
Y_ABORT_UNLESS(DirectReadRestoreStage == EDirectReadRestoreStage::None);
13771390

13781391
ReadGuid = ev->Get()->Guid;
13791392

@@ -1386,6 +1399,12 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
13861399
if (!PipeClient) //Pipe will be recreated soon
13871400
return;
13881401

1402+
if (DirectReadRestoreStage != EDirectReadRestoreStage::None) {
1403+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " READ FROM " << Partition
1404+
<< " store this request utill direct read is restored");
1405+
return;
1406+
}
1407+
13891408
TAutoPtr<TEvPersQueue::TEvRequest> event(new TEvPersQueue::TEvRequest);
13901409
event->Record.Swap(&request);
13911410

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,9 @@ void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignal(TPartitionActorI
14841484
result.mutable_stop_partition_session_request()->set_committed_offset(partition.Offset);
14851485
if (DirectRead) {
14861486
result.mutable_stop_partition_session_request()->set_last_direct_read_id(partition.LastDirectReadId);
1487+
ctx.Send(NPQ::MakePQDReadCacheServiceActorId(),
1488+
new TEvPQProxy::TEvDirectReadDestroyPartitionSession(Session, partition.Partition.AssignId));
1489+
14871490
}
14881491
}
14891492

0 commit comments

Comments
 (0)