Skip to content

Commit b574773

Browse files
Direct read restore session on tablet restarts (#11395)
1 parent 8c51589 commit b574773

File tree

8 files changed

+643
-216
lines changed

8 files changed

+643
-216
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

Lines changed: 43 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ i32 GetDataChunkCodec(const NKikimrPQClient::TDataChunk& proto) {
2525
return 0;
2626
}
2727

28+
#define PQ_CPROXY_LOG_D(message) LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: " << message);
29+
#define PQ_CPROXY_LOG_I(message) LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: " << message);
30+
#define PQ_CPROXY_LOG_W(message) LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: " << message);
31+
#define PQ_CPROXY_LOG_E(message) LOG_ERROR_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: " << message);
32+
#define PQ_CPROXY_LOG_A(message) LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: " << message);
33+
2834

2935
class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheService> {
3036
public:
@@ -35,7 +41,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
3541
}
3642

3743
void Bootstrap(const TActorContext& ctx) {
38-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "Direct read cache created");
44+
PQ_CPROXY_LOG_D(": Created");
3945

4046
Become(&TThis::StateWork);
4147
Y_UNUSED(ctx);
@@ -59,10 +65,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
5965
void HandleCreateClientSession(TEvPQProxy::TEvDirectReadDataSessionConnected::TPtr& ev) {
6066
const auto& ctx = ActorContext();
6167
auto key = MakeSessionKey(ev->Get());
62-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: client session connected with id '" << key.SessionId << "'");
68+
PQ_CPROXY_LOG_D("client session connected with id '" << key.SessionId << "'");
6369
auto sessionIter = ServerSessions.find(key);
6470
if (sessionIter.IsEnd()) {
65-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: unknown session id '" << key.SessionId << "', close session");
71+
PQ_CPROXY_LOG_D("unknown session id '" << key.SessionId << "', close session");
6672
CloseSession(ev->Sender, Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST, "Unknown session");
6773
return;
6874
}
@@ -112,16 +118,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
112118

113119
auto destroyDone = DestroyServerSession(ServerSessions.find(key), ev->Get()->Generation);
114120
if (destroyDone) {
115-
LOG_DEBUG_S(
116-
ctx, NKikimrServices::PQ_READ_PROXY,
117-
TStringBuilder() << "Direct read cache: server session deregistered: " << key.SessionId
118-
);
121+
PQ_CPROXY_LOG_D("server session deregistered: " << key.SessionId);
119122
} else {
120-
LOG_WARN_S(
121-
ctx, NKikimrServices::PQ_READ_PROXY,
122-
TStringBuilder() << "Direct read cache: attempted to deregister unknown server session: " << key.SessionId
123-
<< ":" << key.PartitionSessionId << " with generation " << ev->Get()->Generation << ", ignored"
124-
);
123+
PQ_CPROXY_LOG_W("attempted to deregister unknown server session: " << key.SessionId
124+
<< ":" << key.PartitionSessionId << " with generation " << ev->Get()->Generation << ", ignored");
125125
return;
126126
}
127127
}
@@ -131,64 +131,47 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
131131
auto sessionKey = MakeSessionKey(ev->Get());
132132
auto sessionIter = ServerSessions.find(sessionKey);
133133
if (sessionIter.IsEnd()) {
134-
LOG_ERROR_S(
135-
ctx, NKikimrServices::PQ_READ_PROXY,
136-
TStringBuilder() << "Direct read cache: tried to stage direct read for unregistered session: "
137-
<< sessionKey.SessionId << ":" << sessionKey.PartitionSessionId
138-
);
134+
PQ_CPROXY_LOG_E("tried to stage direct read for unregistered session: "
135+
<< sessionKey.SessionId << ":" << sessionKey.PartitionSessionId);
139136
return;
140137
}
141138
if (sessionIter->second.Generation != ev->Get()->TabletGeneration) {
142-
LOG_ALERT_S(
143-
ctx, NKikimrServices::PQ_READ_PROXY,
144-
TStringBuilder() << "Direct read cache: tried to stage direct read for session " << sessionKey.SessionId
145-
<< " with generation " << ev->Get()->TabletGeneration << ", previously had this session with generation "
146-
<< sessionIter->second.Generation << ". Data ignored"
147-
);
139+
PQ_CPROXY_LOG_A("tried to stage direct read for session " << sessionKey.SessionId
140+
<< " with generation " << ev->Get()->TabletGeneration << ", previously had this session with generation "
141+
<< sessionIter->second.Generation << ". Data ignored");
148142
return;
149143
}
150144
auto ins = sessionIter->second.StagedReads.insert(std::make_pair(ev->Get()->ReadKey.ReadId, ev->Get()->Response));
151145
if (!ins.second) {
152-
LOG_WARN_S(
153-
ctx, NKikimrServices::PQ_READ_PROXY,
154-
TStringBuilder() << "Direct read cache: tried to stage duplicate direct read for session " << sessionKey.SessionId << " with id "
155-
<< ev->Get()->ReadKey.ReadId << ", new data ignored"
156-
);
146+
PQ_CPROXY_LOG_W("tried to stage duplicate direct read for session " << sessionKey.SessionId << " with id "
147+
<< ev->Get()->ReadKey.ReadId << ", new data ignored");
157148
return;
158149
}
159150
ChangeCounterValue("StagedReadDataSize", ins.first->second->ByteSize(), false);
160151
ChangeCounterValue("StagedReadsCount", 1, false);
161-
LOG_DEBUG_S(
162-
ctx, NKikimrServices::PQ_READ_PROXY,
163-
TStringBuilder() << "Direct read cache: staged direct read id " << ev->Get()->ReadKey.ReadId << " for session: "
164-
<< sessionKey.SessionId
165-
);
152+
PQ_CPROXY_LOG_D("staged direct read id " << ev->Get()->ReadKey.ReadId << " for session: " << sessionKey.SessionId);
166153
}
167154

168155
void HandlePublish(TEvPQ::TEvPublishDirectRead::TPtr& ev) {
169156
const auto& ctx = ActorContext();
170157
auto key = MakeSessionKey(ev->Get());
171158
const auto readId = ev->Get()->ReadKey.ReadId;
172-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: publish read: " << readId << " for session " << key.SessionId);
159+
const auto& generation = ev->Get()->TabletGeneration;
160+
PQ_CPROXY_LOG_D("publish read: " << readId << " for session " << key.SessionId << ", Generation: " << generation);
161+
173162
auto iter = ServerSessions.find(key);
174163
if (iter.IsEnd()) {
175-
LOG_ERROR_S(
176-
ctx, NKikimrServices::PQ_READ_PROXY,
177-
TStringBuilder() << "Direct read cache: attempt to publish read for unknow session " << key.SessionId << " ignored"
178-
);
164+
PQ_CPROXY_LOG_E("attempt to publish read for unknow session " << key.SessionId << " ignored");
179165
return;
180166
}
181167

182-
const auto& generation = ev->Get()->TabletGeneration;
183168
if (iter->second.Generation != generation)
184169
return;
185170

186171
auto stagedIter = iter->second.StagedReads.find(readId);
187172
if (stagedIter == iter->second.StagedReads.end()) {
188-
LOG_ERROR_S(
189-
ctx, NKikimrServices::PQ_READ_PROXY,
190-
TStringBuilder() << "Direct read cache: attempt to publish unknown read id " << readId << " from session: "
191-
<< key.SessionId << " ignored");
173+
PQ_CPROXY_LOG_E("attempt to publish unknown read id " << readId << " from session: "
174+
<< key.SessionId << " ignored");
192175
return;
193176
}
194177
auto inserted = iter->second.Reads.insert(std::make_pair(ev->Get()->ReadKey.ReadId, stagedIter->second)).second;
@@ -209,18 +192,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
209192
auto key = MakeSessionKey(ev->Get());
210193
auto iter = ServerSessions.find(key);
211194
if (iter.IsEnd()) {
212-
LOG_DEBUG_S(
213-
ctx, NKikimrServices::PQ_READ_PROXY,
214-
TStringBuilder() << "Direct read cache: attempt to forget read for unknown session: "
215-
<< ev->Get()->ReadKey.SessionId << " ignored"
216-
);
195+
PQ_CPROXY_LOG_D("attempt to forget read for unknown session: " << ev->Get()->ReadKey.SessionId << " ignored");
217196
return;
218197
}
219-
LOG_DEBUG_S(
220-
ctx, NKikimrServices::PQ_READ_PROXY,
221-
TStringBuilder() << "Direct read cache: forget read: " << ev->Get()->ReadKey.ReadId << " for session "
222-
<< key.SessionId
223-
);
198+
PQ_CPROXY_LOG_D("forget read: " << ev->Get()->ReadKey.ReadId << " for session " << key.SessionId);
224199

225200
const auto& generation = ev->Get()->TabletGeneration;
226201
if (iter->second.Generation != generation) { // Stale generation in event, ignore it
@@ -265,6 +240,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
265240
[[nodiscard]] bool DestroyServerSession(TSessionsMap::iterator sessionIter, ui64 generation) {
266241
if (sessionIter.IsEnd() || sessionIter->second.Generation > generation)
267242
return false;
243+
Cerr << "CahceProxy: DestroyServerSession with generation: " << generation << Endl;
268244
DestroyPartitionSession(sessionIter, Ydb::PersQueue::ErrorCode::READ_ERROR_NO_SESSION, "Closed by server");
269245
ServerSessions.erase(sessionIter);
270246
ChangeCounterValue("ActiveServerSessions", ServerSessions.size(), true);
@@ -275,32 +251,22 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
275251
const auto& ctx = ActorContext();
276252
auto sessionsIter = ServerSessions.find(key);
277253
if (sessionsIter.IsEnd()) {
278-
LOG_DEBUG_S(
279-
ctx, NKikimrServices::PQ_READ_PROXY,
280-
TStringBuilder() << "Direct read cache: registered server session: " << key.SessionId
281-
<< ":" << key.PartitionSessionId << " with generation " << generation
282-
);
254+
PQ_CPROXY_LOG_D("registered server session: " << key.SessionId
255+
<< ":" << key.PartitionSessionId << " with generation " << generation);
256+
283257
ServerSessions.insert(std::make_pair(key, TCacheServiceData{generation}));
284258
} else if (sessionsIter->second.Generation == generation) {
285-
LOG_WARN_S(
286-
ctx, NKikimrServices::PQ_READ_PROXY,
287-
TStringBuilder() << "Direct read cache: attempted to register duplicate server session: " << key.SessionId
288-
<< ":" << key.PartitionSessionId << " with same generation " << generation << ", ignored"
289-
);
259+
PQ_CPROXY_LOG_W("attempted to register duplicate server session: " << key.SessionId << ":"
260+
<< key.PartitionSessionId << " with same generation " << generation << ", ignored");
261+
290262
} else if (DestroyServerSession(sessionsIter, generation)) {
291-
LOG_DEBUG_S(
292-
ctx, NKikimrServices::PQ_READ_PROXY,
293-
TStringBuilder() << "Direct read cache: registered server session: " << key.SessionId
294-
<< ":" << key.PartitionSessionId << " with generation " << generation
295-
<< ", killed existing session with older generation "
296-
);
263+
PQ_CPROXY_LOG_D("registered server session: " << key.SessionId
264+
<< ":" << key.PartitionSessionId << " with generation " << generation
265+
<< ", killed existing session with older generation ");
297266
ServerSessions.insert(std::make_pair(key, TCacheServiceData{generation}));
298267
} else {
299-
LOG_INFO_S(
300-
ctx, NKikimrServices::PQ_READ_PROXY,
301-
TStringBuilder() << "Direct read cache: attempted to register server session: " << key.SessionId
302-
<< ":" << key.PartitionSessionId << " with stale generation " << generation << ", ignored"
303-
);
268+
PQ_CPROXY_LOG_I("attempted to register server session: " << key.SessionId
269+
<< ":" << key.PartitionSessionId << " with stale generation " << generation << ", ignored");
304270
}
305271
ChangeCounterValue("ActiveServerSessions", ServerSessions.size(), true);
306272
}
@@ -372,8 +338,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
372338
partSessionId);
373339
message->set_status(Ydb::StatusIds::SUCCESS);
374340

375-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << "Direct read cache: send data to client. AssignId: "
376-
<< partSessionId << ", readId: " << readId);
341+
PQ_CPROXY_LOG_D("send data to client. AssignId: " << partSessionId << ", readId: " << readId);
377342

378343
ctx.Send(proxyClient.ProxyId, new TEvPQProxy::TEvDirectReadSendClientData(std::move(message)));
379344
return true;
@@ -386,7 +351,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
386351
) {
387352
const auto& ctx = ActorContext();
388353
ctx.Send(proxyId, new TEvPQProxy::TEvDirectReadCloseSession(code, reason));
389-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, TStringBuilder() << " Direct read cache: close session for proxy " << proxyId.ToString());
354+
PQ_CPROXY_LOG_D("close session for proxy " << proxyId.ToString());
390355
}
391356

392357
bool DestroyPartitionSession(
@@ -400,11 +365,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
400365
ctx.Send(
401366
sessionIter->second.Client->ProxyId, new TEvPQProxy::TEvDirectReadDestroyPartitionSession(sessionIter->first, code, reason)
402367
);
403-
LOG_DEBUG_S(
404-
ctx, NKikimrServices::PQ_READ_PROXY,
405-
TStringBuilder() << " Direct read cache: close session for proxy "
406-
<< sessionIter->second.Client->ProxyId.ToString()
407-
);
368+
PQ_CPROXY_LOG_D("close session for proxy " << sessionIter->second.Client->ProxyId.ToString());
408369
return true;
409370
}
410371

ydb/core/persqueue/partition_read.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
140140
if (!InitDone) {
141141
return;
142142
}
143-
144143
auto now = ctx.Now();
145144

146145
auto forgetSubscription = [&](const TString clientId) {
@@ -400,8 +399,7 @@ TReadAnswer TReadInfo::FormAnswer(
400399
readResult->SetReadFromTimestampMs(ReadTimestampMs);
401400

402401
Y_ABORT_UNLESS(endOffset <= (ui64)Max<i64>(), "Max offset is too big: %" PRIu64, endOffset);
403-
404-
PQ_LOG_D("FormAnswer " << Blobs.size());
402+
PQ_LOG_D("FormAnswer for " << Blobs.size() << " blobs");
405403

406404
if (!isActive && response->GetBlobs().empty()) {
407405
readResult->SetReadingFinished(true);
@@ -538,6 +536,7 @@ TReadAnswer TReadInfo::FormAnswer(
538536
Destination != 0, ctx.Now()
539537
);
540538
}
539+
541540
AddResultBlob(readResult, writeBlob, Offset);
542541
if (writeBlob.IsLastPart()) {
543542
++Offset;
@@ -614,7 +613,7 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(
614613
}
615614
while (it != DataKeysBody.end()
616615
&& (size < maxSize && count < maxCount || count == 0) //count== 0 grants that blob with offset from ReadFromTimestamp will be readed
617-
&& (lastOffset == 0 || it->Key.GetOffset() < lastOffset)
616+
&& (lastOffset == 0 || it->Key.GetOffset() <= lastOffset)
618617
) {
619618
size += sz;
620619
count += cnt;
@@ -660,14 +659,15 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(
660659
Y_ABORT_UNLESS(pno == blobs[i].GetPartNo());
661660
bool skip = offset < startOffset || offset == startOffset &&
662661
blobs[i].GetPartNo() < partNo;
662+
if (lastOffset != 0 && lastOffset < offset) {
663+
break;
664+
}
663665
if (blobs[i].IsLastPart()) {
664666
++offset;
665667
pno = 0;
666668
} else {
667669
++pno;
668670
}
669-
if (lastOffset > 0 && offset >= lastOffset)
670-
break;
671671

672672
if (skip) continue;
673673

@@ -681,10 +681,12 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(
681681
}
682682
lastBlobSize = 0;
683683

684-
if (count > maxCount) // blob is counted already
684+
if (count > maxCount) {// blob is counted already
685685
break;
686-
if (size > maxSize)
686+
}
687+
if (size > maxSize) {
687688
break;
689+
}
688690
}
689691
size += blobs[i].GetBlobSize();
690692
lastBlobSize += blobs[i].GetBlobSize();
@@ -990,8 +992,9 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
990992
);
991993
info.Blobs = blobs;
992994
ui64 lastOffset = info.Offset + Min(count, info.Count);
995+
993996
PQ_LOG_D("read cookie " << cookie << " added " << info.Blobs.size()
994-
<< " blobs, size " << size << " count " << count << " last offset " << lastOffset);
997+
<< " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << EndOffset);
995998

996999
if (blobs.empty() || blobs.back().Key == DataKeysBody.back().Key) { // read from head only when all blobs from body processed
9971000
ui64 insideHeadOffset{0};
@@ -1025,6 +1028,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10251028
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp->ByteSize());
10261029

10271030
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
1031+
10281032
OnReadRequestFinished(info.Destination, answer.Size, info.User, ctx);
10291033
return;
10301034
}

0 commit comments

Comments
 (0)