Skip to content

Commit 0311ff3

Browse files
FloatingCrowbarqyryq
authored andcommitted
Fix bugs in DR restore (#16125)
1 parent da0bfb1 commit 0311ff3

File tree

2 files changed

+11
-10
lines changed

2 files changed

+11
-10
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,13 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
140140
{
141141
Y_ABORT_UNLESS(Response);
142142
const auto& record = ev->Get()->Record;
143-
if (!record.HasPartitionResponse() || !record.GetPartitionResponse().HasCmdReadResult() ||
144-
record.GetStatus() != NMsgBusProxy::MSTATUS_OK || record.GetErrorCode() != NPersQueue::NErrorCode::OK ||
145-
record.GetPartitionResponse().GetCmdReadResult().ResultSize() == 0) {
143+
auto isDirectRead = DirectReadKey.ReadId != 0;
144+
if (!record.HasPartitionResponse()
145+
|| !record.GetPartitionResponse().HasCmdReadResult()
146+
|| record.GetStatus() != NMsgBusProxy::MSTATUS_OK
147+
|| record.GetErrorCode() != NPersQueue::NErrorCode::OK
148+
|| (record.GetPartitionResponse().GetCmdReadResult().ResultSize() == 0 && !isDirectRead)
149+
) {
146150

147151
Response->Record.CopyFrom(record);
148152
ctx.Send(Sender, Response.Release());
@@ -151,7 +155,6 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
151155
}
152156
Y_ABORT_UNLESS(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdReadResult());
153157
const auto& readResult = record.GetPartitionResponse().GetCmdReadResult();
154-
auto isDirectRead = DirectReadKey.ReadId != 0;
155158
if (isDirectRead) {
156159
if (!PreparedResponse) {
157160
PreparedResponse = std::make_shared<NKikimrClient::TResponse>();
@@ -162,10 +165,12 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
162165
responseRecord.SetStatus(NMsgBusProxy::MSTATUS_OK);
163166
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);
164167

165-
Y_ABORT_UNLESS(readResult.ResultSize() > 0);
168+
Y_ABORT_UNLESS(readResult.ResultSize() > 0 || isDirectRead);
166169
bool isStart = false;
167170
if (!responseRecord.HasPartitionResponse()) {
168-
Y_ABORT_UNLESS(!readResult.GetResult(0).HasPartNo() || readResult.GetResult(0).GetPartNo() == 0); //starts from begin of record
171+
if (readResult.ResultSize() > 0) {
172+
Y_ABORT_UNLESS(!readResult.GetResult(0).HasPartNo() || readResult.GetResult(0).GetPartNo() == 0); //starts from begin of record
173+
}
169174
auto partResp = responseRecord.MutablePartitionResponse();
170175
auto readRes = partResp->MutableCmdReadResult();
171176
readRes->SetBlobsFromDisk(readRes->GetBlobsFromDisk() + readResult.GetBlobsFromDisk());

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -969,10 +969,6 @@ void TPartitionActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const
969969

970970
TabletGeneration = msg->Generation;
971971
NodeId = msg->ServerId.NodeId();
972-
973-
if (InitDone) {
974-
ctx.Send(ParentId, new TEvPQProxy::TEvUpdateSession(Partition, NodeId, TabletGeneration));
975-
}
976972
}
977973

978974
void TPartitionActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {

0 commit comments

Comments
 (0)