Skip to content

Commit dd1e0fa

Browse files
authored
Merge pull request #18086 from qyryq/25-1/direct-read-fixes
Bug fixes for direct read in topics (stable-25-1)
2 parents c222668 + f42e3e1 commit dd1e0fa

File tree

15 files changed

+443
-57
lines changed

15 files changed

+443
-57
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
5656
hFunc(TEvPQ::TEvGetFullDirectReadData, HandleGetData)
5757
hFunc(TEvPQProxy::TEvDirectReadDataSessionConnected, HandleCreateClientSession)
5858
hFunc(TEvPQProxy::TEvDirectReadDataSessionDead, HandleDestroyClientSession)
59+
hFunc(TEvPQProxy::TEvDirectReadDestroyPartitionSession, HandlePartitionSessionReleased)
5960
)
6061

6162
private:
@@ -72,15 +73,16 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
7273
CloseSession(ev->Sender, Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST, "Unknown session");
7374
return;
7475
}
76+
77+
auto sender = ev->Sender;
7578
if (sessionIter->second.Generation != ev->Get()->Generation) {
7679
ctx.Send(
77-
sessionIter->second.Client->ProxyId,
78-
new TEvPQProxy::TEvDirectReadDestroyPartitionSession(key, Ydb::PersQueue::ErrorCode::ErrorCode::ERROR, "Generation mismatch")
80+
sender,
81+
new TEvPQProxy::TEvDirectReadDestroyPartitionSession(key, Ydb::PersQueue::ErrorCode::ErrorCode::ERROR, "Generation mismatch")
7982
);
8083
return;
8184
}
8285

83-
auto sender = ev->Sender;
8486
auto startingReadId = ev->Get()->StartingReadId;
8587

8688
// Let the proxy respond with StartDirectReadPartitionSessionResponse right away,
@@ -112,6 +114,17 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
112114
AssignByProxy.erase(assignIter);
113115
}
114116

117+
void HandlePartitionSessionReleased(TEvPQProxy::TEvDirectReadDestroyPartitionSession::TPtr& ev) {
118+
auto assignIter = AssignByProxy.find(ev->Sender);
119+
if (assignIter.IsEnd())
120+
return;
121+
if (!assignIter->second.contains(ev->Get()->ReadKey.PartitionSessionId))
122+
return;
123+
124+
assignIter->second.erase(ev->Get()->ReadKey.PartitionSessionId);
125+
ServerSessions.erase(ev->Get()->ReadKey);
126+
}
127+
115128
void HandleRegister(TEvPQ::TEvRegisterDirectReadSession::TPtr& ev) {
116129
const auto& key = ev->Get()->Session;
117130
RegisterServerSession(key, ev->Get()->Generation);
@@ -344,6 +357,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
344357
auto* directReadMessage = message->mutable_direct_read_response();
345358
directReadMessage->set_direct_read_id(readId);
346359
directReadMessage->set_partition_session_id(partSessionId);
360+
directReadMessage->set_bytes_size(response->GetPartitionResponse().GetCmdPrepareReadResult().GetBytesSizeEstimate());
347361

348362
auto ok = VaildatePartitionResponse(proxyClient, *response);
349363
if (!ok) {

ydb/core/persqueue/partition_read.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ bool TPartition::ProcessHasDataRequest(const THasDataReq& request, const TActorC
152152

153153
auto now = ctx.Now();
154154
auto& userInfo = UsersInfoStorage->GetOrCreate(request.ClientId, ctx);
155-
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
155+
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
156156
}
157157
} else if (request.Offset < EndOffset) {
158158
sendResponse(GetSizeLag(request.Offset), false);
@@ -515,6 +515,11 @@ TReadAnswer TReadInfo::FormAnswer(
515515
<< " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
516516

517517
for (size_t i = pos; i < batch.Blobs.size(); ++i) {
518+
if (0 < LastOffset && LastOffset <= Offset) {
519+
needStop = true;
520+
break;
521+
}
522+
518523
TClientBlob &res = batch.Blobs[i];
519524
VERIFY_RESULT_BLOB(res, i);
520525

@@ -640,7 +645,7 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(
640645
}
641646
while (it != DataKeysBody.end()
642647
&& (size < maxSize && count < maxCount || count == 0) //count== 0 grants that blob with offset from ReadFromTimestamp will be readed
643-
&& (lastOffset == 0 || it->Key.GetOffset() <= lastOffset)
648+
&& (lastOffset == 0 || it->Key.GetOffset() < lastOffset)
644649
) {
645650
size += sz;
646651
count += cnt;
@@ -688,7 +693,7 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(
688693
Y_ABORT_UNLESS(pno == blobs[i].GetPartNo());
689694
bool skip = offset < startOffset || offset == startOffset &&
690695
blobs[i].GetPartNo() < partNo;
691-
if (lastOffset != 0 && lastOffset < offset) {
696+
if (0 < lastOffset && lastOffset <= offset) {
692697
break;
693698
}
694699
if (blobs[i].IsLastPart()) {

ydb/core/persqueue/pq_impl.cpp

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,13 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
140140
{
141141
Y_ABORT_UNLESS(Response);
142142
const auto& record = ev->Get()->Record;
143-
if (!record.HasPartitionResponse() || !record.GetPartitionResponse().HasCmdReadResult() ||
144-
record.GetStatus() != NMsgBusProxy::MSTATUS_OK || record.GetErrorCode() != NPersQueue::NErrorCode::OK ||
145-
record.GetPartitionResponse().GetCmdReadResult().ResultSize() == 0) {
143+
auto isDirectRead = DirectReadKey.ReadId != 0;
144+
if (!record.HasPartitionResponse()
145+
|| !record.GetPartitionResponse().HasCmdReadResult()
146+
|| record.GetStatus() != NMsgBusProxy::MSTATUS_OK
147+
|| record.GetErrorCode() != NPersQueue::NErrorCode::OK
148+
|| (record.GetPartitionResponse().GetCmdReadResult().ResultSize() == 0 && !isDirectRead)
149+
) {
146150

147151
Response->Record.CopyFrom(record);
148152
ctx.Send(Sender, Response.Release());
@@ -151,7 +155,6 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
151155
}
152156
Y_ABORT_UNLESS(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdReadResult());
153157
const auto& readResult = record.GetPartitionResponse().GetCmdReadResult();
154-
auto isDirectRead = DirectReadKey.ReadId != 0;
155158
if (isDirectRead) {
156159
if (!PreparedResponse) {
157160
PreparedResponse = std::make_shared<NKikimrClient::TResponse>();
@@ -162,10 +165,12 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
162165
responseRecord.SetStatus(NMsgBusProxy::MSTATUS_OK);
163166
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);
164167

165-
Y_ABORT_UNLESS(readResult.ResultSize() > 0);
168+
Y_ABORT_UNLESS(readResult.ResultSize() > 0 || isDirectRead);
166169
bool isStart = false;
167170
if (!responseRecord.HasPartitionResponse()) {
168-
Y_ABORT_UNLESS(!readResult.GetResult(0).HasPartNo() || readResult.GetResult(0).GetPartNo() == 0); //starts from begin of record
171+
if (readResult.ResultSize() > 0) {
172+
Y_ABORT_UNLESS(!readResult.GetResult(0).HasPartNo() || readResult.GetResult(0).GetPartNo() == 0); //starts from begin of record
173+
}
169174
auto partResp = responseRecord.MutablePartitionResponse();
170175
auto readRes = partResp->MutableCmdReadResult();
171176
readRes->SetBlobsFromDisk(readRes->GetBlobsFromDisk() + readResult.GetBlobsFromDisk());
@@ -227,6 +232,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
227232
}
228233
}
229234
}
235+
230236
if (!partResp->GetResult().empty()) {
231237
const auto& lastRes = partResp->GetResult(partResp->GetResult().size() - 1);
232238
if (lastRes.HasPartNo() && lastRes.GetPartNo() + 1 < lastRes.GetTotalParts()) { //last res is not full
@@ -260,7 +266,10 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
260266
}
261267
if (isDirectRead) {
262268
auto* prepareResponse = Response->Record.MutablePartitionResponse()->MutableCmdPrepareReadResult();
263-
prepareResponse->SetBytesSizeEstimate(readResult.GetSizeEstimate());
269+
auto sizeEstimate = Request.GetPartitionRequest().GetCmdRead().GetSizeEstimate();
270+
sizeEstimate = sizeEstimate ? sizeEstimate : PreparedResponse->GetPartitionResponse().ByteSize();
271+
PreparedResponse->MutablePartitionResponse()->MutableCmdPrepareReadResult()->SetBytesSizeEstimate(sizeEstimate);
272+
prepareResponse->SetBytesSizeEstimate(sizeEstimate);
264273
prepareResponse->SetDirectReadId(DirectReadKey.ReadId);
265274
prepareResponse->SetReadOffset(readResult.GetRealReadOffset());
266275
prepareResponse->SetLastOffset(readResult.GetLastOffset());
@@ -2322,15 +2331,18 @@ void TPersQueue::HandleReadRequest(
23222331
return;
23232332
}
23242333
}
2334+
23252335
THolder<TEvPQ::TEvRead> event =
23262336
MakeHolder<TEvPQ::TEvRead>(responseCookie, cmd.GetOffset(), cmd.GetLastOffset(),
23272337
cmd.HasPartNo() ? cmd.GetPartNo() : 0,
23282338
count,
23292339
cmd.HasSessionId() ? cmd.GetSessionId() : "",
23302340
cmd.GetClientId(),
2331-
cmd.HasTimeoutMs() ? cmd.GetTimeoutMs() : 0, bytes,
2341+
cmd.HasTimeoutMs() ? cmd.GetTimeoutMs() : 0,
2342+
bytes,
23322343
cmd.HasMaxTimeLagMs() ? cmd.GetMaxTimeLagMs() : 0,
2333-
cmd.HasReadTimestampMs() ? cmd.GetReadTimestampMs() : 0, clientDC,
2344+
cmd.HasReadTimestampMs() ? cmd.GetReadTimestampMs() : 0,
2345+
clientDC,
23342346
cmd.GetExternalOperation(),
23352347
pipeClient);
23362348

@@ -2411,8 +2423,7 @@ void TPersQueue::HandleForgetReadRequest(
24112423
THolder<TEvPQ::TEvProxyResponse> forgetDoneEvent = MakeHolder<TEvPQ::TEvProxyResponse>(responseCookie);
24122424
forgetDoneEvent->Response->SetStatus(NMsgBusProxy::MSTATUS_OK);
24132425
forgetDoneEvent->Response->SetErrorCode(NPersQueue::NErrorCode::OK);
2414-
2415-
forgetDoneEvent->Response->MutablePartitionResponse()->MutableCmdForgetReadResult();
2426+
forgetDoneEvent->Response->MutablePartitionResponse()->MutableCmdForgetReadResult()->SetDirectReadId(key.ReadId);
24162427
ctx.Send(SelfId(), forgetDoneEvent.Release());
24172428

24182429
PQ_LOG_D("Forget direct read id " << key.ReadId << " for session " << key.SessionId);

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,9 @@ void BeginCmdRead(const TPQCmdReadSettings& settings, TTestContext& tc)
10291029
if (settings.PartitionSessionId > 0) {
10301030
read->SetPartitionSessionId(settings.PartitionSessionId);
10311031
}
1032+
if (settings.LastOffset > 0) {
1033+
read->SetLastOffset(settings.LastOffset);
1034+
}
10321035
if (settings.Pipe) {
10331036
ActorIdToProto(settings.Pipe, req->MutablePipeClient());
10341037
}

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,11 +449,12 @@ struct TPQCmdReadSettings : public TPQCmdSettingsBase {
449449
ui32 MaxTimeLagMs = 0;
450450
ui32 ReadTimestampMs = 0;
451451
ui64 DirectReadId = 0;
452+
i64 LastOffset = 0;
452453
TActorId Pipe;
453454
TPQCmdReadSettings() = default;
454455
TPQCmdReadSettings(const TString& session, ui32 partition, i64 offset, ui32 count, ui32 size, ui32 resCount, bool timeout = false,
455456
TVector<i32> offsets = {}, const ui32 maxTimeLagMs = 0, const ui64 readTimestampMs = 0,
456-
const TString user = "user")
457+
const TString user = "user", const i64 lastOffset = 0)
457458

458459
: TPQCmdSettingsBase{partition, user, session, 0, offset, false}
459460
, Count(count)
@@ -463,6 +464,7 @@ struct TPQCmdReadSettings : public TPQCmdSettingsBase {
463464
, Offsets (offsets)
464465
, MaxTimeLagMs(maxTimeLagMs)
465466
, ReadTimestampMs(readTimestampMs)
467+
, LastOffset(lastOffset)
466468
{}
467469
};
468470

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,75 @@ const static TString TOPIC_NAME = "rt3.dc1--topic";
2020

2121
Y_UNIT_TEST_SUITE(TPQTest) {
2222

23+
Y_UNIT_TEST(TestCmdReadWithLastOffset) {
24+
TTestContext tc;
25+
tc.EnableDetailedPQLog = true;
26+
RunTestWithReboots(tc.TabletIds, [&]() {
27+
return tc.InitialEventsFilter.Prepare();
28+
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
29+
activeZone = false;
30+
TFinalizer finalizer(tc);
31+
tc.Prepare(dispatchName, setup, activeZone);
32+
activeZone = false;
33+
tc.Runtime->SetScheduledLimit(1000);
34+
tc.Runtime->RegisterService(MakePQDReadCacheServiceActorId(), tc.Runtime->Register(
35+
CreatePQDReadCacheService(new NMonitoring::TDynamicCounters()))
36+
);
37+
38+
PQTabletPrepare({.partitions = 1, .writeSpeed = 100_KB}, {{"user1", true}}, tc);
39+
TVector<std::pair<ui64, TString>> data;
40+
i64 messageCount = 100;
41+
for (i64 i = 1; i <= messageCount; ++i) {
42+
data.push_back({i, TString(100_KB, 'a')});
43+
}
44+
CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true);
45+
TString sessionId = "session1";
46+
TString user = "user1";
47+
TPQCmdSettings sessionSettings{0, user, sessionId};
48+
sessionSettings.PartitionSessionId = 1;
49+
sessionSettings.KeepPipe = true;
50+
TPQCmdReadSettings readSettings{
51+
/*session=*/ sessionId,
52+
/*partition=*/ 0,
53+
/*offset=*/ 0,
54+
/*count=*/ static_cast<ui32>(messageCount),
55+
/*size=*/ 16_MB,
56+
/*resCount=*/ 0,
57+
};
58+
readSettings.PartitionSessionId = 1;
59+
readSettings.User = user;
60+
61+
activeZone = false;
62+
Cerr << "Create session\n";
63+
auto pipe = CmdCreateSession(sessionSettings, tc);
64+
readSettings.Pipe = pipe;
65+
66+
for (i64 offset = 0; offset < messageCount; offset += 10) {
67+
for (i64 lastOffset = 0; lastOffset <= messageCount; lastOffset += 10) {
68+
readSettings.Offset = offset;
69+
readSettings.LastOffset = lastOffset;
70+
readSettings.ResCount = lastOffset < offset ? 0 : static_cast<ui32>(lastOffset - offset);
71+
BeginCmdRead(readSettings, tc);
72+
73+
TAutoPtr<IEventHandle> handle;
74+
auto* result = tc.Runtime->GrabEdgeEvent<TEvPersQueue::TEvResponse>(handle);
75+
76+
UNIT_ASSERT_C(result->Record.GetPartitionResponse().HasCmdReadResult(), result->Record.GetPartitionResponse().DebugString());
77+
auto res = result->Record.GetPartitionResponse().GetCmdReadResult();
78+
79+
if (lastOffset) {
80+
UNIT_ASSERT_VALUES_EQUAL(readSettings.ResCount, res.ResultSize());
81+
}
82+
83+
for (size_t i = 0; i < res.ResultSize(); ++i) {
84+
UNIT_ASSERT_EQUAL(res.GetResult(i).GetOffset(), offset + i);
85+
UNIT_ASSERT_EQUAL(res.GetResult(i).GetData(), data[offset + i].second);
86+
}
87+
}
88+
}
89+
});
90+
}
91+
2392
Y_UNIT_TEST(TestDirectReadHappyWay) {
2493
TTestContext tc;
2594
tc.EnableDetailedPQLog = true;

ydb/core/protos/msgbus_pq.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ message TPersQueuePartitionRequest {
3636
optional uint64 PartitionSessionId = 16;
3737
optional int64 LastOffset = 17;
3838

39+
optional uint64 SizeEstimate = 18;
3940
}
4041

4142
message TCmdPublishDirectRead {

ydb/core/testlib/test_pq_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,6 +1455,7 @@ class TFlatMsgBusPQClient : public NFlatTests::TFlatMsgBusClient {
14551455
struct CreateTopicNoLegacyParams {
14561456
TString Name;
14571457
ui32 PartsCount;
1458+
TDuration RetentionPeriod = TDuration::Hours(18);
14581459
bool DoWait = true;
14591460
bool CanWrite = true;
14601461
std::optional<TString> Dc = std::nullopt;
@@ -1477,6 +1478,7 @@ class TFlatMsgBusPQClient : public NFlatTests::TFlatMsgBusClient {
14771478
auto settings = NYdb::NPersQueue::TCreateTopicSettings().PartitionsCount(params.PartsCount).ClientWriteDisabled(!params.CanWrite);
14781479
settings.FederationAccount(params.Account);
14791480
settings.SupportedCodecs(params.Codecs);
1481+
settings.RetentionPeriod(params.RetentionPeriod);
14801482
//settings.MaxPartitionWriteSpeed(50_MB);
14811483
//settings.MaxPartitionWriteBurst(50_MB);
14821484
TVector<NYdb::NPersQueue::TReadRuleSettings> rrSettings;

ydb/core/viewer/tests/test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ def write(writer, message_pattern, close=True):
620620
'path': '{}/topic1'.format(dedicated_db),
621621
'partition': '0',
622622
'offset': '0',
623-
'last_offset': '2',
623+
'last_offset': '3',
624624
'limit': '5'
625625
})
626626

ydb/public/api/protos/ydb_topic.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,10 @@ message StreamDirectReadMessage {
688688

689689
// Messages data
690690
StreamReadMessage.ReadResponse.PartitionData partition_data = 3;
691+
692+
// Total size in bytes of this response as calculated by server.
693+
// See ReadRequest comment above.
694+
int64 bytes_size = 4;
691695
}
692696
}
693697

0 commit comments

Comments
 (0)