Skip to content

Commit a0fcd50

Browse files
committed
Fix generation mismatch handler
Set CmdForgetReadResult.DirectReadId. Correct bytes bookkeeping on tablet restarts. Stop adding messages to response when LastOffset is reached. Fix for direct read (on behalf of FloatingCrowbar).
1 parent d636c46 commit a0fcd50

File tree

12 files changed

+387
-40
lines changed

12 files changed

+387
-40
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,16 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
7272
CloseSession(ev->Sender, Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST, "Unknown session");
7373
return;
7474
}
75+
76+
auto sender = ev->Sender;
7577
if (sessionIter->second.Generation != ev->Get()->Generation) {
7678
ctx.Send(
77-
sessionIter->second.Client->ProxyId,
78-
new TEvPQProxy::TEvDirectReadDestroyPartitionSession(key, Ydb::PersQueue::ErrorCode::ErrorCode::ERROR, "Generation mismatch")
79+
sender,
80+
new TEvPQProxy::TEvDirectReadDestroyPartitionSession(key, Ydb::PersQueue::ErrorCode::ErrorCode::ERROR, "Generation mismatch")
7981
);
8082
return;
8183
}
8284

83-
auto sender = ev->Sender;
8485
auto startingReadId = ev->Get()->StartingReadId;
8586

8687
// Let the proxy respond with StartDirectReadPartitionSessionResponse right away,
@@ -344,6 +345,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
344345
auto* directReadMessage = message->mutable_direct_read_response();
345346
directReadMessage->set_direct_read_id(readId);
346347
directReadMessage->set_partition_session_id(partSessionId);
348+
directReadMessage->set_bytes_size(response->GetPartitionResponse().GetCmdPrepareReadResult().GetBytesSizeEstimate());
347349

348350
auto ok = VaildatePartitionResponse(proxyClient, *response);
349351
if (!ok) {

ydb/core/persqueue/partition_read.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ bool TPartition::ProcessHasDataRequest(const THasDataReq& request, const TActorC
152152

153153
auto now = ctx.Now();
154154
auto& userInfo = UsersInfoStorage->GetOrCreate(request.ClientId, ctx);
155-
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
155+
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
156156
}
157157
} else if (request.Offset < EndOffset) {
158158
sendResponse(GetSizeLag(request.Offset), false);
@@ -515,6 +515,11 @@ TReadAnswer TReadInfo::FormAnswer(
515515
<< " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
516516

517517
for (size_t i = pos; i < batch.Blobs.size(); ++i) {
518+
if (0 < LastOffset && LastOffset <= Offset) {
519+
needStop = true;
520+
break;
521+
}
522+
518523
TClientBlob &res = batch.Blobs[i];
519524
VERIFY_RESULT_BLOB(res, i);
520525

@@ -640,7 +645,7 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(
640645
}
641646
while (it != DataKeysBody.end()
642647
&& (size < maxSize && count < maxCount || count == 0) //count== 0 grants that blob with offset from ReadFromTimestamp will be readed
643-
&& (lastOffset == 0 || it->Key.GetOffset() <= lastOffset)
648+
&& (lastOffset == 0 || it->Key.GetOffset() < lastOffset)
644649
) {
645650
size += sz;
646651
count += cnt;
@@ -688,7 +693,7 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(
688693
Y_ABORT_UNLESS(pno == blobs[i].GetPartNo());
689694
bool skip = offset < startOffset || offset == startOffset &&
690695
blobs[i].GetPartNo() < partNo;
691-
if (lastOffset != 0 && lastOffset < offset) {
696+
if (0 < lastOffset && lastOffset <= offset) {
692697
break;
693698
}
694699
if (blobs[i].IsLastPart()) {

ydb/core/persqueue/pq_impl.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
227227
}
228228
}
229229
}
230+
230231
if (!partResp->GetResult().empty()) {
231232
const auto& lastRes = partResp->GetResult(partResp->GetResult().size() - 1);
232233
if (lastRes.HasPartNo() && lastRes.GetPartNo() + 1 < lastRes.GetTotalParts()) { //last res is not full
@@ -260,7 +261,10 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
260261
}
261262
if (isDirectRead) {
262263
auto* prepareResponse = Response->Record.MutablePartitionResponse()->MutableCmdPrepareReadResult();
263-
prepareResponse->SetBytesSizeEstimate(readResult.GetSizeEstimate());
264+
auto sizeEstimate = Request.GetPartitionRequest().GetCmdRead().GetSizeEstimate();
265+
sizeEstimate = sizeEstimate ? sizeEstimate : PreparedResponse->GetPartitionResponse().ByteSize();
266+
PreparedResponse->MutablePartitionResponse()->MutableCmdPrepareReadResult()->SetBytesSizeEstimate(sizeEstimate);
267+
prepareResponse->SetBytesSizeEstimate(sizeEstimate);
264268
prepareResponse->SetDirectReadId(DirectReadKey.ReadId);
265269
prepareResponse->SetReadOffset(readResult.GetRealReadOffset());
266270
prepareResponse->SetLastOffset(readResult.GetLastOffset());
@@ -2318,15 +2322,18 @@ void TPersQueue::HandleReadRequest(
23182322
return;
23192323
}
23202324
}
2325+
23212326
THolder<TEvPQ::TEvRead> event =
23222327
MakeHolder<TEvPQ::TEvRead>(responseCookie, cmd.GetOffset(), cmd.GetLastOffset(),
23232328
cmd.HasPartNo() ? cmd.GetPartNo() : 0,
23242329
count,
23252330
cmd.HasSessionId() ? cmd.GetSessionId() : "",
23262331
cmd.GetClientId(),
2327-
cmd.HasTimeoutMs() ? cmd.GetTimeoutMs() : 0, bytes,
2332+
cmd.HasTimeoutMs() ? cmd.GetTimeoutMs() : 0,
2333+
bytes,
23282334
cmd.HasMaxTimeLagMs() ? cmd.GetMaxTimeLagMs() : 0,
2329-
cmd.HasReadTimestampMs() ? cmd.GetReadTimestampMs() : 0, clientDC,
2335+
cmd.HasReadTimestampMs() ? cmd.GetReadTimestampMs() : 0,
2336+
clientDC,
23302337
cmd.GetExternalOperation(),
23312338
pipeClient);
23322339

@@ -2407,8 +2414,7 @@ void TPersQueue::HandleForgetReadRequest(
24072414
THolder<TEvPQ::TEvProxyResponse> forgetDoneEvent = MakeHolder<TEvPQ::TEvProxyResponse>(responseCookie);
24082415
forgetDoneEvent->Response->SetStatus(NMsgBusProxy::MSTATUS_OK);
24092416
forgetDoneEvent->Response->SetErrorCode(NPersQueue::NErrorCode::OK);
2410-
2411-
forgetDoneEvent->Response->MutablePartitionResponse()->MutableCmdForgetReadResult();
2417+
forgetDoneEvent->Response->MutablePartitionResponse()->MutableCmdForgetReadResult()->SetDirectReadId(key.ReadId);
24122418
ctx.Send(SelfId(), forgetDoneEvent.Release());
24132419

24142420
PQ_LOG_D("Forget direct read id " << key.ReadId << " for session " << key.SessionId);

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,9 @@ void BeginCmdRead(const TPQCmdReadSettings& settings, TTestContext& tc)
10291029
if (settings.PartitionSessionId > 0) {
10301030
read->SetPartitionSessionId(settings.PartitionSessionId);
10311031
}
1032+
if (settings.LastOffset > 0) {
1033+
read->SetLastOffset(settings.LastOffset);
1034+
}
10321035
if (settings.Pipe) {
10331036
ActorIdToProto(settings.Pipe, req->MutablePipeClient());
10341037
}

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,11 +449,12 @@ struct TPQCmdReadSettings : public TPQCmdSettingsBase {
449449
ui32 MaxTimeLagMs = 0;
450450
ui32 ReadTimestampMs = 0;
451451
ui64 DirectReadId = 0;
452+
i64 LastOffset = 0;
452453
TActorId Pipe;
453454
TPQCmdReadSettings() = default;
454455
TPQCmdReadSettings(const TString& session, ui32 partition, i64 offset, ui32 count, ui32 size, ui32 resCount, bool timeout = false,
455456
TVector<i32> offsets = {}, const ui32 maxTimeLagMs = 0, const ui64 readTimestampMs = 0,
456-
const TString user = "user")
457+
const TString user = "user", const i64 lastOffset = 0)
457458

458459
: TPQCmdSettingsBase{partition, user, session, 0, offset, false}
459460
, Count(count)
@@ -463,6 +464,7 @@ struct TPQCmdReadSettings : public TPQCmdSettingsBase {
463464
, Offsets (offsets)
464465
, MaxTimeLagMs(maxTimeLagMs)
465466
, ReadTimestampMs(readTimestampMs)
467+
, LastOffset(lastOffset)
466468
{}
467469
};
468470

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,75 @@ const static TString TOPIC_NAME = "rt3.dc1--topic";
2020

2121
Y_UNIT_TEST_SUITE(TPQTest) {
2222

23+
Y_UNIT_TEST(TestCmdReadWithLastOffset) {
24+
TTestContext tc;
25+
tc.EnableDetailedPQLog = true;
26+
RunTestWithReboots(tc.TabletIds, [&]() {
27+
return tc.InitialEventsFilter.Prepare();
28+
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
29+
activeZone = false;
30+
TFinalizer finalizer(tc);
31+
tc.Prepare(dispatchName, setup, activeZone);
32+
activeZone = false;
33+
tc.Runtime->SetScheduledLimit(1000);
34+
tc.Runtime->RegisterService(MakePQDReadCacheServiceActorId(), tc.Runtime->Register(
35+
CreatePQDReadCacheService(new NMonitoring::TDynamicCounters()))
36+
);
37+
38+
PQTabletPrepare({.partitions = 1, .writeSpeed = 100_KB}, {{"user1", true}}, tc);
39+
TVector<std::pair<ui64, TString>> data;
40+
i64 messageCount = 100;
41+
for (i64 i = 1; i <= messageCount; ++i) {
42+
data.push_back({i, TString(100_KB, 'a')});
43+
}
44+
CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true);
45+
TString sessionId = "session1";
46+
TString user = "user1";
47+
TPQCmdSettings sessionSettings{0, user, sessionId};
48+
sessionSettings.PartitionSessionId = 1;
49+
sessionSettings.KeepPipe = true;
50+
TPQCmdReadSettings readSettings{
51+
/*session=*/ sessionId,
52+
/*partition=*/ 0,
53+
/*offset=*/ 0,
54+
/*count=*/ static_cast<ui32>(messageCount),
55+
/*size=*/ 16_MB,
56+
/*resCount=*/ 0,
57+
};
58+
readSettings.PartitionSessionId = 1;
59+
readSettings.User = user;
60+
61+
activeZone = false;
62+
Cerr << "Create session\n";
63+
auto pipe = CmdCreateSession(sessionSettings, tc);
64+
readSettings.Pipe = pipe;
65+
66+
for (i64 offset = 0; offset < messageCount; offset += 10) {
67+
for (i64 lastOffset = 0; lastOffset <= messageCount; lastOffset += 10) {
68+
readSettings.Offset = offset;
69+
readSettings.LastOffset = lastOffset;
70+
readSettings.ResCount = lastOffset < offset ? 0 : static_cast<ui32>(lastOffset - offset);
71+
BeginCmdRead(readSettings, tc);
72+
73+
TAutoPtr<IEventHandle> handle;
74+
auto* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
75+
76+
UNIT_ASSERT_C(result->Record.GetPartitionResponse().HasCmdReadResult(), result->Record.GetPartitionResponse().DebugString());
77+
auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
78+
79+
if (lastOffset) {
80+
UNIT_ASSERT_VALUES_EQUAL(readSettings.ResCount, res.ResultSize());
81+
}
82+
83+
for (size_t i = 0; i < res.ResultSize(); ++i) {
84+
UNIT_ASSERT_EQUAL(res.GetResult(i).GetOffset(), offset + i);
85+
UNIT_ASSERT_EQUAL(res.GetResult(i).GetData(), data[offset + i].second);
86+
}
87+
}
88+
}
89+
});
90+
}
91+
2392
Y_UNIT_TEST(TestDirectReadHappyWay) {
2493
TTestContext tc;
2594
tc.EnableDetailedPQLog = true;

ydb/core/protos/msgbus_pq.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ message TPersQueuePartitionRequest {
3636
optional uint64 PartitionSessionId = 16;
3737
optional int64 LastOffset = 17;
3838

39+
optional uint64 SizeEstimate = 18;
3940
}
4041

4142
message TCmdPublishDirectRead {

ydb/core/testlib/test_pq_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,6 +1455,7 @@ class TFlatMsgBusPQClient : public NFlatTests::TFlatMsgBusClient {
14551455
struct CreateTopicNoLegacyParams {
14561456
TString Name;
14571457
ui32 PartsCount;
1458+
TDuration RetentionPeriod = TDuration::Hours(18);
14581459
bool DoWait = true;
14591460
bool CanWrite = true;
14601461
std::optional<TString> Dc = std::nullopt;
@@ -1477,6 +1478,7 @@ class TFlatMsgBusPQClient : public NFlatTests::TFlatMsgBusClient {
14771478
auto settings = NYdb::NPersQueue::TCreateTopicSettings().PartitionsCount(params.PartsCount).ClientWriteDisabled(!params.CanWrite);
14781479
settings.FederationAccount(params.Account);
14791480
settings.SupportedCodecs(params.Codecs);
1481+
settings.RetentionPeriod(params.RetentionPeriod);
14801482
//settings.MaxPartitionWriteSpeed(50_MB);
14811483
//settings.MaxPartitionWriteBurst(50_MB);
14821484
TVector<NYdb::NPersQueue::TReadRuleSettings> rrSettings;

ydb/public/api/protos/ydb_topic.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,10 @@ message StreamDirectReadMessage {
688688

689689
// Messages data
690690
StreamReadMessage.ReadResponse.PartitionData partition_data = 3;
691+
692+
// Total size in bytes of this response as calculated by server.
693+
// See ReadRequest comment above.
694+
int64 bytes_size = 4;
691695
}
692696
}
693697

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,16 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
685685
}
686686
return;
687687
}
688+
689+
if (!(
690+
result.HasCmdReadResult() || result.HasCmdPrepareReadResult() || result.HasCmdPublishReadResult()
691+
|| result.HasCmdForgetReadResult() || result.HasCmdRestoreDirectReadResult()
692+
)) {
693+
// this is commit response
694+
CommitDone(result.GetCookie(), ctx);
695+
return;
696+
}
697+
688698
switch (DirectReadRestoreStage) {
689699
case EDirectReadRestoreStage::None:
690700
break;
@@ -696,14 +706,19 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
696706
return;
697707
case EDirectReadRestoreStage::Prepare:
698708
Y_ABORT_UNLESS(RestoredDirectReadId != 0);
699-
709+
if (!result.HasCmdPrepareReadResult()) {
710+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Invalid response on direct read restore for " << Partition << ": expect PrepareReadResult");
711+
}
700712
Y_ABORT_UNLESS(result.HasCmdPrepareReadResult());
701713
Y_ABORT_UNLESS(DirectReadsToRestore.begin()->first == result.GetCmdPrepareReadResult().GetDirectReadId());
702714
DirectReadsToRestore.erase(DirectReadsToRestore.begin());
703715
{
704716
auto sent = SendNextRestorePublishRequest();
705-
if (!sent) // Nothing to publish
717+
if (!sent) {
718+
// Read was not published previously and thus no response sent to session. Need to keep it
719+
UnpublishedDirectReads.insert(result.GetCmdPrepareReadResult().GetDirectReadId());
706720
sent = SendNextRestorePrepareOrForget();
721+
}
707722
if (!sent)
708723
OnDirectReadsRestored();
709724
}
@@ -728,11 +743,6 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
728743
return;
729744
}
730745

731-
if (!(result.HasCmdReadResult() || result.HasCmdPrepareReadResult() || result.HasCmdPublishReadResult() || result.HasCmdForgetReadResult())) { // this is commit response
732-
CommitDone(result.GetCookie(), ctx);
733-
return;
734-
}
735-
736746
if (result.HasCmdForgetReadResult()) {
737747
// ignore it
738748
return;
@@ -1164,8 +1174,8 @@ bool TPartitionActor::SendNextRestorePrepareOrForget() {
11641174
DirectReadRestoreStage = EDirectReadRestoreStage::Prepare;
11651175
Y_ABORT_UNLESS(dr.GetReadOffset() <= dr.GetLastOffset());
11661176

1167-
auto request = MakeReadRequest(dr.GetReadOffset(), dr.GetLastOffset(), std::numeric_limits<i32>::max(),
1168-
std::numeric_limits<i32>::max(), 0, 0, dr.GetDirectReadId());
1177+
auto request = MakeReadRequest(dr.GetReadOffset(), dr.GetLastOffset() + 1, std::numeric_limits<i32>::max(),
1178+
std::numeric_limits<i32>::max(), 0, 0, dr.GetDirectReadId(), dr.GetBytesSizeEstimate());
11691179

11701180
if (!PipeClient) //Pipe will be recreated soon
11711181
return true;
@@ -1205,6 +1215,16 @@ void TPartitionActor::OnDirectReadsRestored() {
12051215
const auto& ctx = ActorContext();
12061216
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
12071217
<< "Restore direct reads done, continue working");
1218+
1219+
if (InitDone) {
1220+
ctx.Send(ParentId, new TEvPQProxy::TEvUpdateSession(Partition, NodeId, TabletGeneration));
1221+
}
1222+
1223+
for (auto id: UnpublishedDirectReads) {
1224+
SendPublishDirectRead(id, ActorContext());
1225+
}
1226+
UnpublishedDirectReads.clear();
1227+
12081228
ResendRecentRequests();
12091229
}
12101230

@@ -1310,7 +1330,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvHasDataInfoResponse::TPtr& ev, con
13101330

13111331

13121332
NKikimrClient::TPersQueueRequest TPartitionActor::MakeReadRequest(
1313-
ui64 readOffset, ui64 lastOffset, ui64 maxCount, ui64 maxSize, ui64 maxTimeLagMs, ui64 readTimestampMs, ui64 directReadId
1333+
ui64 readOffset, ui64 lastOffset, ui64 maxCount, ui64 maxSize, ui64 maxTimeLagMs, ui64 readTimestampMs, ui64 directReadId, ui64 sizeEstimate
13141334
) const {
13151335
NKikimrClient::TPersQueueRequest request;
13161336

@@ -1326,6 +1346,7 @@ NKikimrClient::TPersQueueRequest TPartitionActor::MakeReadRequest(
13261346
read->SetSessionId(Session);
13271347
if (DirectRead) {
13281348
read->SetDirectReadId(directReadId);
1349+
read->SetSizeEstimate(sizeEstimate);
13291350
}
13301351
if (maxCount) {
13311352
read->SetCount(maxCount);
@@ -1366,7 +1387,7 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
13661387
auto request = MakeReadRequest(ReadOffset, 0, req->MaxCount, req->MaxSize, req->MaxTimeLagMs, req->ReadTimestampMs, DirectReadId);
13671388
RequestInfly = true;
13681389
CurrentRequest = request;
1369-
1390+
13701391
if (!PipeClient) //Pipe will be recreated soon
13711392
return;
13721393

0 commit comments

Comments
 (0)