Skip to content

Commit efdee79

Browse files
authored
fixed memory corruption when PartitionGraph was changed (#20543)
2 parents 74cf760 + 3b4ed6a commit efdee79

File tree

11 files changed

+136
-98
lines changed

11 files changed

+136
-98
lines changed

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthRes
476476

477477
for (const auto& [name, t] : ev->Get()->TopicAndTablets) {
478478
auto internalName = t.TopicNameConverter->GetInternalName();
479-
TopicsInfo[internalName] = NGRpcProxy::TTopicHolder::FromTopicInfo(t);
479+
TopicsInfo[internalName] = NKikimr::NGRpcProxy::TTopicHolder(t);
480480
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
481481
FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter;
482482
}

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3263,9 +3263,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
32633263
);
32643264

32653265
userInfo.Offset = offset;
3266-
if (userInfo.Offset <= (i64)StartOffset) {
3267-
userInfo.AnyCommits = false;
3268-
}
3266+
userInfo.AnyCommits = userInfo.Offset > (i64)StartOffset;
32693267

32703268
if (LastOffsetHasBeenCommited(userInfo)) {
32713269
SendReadingFinished(user);

ydb/core/persqueue/partition_init.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,12 +433,12 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
433433
Y_ABORT_UNLESS(key);
434434
RequestInfoRange(ctx, Partition()->Tablet, PartitionId(), *key);
435435
} else {
436-
Done(ctx);
436+
PostProcessing(ctx);
437437
}
438438
break;
439439
}
440440
case NKikimrProto::NODATA:
441-
Done(ctx);
441+
PostProcessing(ctx);
442442
break;
443443
case NKikimrProto::ERROR:
444444
PQ_LOG_ERROR("read topic error");
@@ -450,6 +450,16 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
450450
};
451451
}
452452

453+
// Final stage of the info-range init step: both completion paths in Handle()
// (no further range to request, and NODATA) call this instead of Done() directly.
// Recomputes AnyCommits for every stored user and then finishes the step.
void TInitInfoRangeStep::PostProcessing(const TActorContext& ctx) {
454+
auto& usersInfoStorage = Partition()->UsersInfoStorage;
455+
for (auto& [_, userInfo] : usersInfoStorage->GetAll()) {
456+
// A user counts as having commits only when its offset is strictly past the partition's StartOffset.
userInfo.AnyCommits = userInfo.Offset > (i64)Partition()->StartOffset;
457+
}
458+
459+
Done(ctx);
460+
}
461+
462+
453463

454464
//
455465
// TInitDataRangeStep

ydb/core/persqueue/partition_init.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ class TInitInfoRangeStep: public TBaseKVStep {
140140

141141
void Execute(const TActorContext& ctx) override;
142142
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
143+
144+
void PostProcessing(const TActorContext& ctx);
143145
};
144146

145147
class TInitDataRangeStep: public TBaseKVStep {

ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
804804
THashMap<std::pair<TString, ui32>, TPartitionActorInfo> Partitions; //topic[ClientSideName!]:partition -> info
805805

806806
THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
807-
THashMap<TString, TTopicHolder> Topics; // PrimaryName ->topic info
807+
THashMap<TString, TTopicHolder::TPtr> Topics; // PrimaryName ->topic info
808808

809809
TVector<ui32> Groups;
810810
bool ReadOnlyLocal;

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
146146
public:
147147
TPartitionActor(const TActorId& parentId, const TString& clientId, const ui64 cookie, const TString& session, const ui32 generation,
148148
const ui32 step, const NPersQueue::TTopicConverterPtr& topic, const TString& database, const ui32 partition, const ui64 tabletID,
149-
const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, std::set<NPQ::TPartitionGraph::Node*> parents);
149+
const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, const TTopicHolder::TPtr& topicHolder);
150150
~TPartitionActor();
151151

152152
void Bootstrap(const NActors::TActorContext& ctx);
@@ -212,6 +212,7 @@ class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
212212
void CommitDone(ui64 cookie, const TActorContext& ctx);
213213
void SendPartitionReady(const TActorContext& ctx);
214214

215+
const std::set<NPQ::TPartitionGraph::Node*>& GetParents() const;
215216
private:
216217
const TActorId ParentId;
217218
const TString InternalClientId;
@@ -267,7 +268,7 @@ class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
267268
bool ReadingFinishedSent;
268269

269270
std::unordered_map<ui64, std::shared_ptr<NKikimr::NGRpcProxy::V1::TDistributedCommitHelper>> Kqps;
270-
std::set<NPQ::TPartitionGraph::Node*> Parents;
271+
const TTopicHolder::TPtr TopicHolder;
271272
};
272273

273274

@@ -347,8 +348,8 @@ void TReadSessionActor::Die(const TActorContext& ctx) {
347348
}
348349

349350
for (auto& t : Topics) {
350-
if (t.second.PipeClient)
351-
NTabletPipe::CloseClient(ctx, t.second.PipeClient);
351+
if (t.second->PipeClient)
352+
NTabletPipe::CloseClient(ctx, t.second->PipeClient);
352353
}
353354
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD");
354355

@@ -844,7 +845,7 @@ void TReadSessionActor::RegisterSessions(const TActorContext& ctx) {
844845

845846
for (auto& t : Topics) {
846847
auto& topic = t.first;
847-
RegisterSession(t.second.PipeClient, topic, ctx);
848+
RegisterSession(t.second->PipeClient, topic, ctx);
848849
NumPartitionsFromTopic[topic] = 0;
849850
}
850851
}
@@ -978,14 +979,14 @@ void TReadSessionActor::Handle(V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const
978979

979980
for (auto& [name, t] : ev->Get()->TopicAndTablets) {
980981
auto& topicHolder = Topics[t.TopicNameConverter->GetInternalName()];
981-
topicHolder.TabletID = t.TabletID;
982-
topicHolder.CloudId = t.CloudId;
983-
topicHolder.DbId = t.DbId;
984-
topicHolder.DbPath = t.DbPath;
985-
topicHolder.IsServerless = t.IsServerless;
986-
topicHolder.FolderId = t.FolderId;
987-
topicHolder.FullConverter = t.TopicNameConverter;
988-
topicHolder.PartitionGraph = t.PartitionGraph;
982+
topicHolder->TabletID = t.TabletID;
983+
topicHolder->CloudId = t.CloudId;
984+
topicHolder->DbId = t.DbId;
985+
topicHolder->DbPath = t.DbPath;
986+
topicHolder->IsServerless = t.IsServerless;
987+
topicHolder->FolderId = t.FolderId;
988+
topicHolder->FullConverter = t.TopicNameConverter;
989+
topicHolder->PartitionGraph = t.PartitionGraph;
989990
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
990991
const auto& second = t.TopicNameConverter->GetSecondaryPath();
991992
if (!second.empty()) {
@@ -999,7 +1000,7 @@ void TReadSessionActor::Handle(V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const
9991000
clientConfig.CheckAliveness = false;
10001001

10011002
clientConfig.RetryPolicy = RetryPolicyForPipes;
1002-
t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
1003+
t.second->PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second->TabletID, clientConfig));
10031004
}
10041005

10051006
RegisterSessions(ctx);
@@ -1014,7 +1015,7 @@ void TReadSessionActor::Handle(V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const
10141015
NPersQueue::NErrorCode::BAD_REQUEST, ctx);
10151016
return;
10161017
}
1017-
it->second.PartitionGraph = t.PartitionGraph;
1018+
it->second->PartitionGraph = t.PartitionGraph;
10181019
}
10191020
}
10201021

@@ -1054,7 +1055,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10541055
Y_ABORT_UNLESS(!intName.empty());
10551056
auto jt = Topics.find(intName);
10561057

1057-
if (jt == Topics.end() || pipe != jt->second.PipeClient) { //this is message from old version of pipe
1058+
if (jt == Topics.end() || pipe != jt->second->PipeClient) { //this is message from old version of pipe
10581059
LOG_DEBUG_S(
10591060
ctx, NKikimrServices::PQ_READ_PROXY,
10601061
PQ_LOG_PREFIX << " ignored ev lock for topic = " << converterIter->second->GetPrintableString()
@@ -1063,7 +1064,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10631064
return;
10641065
}
10651066

1066-
auto* partitionNode = jt->second.PartitionGraph->GetPartition(record.GetPartition());
1067+
auto* partitionNode = jt->second->PartitionGraph->GetPartition(record.GetPartition());
10671068
if (!partitionNode) {
10681069
LOG_DEBUG_S(
10691070
ctx, NKikimrServices::PQ_READ_PROXY,
@@ -1079,7 +1080,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10791080
// ToDo[counters]
10801081
if (NumPartitionsFromTopic[intName]++ == 0) {
10811082
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
1082-
SetupTopicCounters(converterIter->second, jt->second.CloudId, jt->second.DbId, jt->second.DbPath, jt->second.IsServerless, jt->second.FolderId);
1083+
SetupTopicCounters(converterIter->second, jt->second->CloudId, jt->second->DbId, jt->second->DbPath, jt->second->IsServerless, jt->second->FolderId);
10831084
} else {
10841085
SetupTopicCounters(converterIter->second);
10851086
}
@@ -1088,11 +1089,11 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10881089
auto it = TopicCounters.find(intName);
10891090
Y_ABORT_UNLESS(it != TopicCounters.end());
10901091

1091-
auto database = jt->second.DbPath;
1092+
auto database = jt->second->DbPath;
10921093
IActor* partitionActor = new TPartitionActor(
10931094
ctx.SelfID, InternalClientId, Cookie, Session, record.GetGeneration(),
1094-
record.GetStep(), jt->second.FullConverter, database, record.GetPartition(), record.GetTabletId(), it->second,
1095-
ClientDC, jt->second.PartitionGraph->GetPartition(record.GetPartition())->AllParents
1095+
record.GetStep(), jt->second->FullConverter, database, record.GetPartition(), record.GetTabletId(), it->second,
1096+
ClientDC, jt->second
10961097
);
10971098

10981099
TActorId actorId = ctx.Register(partitionActor);
@@ -1102,8 +1103,8 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
11021103
Y_ABORT_UNLESS(record.GetGeneration() > 0);
11031104
//Partitions use clientside name !
11041105
auto pp = Partitions.insert({
1105-
std::make_pair(jt->second.FullConverter->GetClientsideName(), record.GetPartition()),
1106-
TPartitionActorInfo{actorId, (((ui64)record.GetGeneration()) << 32) + record.GetStep(), jt->second.FullConverter}
1106+
std::make_pair(jt->second->FullConverter->GetClientsideName(), record.GetPartition()),
1107+
TPartitionActorInfo{actorId, (((ui64)record.GetGeneration()) << 32) + record.GetStep(), jt->second->FullConverter}
11071108
});
11081109
Y_ABORT_UNLESS(pp.second);
11091110
if (SessionsActive) {
@@ -1142,7 +1143,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const T
11421143
Y_ABORT_UNLESS(ClientsideLocksAllowed);
11431144
TReadResponse result;
11441145
auto lock = result.MutableLock();
1145-
lock->SetTopic(topicIter->second.FullConverter->GetClientsideName());
1146+
lock->SetTopic(topicIter->second->FullConverter->GetClientsideName());
11461147
lock->SetPartition(ev->Get()->Partition);
11471148
lock->SetReadOffset(ev->Get()->Offset);
11481149
lock->SetEndOffset(ev->Get()->EndOffset);
@@ -1211,7 +1212,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, cons
12111212

12121213
TActorId pipe = ActorIdFromProto(record.GetPipeClient());
12131214

1214-
if (pipe != it->second.PipeClient) { //this is message from old version of pipe
1215+
if (pipe != it->second->PipeClient) { //this is message from old version of pipe
12151216
return;
12161217
}
12171218

@@ -1286,14 +1287,14 @@ void TReadSessionActor::InformBalancerAboutRelease(const THashMap<std::pair<TStr
12861287
Y_ABORT_UNLESS(jt != Topics.end());
12871288

12881289
req.SetSession(Session);
1289-
ActorIdToProto(jt->second.PipeClient, req.MutablePipeClient());
1290+
ActorIdToProto(jt->second->PipeClient, req.MutablePipeClient());
12901291
req.SetClientId(InternalClientId);
12911292
req.SetTopic(it->first.first);
12921293
req.SetPartition(it->first.second);
12931294

12941295
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " released: " << it->first.first << ":" << it->first.second);
12951296

1296-
NTabletPipe::SendData(ctx, jt->second.PipeClient, request.Release());
1297+
NTabletPipe::SendData(ctx, jt->second->PipeClient, request.Release());
12971298
}
12981299

12991300

@@ -1428,7 +1429,7 @@ bool TReadSessionActor::ProcessReleasePartition(const THashMap<std::pair<TString
14281429

14291430
bool TReadSessionActor::ProcessBalancerDead(const ui64 tablet, const TActorContext& ctx) {
14301431
for (auto& t : Topics) {
1431-
if (t.second.TabletID == tablet) {
1432+
if (t.second->TabletID == tablet) {
14321433
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " balancer for topic " << t.first << " is dead, restarting all from this topic");
14331434

14341435
//Drop all partitions from this topic
@@ -1451,14 +1452,14 @@ bool TReadSessionActor::ProcessBalancerDead(const ui64 tablet, const TActorConte
14511452
NTabletPipe::TClientConfig clientConfig;
14521453
clientConfig.CheckAliveness = false;
14531454
clientConfig.RetryPolicy = RetryPolicyForPipes;
1454-
t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
1455+
t.second->PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second->TabletID, clientConfig));
14551456
if (InitDone) {
14561457
if (PipeReconnects) {
14571458
++(*PipeReconnects);
14581459
++(*Errors);
14591460
}
14601461

1461-
RegisterSession(t.second.PipeClient, t.first, ctx);
1462+
RegisterSession(t.second->PipeClient, t.first, ctx);
14621463
}
14631464
}
14641465
}
@@ -1833,7 +1834,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadingStarted::TPtr& ev, const TA
18331834
}
18341835

18351836
auto& topic = it->second;
1836-
NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionStartedRequest(InternalClientId, msg->PartitionId));
1837+
NTabletPipe::SendData(ctx, topic->PipeClient, new TEvPersQueue::TEvReadingPartitionStartedRequest(InternalClientId, msg->PartitionId));
18371838
}
18381839

18391840
void TReadSessionActor::Handle(TEvPQProxy::TEvReadingFinished::TPtr& ev, const TActorContext& ctx) {
@@ -1845,7 +1846,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadingFinished::TPtr& ev, const T
18451846
}
18461847

18471848
auto& topic = it->second;
1848-
NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(InternalClientId, msg->PartitionId, false, msg->FirstMessage));
1849+
NTabletPipe::SendData(ctx, topic->PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(InternalClientId, msg->PartitionId, false, msg->FirstMessage));
18491850
}
18501851

18511852

@@ -1854,7 +1855,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadingFinished::TPtr& ev, const T
18541855
TPartitionActor::TPartitionActor(
18551856
const TActorId& parentId, const TString& internalClientId, const ui64 cookie, const TString& session,
18561857
const ui32 generation, const ui32 step, const NPersQueue::TTopicConverterPtr& topic, const TString& database, const ui32 partition,
1857-
const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, std::set<NPQ::TPartitionGraph::Node*> parents
1858+
const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, const TTopicHolder::TPtr& topicHolder
18581859
)
18591860
: ParentId(parentId)
18601861
, InternalClientId(internalClientId)
@@ -1891,7 +1892,7 @@ TPartitionActor::TPartitionActor(
18911892
, Counters(counters)
18921893
, FirstRead(true)
18931894
, ReadingFinishedSent(false)
1894-
, Parents(parents)
1895+
, TopicHolder(topicHolder)
18951896
{
18961897
}
18971898

@@ -1922,11 +1923,21 @@ void TPartitionActor::CheckRelease(const TActorContext& ctx) {
19221923
}
19231924
}
19241925

1926+
// Returns the AllParents set of this actor's partition, looked up in the PartitionGraph
// currently referenced by TopicHolder. The lookup is repeated on every call; presumably
// this is deliberate, because the holder's PartitionGraph can be reassigned by the read
// session and node pointers taken from an earlier graph should not be retained.
// Returns a reference to an empty set when the partition is not present in the graph.
// NOTE(review): TopicHolder and TopicHolder->PartitionGraph are dereferenced without a
// null check here — confirm both are always set before this actor is created.
const std::set<NPQ::TPartitionGraph::Node*>& TPartitionActor::GetParents() const {
1927+
const auto* partition = TopicHolder->PartitionGraph->GetPartition(Partition);
1928+
if (partition) {
1929+
return partition->AllParents;
1930+
}
1931+
1932+
// Function-local static so the returned reference stays valid after the call.
// NOTE(review): the object is not declared const; it is only exposed through a const reference.
static std::set<NPQ::TPartitionGraph::Node*> empty;
1933+
return empty;
1934+
}
19251935

19261936
void TPartitionActor::SendCommit(const ui64 readId, const ui64 offset, const TActorContext& ctx) {
1927-
if (!ClientHasAnyCommits && Parents.size() != 0) {
1937+
const auto& parents = GetParents();
1938+
if (!ClientHasAnyCommits && parents.size() != 0) {
19281939
std::vector<NKikimr::NGRpcProxy::V1::TDistributedCommitHelper::TCommitInfo> commits;
1929-
for (auto& parent: Parents) {
1940+
for (auto& parent: parents) {
19301941
NKikimr::NGRpcProxy::V1::TDistributedCommitHelper::TCommitInfo commit {.PartitionId = parent->Id, .Offset = Max<i64>(), .KillReadSession = false, .OnlyCheckCommitedToFinish = true, .ReadSessionId = Session};
19311942
commits.push_back(commit);
19321943
}

ydb/services/lib/actors/type_definitions.h

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ struct TTopicInitInfo {
3232
using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>;
3333

3434
struct TTopicHolder {
35+
using TPtr = std::shared_ptr<TTopicHolder>;
36+
3537
ui64 TabletID = 0;
3638
TActorId PipeClient;
3739
bool ACLRequestInfly = false;
@@ -49,21 +51,25 @@ struct TTopicHolder {
4951
THashMap<ui32, TPartitionInfo> Partitions;
5052
std::shared_ptr<NPQ::TPartitionGraph> PartitionGraph;
5153

54+
// Default constructor: assigns nothing itself, so members keep their in-class initializers.
// It has to be spelled out because the struct also declares a constructor taking
// TTopicInitInfo, which suppresses the implicitly declared default constructor.
TTopicHolder() {
55+
}
56+
57+
// Builds a holder from topic init info by copying each field across.
// FullConverter is taken from info.TopicNameConverter; ACLRequestInfly is reset to false.
// PipeClient is not assigned here.
explicit TTopicHolder(const TTopicInitInfo& info) {
58+
TabletID = info.TabletID;
59+
ACLRequestInfly = false;
60+
CloudId = info.CloudId;
61+
DbId = info.DbId;
62+
DbPath = info.DbPath;
63+
IsServerless = info.IsServerless;
64+
FolderId = info.FolderId;
65+
MeteringMode = info.MeteringMode;
66+
FullConverter = info.TopicNameConverter;
67+
Partitions = info.Partitions;
68+
PartitionGraph = info.PartitionGraph;
69+
}
5270

53-
inline static TTopicHolder FromTopicInfo(const TTopicInitInfo& info) {
54-
return TTopicHolder{
55-
.TabletID = info.TabletID,
56-
.ACLRequestInfly = false,
57-
.CloudId = info.CloudId,
58-
.DbId = info.DbId,
59-
.DbPath = info.DbPath,
60-
.IsServerless = info.IsServerless,
61-
.FolderId = info.FolderId,
62-
.MeteringMode = info.MeteringMode,
63-
.FullConverter = info.TopicNameConverter,
64-
.Partitions = info.Partitions,
65-
.PartitionGraph = info.PartitionGraph
66-
};
71+
// Factory: creates a TTopicHolder from `info` via std::make_shared and returns it as a
// TTopicHolder::TPtr (std::shared_ptr<TTopicHolder>).
inline static TTopicHolder::TPtr FromTopicInfo(const TTopicInitInfo& info) {
72+
return std::make_shared<TTopicHolder>(info);
6773
}
6874
};
6975

0 commit comments

Comments
 (0)