Skip to content

Commit ff9df7a

Browse files
authored
fixed memory travel when PartitionGraph was changed (#20558)
2 parents 0d24c4f + d52e3ac commit ff9df7a

File tree

13 files changed

+180
-98
lines changed

13 files changed

+180
-98
lines changed

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthRes
476476

477477
for (const auto& [name, t] : ev->Get()->TopicAndTablets) {
478478
auto internalName = t.TopicNameConverter->GetInternalName();
479-
TopicsInfo[internalName] = NGRpcProxy::TTopicHolder::FromTopicInfo(t);
479+
TopicsInfo[internalName] = NKikimr::NGRpcProxy::TTopicHolder(t);
480480
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
481481
FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter;
482482
}

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3263,9 +3263,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
32633263
);
32643264

32653265
userInfo.Offset = offset;
3266-
if (userInfo.Offset <= (i64)StartOffset) {
3267-
userInfo.AnyCommits = false;
3268-
}
3266+
userInfo.AnyCommits = userInfo.Offset > (i64)StartOffset;
32693267

32703268
if (LastOffsetHasBeenCommited(userInfo)) {
32713269
SendReadingFinished(user);

ydb/core/persqueue/partition_init.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,12 +433,12 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
433433
Y_ABORT_UNLESS(key);
434434
RequestInfoRange(ctx, Partition()->Tablet, PartitionId(), *key);
435435
} else {
436-
Done(ctx);
436+
PostProcessing(ctx);
437437
}
438438
break;
439439
}
440440
case NKikimrProto::NODATA:
441-
Done(ctx);
441+
PostProcessing(ctx);
442442
break;
443443
case NKikimrProto::ERROR:
444444
PQ_LOG_ERROR("read topic error");
@@ -450,6 +450,16 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
450450
};
451451
}
452452

453+
void TInitInfoRangeStep::PostProcessing(const TActorContext& ctx) {
454+
auto& usersInfoStorage = Partition()->UsersInfoStorage;
455+
for (auto& [_, userInfo] : usersInfoStorage->GetAll()) {
456+
userInfo.AnyCommits = userInfo.Offset > (i64)Partition()->StartOffset;
457+
}
458+
459+
Done(ctx);
460+
}
461+
462+
453463

454464
//
455465
// TInitDataRangeStep

ydb/core/persqueue/partition_init.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ class TInitInfoRangeStep: public TBaseKVStep {
140140

141141
void Execute(const TActorContext& ctx) override;
142142
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
143+
144+
void PostProcessing(const TActorContext& ctx);
143145
};
144146

145147
class TInitDataRangeStep: public TBaseKVStep {

ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ TTopicSdkTestSetup CreateSetup() {
113113
setup.GetRuntime().GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true);
114114
setup.GetRuntime().GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true);
115115
setup.GetRuntime().GetAppData().PQConfig.SetBalancerWakeupIntervalSec(1);
116+
setup.GetRuntime().GetAppData().PQConfig.SetACLRetryTimeoutSec(1);
116117

117118
return setup;
118119
}

ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,49 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
618618
return true;
619619
});
620620
}
621+
622+
Y_UNIT_TEST(DistributedTxCommit_LongReadSession) {
623+
TTopicSdkTestSetup setup = CreateSetup();
624+
PrepareAutopartitionedTopic(setup);
625+
626+
std::vector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> messages;
627+
628+
auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
629+
messages.push_back(x);
630+
return true;
631+
});
632+
633+
{
634+
Cerr << ">>>>> Alter topic" << Endl << Flush;
635+
TTopicClient client(setup.MakeDriver());
636+
TAlterTopicSettings settings;
637+
settings.SetRetentionPeriod(TDuration::Hours(1));
638+
client.AlterTopic(TEST_TOPIC, settings).GetValueSync();
639+
}
640+
641+
// Wait recheck acl happened
642+
messages[0].GetMessages()[0].Commit(); // trigger recheck
643+
Sleep(TDuration::Seconds(5));
644+
645+
Cerr << ">>>>> Commit message" << Endl << Flush;
646+
647+
bool first = true;
648+
for (auto& e : messages) {
649+
for (auto& m :e.GetMessages()) {
650+
if (first) {
651+
first = false;
652+
continue;
653+
}
654+
m.Commit();
655+
}
656+
}
657+
658+
// Wait commit happened
659+
Sleep(TDuration::Seconds(5));
660+
661+
result.Reader->Close();
662+
}
663+
621664
}
622665

623666
} // namespace NKikimr

ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
804804
THashMap<std::pair<TString, ui32>, TPartitionActorInfo> Partitions; //topic[ClientSideName!]:partition -> info
805805

806806
THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
807-
THashMap<TString, TTopicHolder> Topics; // PrimaryName ->topic info
807+
THashMap<TString, TTopicHolder::TPtr> Topics; // PrimaryName ->topic info
808808

809809
TVector<ui32> Groups;
810810
bool ReadOnlyLocal;

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
146146
public:
147147
TPartitionActor(const TActorId& parentId, const TString& clientId, const ui64 cookie, const TString& session, const ui32 generation,
148148
const ui32 step, const NPersQueue::TTopicConverterPtr& topic, const TString& database, const ui32 partition, const ui64 tabletID,
149-
const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, std::set<NPQ::TPartitionGraph::Node*> parents);
149+
const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, const TTopicHolder::TPtr& topicHolder);
150150
~TPartitionActor();
151151

152152
void Bootstrap(const NActors::TActorContext& ctx);
@@ -212,6 +212,7 @@ class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
212212
void CommitDone(ui64 cookie, const TActorContext& ctx);
213213
void SendPartitionReady(const TActorContext& ctx);
214214

215+
const std::set<NPQ::TPartitionGraph::Node*>& GetParents() const;
215216
private:
216217
const TActorId ParentId;
217218
const TString InternalClientId;
@@ -267,7 +268,7 @@ class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
267268
bool ReadingFinishedSent;
268269

269270
std::unordered_map<ui64, std::shared_ptr<NKikimr::NGRpcProxy::V1::TDistributedCommitHelper>> Kqps;
270-
std::set<NPQ::TPartitionGraph::Node*> Parents;
271+
const TTopicHolder::TPtr TopicHolder;
271272
};
272273

273274

@@ -347,8 +348,8 @@ void TReadSessionActor::Die(const TActorContext& ctx) {
347348
}
348349

349350
for (auto& t : Topics) {
350-
if (t.second.PipeClient)
351-
NTabletPipe::CloseClient(ctx, t.second.PipeClient);
351+
if (t.second->PipeClient)
352+
NTabletPipe::CloseClient(ctx, t.second->PipeClient);
352353
}
353354
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD");
354355

@@ -844,7 +845,7 @@ void TReadSessionActor::RegisterSessions(const TActorContext& ctx) {
844845

845846
for (auto& t : Topics) {
846847
auto& topic = t.first;
847-
RegisterSession(t.second.PipeClient, topic, ctx);
848+
RegisterSession(t.second->PipeClient, topic, ctx);
848849
NumPartitionsFromTopic[topic] = 0;
849850
}
850851
}
@@ -978,14 +979,14 @@ void TReadSessionActor::Handle(V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const
978979

979980
for (auto& [name, t] : ev->Get()->TopicAndTablets) {
980981
auto& topicHolder = Topics[t.TopicNameConverter->GetInternalName()];
981-
topicHolder.TabletID = t.TabletID;
982-
topicHolder.CloudId = t.CloudId;
983-
topicHolder.DbId = t.DbId;
984-
topicHolder.DbPath = t.DbPath;
985-
topicHolder.IsServerless = t.IsServerless;
986-
topicHolder.FolderId = t.FolderId;
987-
topicHolder.FullConverter = t.TopicNameConverter;
988-
topicHolder.PartitionGraph = t.PartitionGraph;
982+
topicHolder->TabletID = t.TabletID;
983+
topicHolder->CloudId = t.CloudId;
984+
topicHolder->DbId = t.DbId;
985+
topicHolder->DbPath = t.DbPath;
986+
topicHolder->IsServerless = t.IsServerless;
987+
topicHolder->FolderId = t.FolderId;
988+
topicHolder->FullConverter = t.TopicNameConverter;
989+
topicHolder->PartitionGraph = t.PartitionGraph;
989990
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
990991
const auto& second = t.TopicNameConverter->GetSecondaryPath();
991992
if (!second.empty()) {
@@ -999,7 +1000,7 @@ void TReadSessionActor::Handle(V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const
9991000
clientConfig.CheckAliveness = false;
10001001

10011002
clientConfig.RetryPolicy = RetryPolicyForPipes;
1002-
t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
1003+
t.second->PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second->TabletID, clientConfig));
10031004
}
10041005

10051006
RegisterSessions(ctx);
@@ -1014,7 +1015,7 @@ void TReadSessionActor::Handle(V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const
10141015
NPersQueue::NErrorCode::BAD_REQUEST, ctx);
10151016
return;
10161017
}
1017-
it->second.PartitionGraph = t.PartitionGraph;
1018+
it->second->PartitionGraph = t.PartitionGraph;
10181019
}
10191020
}
10201021

@@ -1054,7 +1055,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10541055
Y_ABORT_UNLESS(!intName.empty());
10551056
auto jt = Topics.find(intName);
10561057

1057-
if (jt == Topics.end() || pipe != jt->second.PipeClient) { //this is message from old version of pipe
1058+
if (jt == Topics.end() || pipe != jt->second->PipeClient) { //this is message from old version of pipe
10581059
LOG_DEBUG_S(
10591060
ctx, NKikimrServices::PQ_READ_PROXY,
10601061
PQ_LOG_PREFIX << " ignored ev lock for topic = " << converterIter->second->GetPrintableString()
@@ -1063,7 +1064,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10631064
return;
10641065
}
10651066

1066-
auto* partitionNode = jt->second.PartitionGraph->GetPartition(record.GetPartition());
1067+
auto* partitionNode = jt->second->PartitionGraph->GetPartition(record.GetPartition());
10671068
if (!partitionNode) {
10681069
LOG_DEBUG_S(
10691070
ctx, NKikimrServices::PQ_READ_PROXY,
@@ -1079,7 +1080,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10791080
// ToDo[counters]
10801081
if (NumPartitionsFromTopic[intName]++ == 0) {
10811082
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
1082-
SetupTopicCounters(converterIter->second, jt->second.CloudId, jt->second.DbId, jt->second.DbPath, jt->second.IsServerless, jt->second.FolderId);
1083+
SetupTopicCounters(converterIter->second, jt->second->CloudId, jt->second->DbId, jt->second->DbPath, jt->second->IsServerless, jt->second->FolderId);
10831084
} else {
10841085
SetupTopicCounters(converterIter->second);
10851086
}
@@ -1088,11 +1089,11 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
10881089
auto it = TopicCounters.find(intName);
10891090
Y_ABORT_UNLESS(it != TopicCounters.end());
10901091

1091-
auto database = jt->second.DbPath;
1092+
auto database = jt->second->DbPath;
10921093
IActor* partitionActor = new TPartitionActor(
10931094
ctx.SelfID, InternalClientId, Cookie, Session, record.GetGeneration(),
1094-
record.GetStep(), jt->second.FullConverter, database, record.GetPartition(), record.GetTabletId(), it->second,
1095-
ClientDC, jt->second.PartitionGraph->GetPartition(record.GetPartition())->AllParents
1095+
record.GetStep(), jt->second->FullConverter, database, record.GetPartition(), record.GetTabletId(), it->second,
1096+
ClientDC, jt->second
10961097
);
10971098

10981099
TActorId actorId = ctx.Register(partitionActor);
@@ -1102,8 +1103,8 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
11021103
Y_ABORT_UNLESS(record.GetGeneration() > 0);
11031104
//Partitions use clientside name !
11041105
auto pp = Partitions.insert({
1105-
std::make_pair(jt->second.FullConverter->GetClientsideName(), record.GetPartition()),
1106-
TPartitionActorInfo{actorId, (((ui64)record.GetGeneration()) << 32) + record.GetStep(), jt->second.FullConverter}
1106+
std::make_pair(jt->second->FullConverter->GetClientsideName(), record.GetPartition()),
1107+
TPartitionActorInfo{actorId, (((ui64)record.GetGeneration()) << 32) + record.GetStep(), jt->second->FullConverter}
11071108
});
11081109
Y_ABORT_UNLESS(pp.second);
11091110
if (SessionsActive) {
@@ -1142,7 +1143,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const T
11421143
Y_ABORT_UNLESS(ClientsideLocksAllowed);
11431144
TReadResponse result;
11441145
auto lock = result.MutableLock();
1145-
lock->SetTopic(topicIter->second.FullConverter->GetClientsideName());
1146+
lock->SetTopic(topicIter->second->FullConverter->GetClientsideName());
11461147
lock->SetPartition(ev->Get()->Partition);
11471148
lock->SetReadOffset(ev->Get()->Offset);
11481149
lock->SetEndOffset(ev->Get()->EndOffset);
@@ -1211,7 +1212,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, cons
12111212

12121213
TActorId pipe = ActorIdFromProto(record.GetPipeClient());
12131214

1214-
if (pipe != it->second.PipeClient) { //this is message from old version of pipe
1215+
if (pipe != it->second->PipeClient) { //this is message from old version of pipe
12151216
return;
12161217
}
12171218

@@ -1286,14 +1287,14 @@ void TReadSessionActor::InformBalancerAboutRelease(const THashMap<std::pair<TStr
12861287
Y_ABORT_UNLESS(jt != Topics.end());
12871288

12881289
req.SetSession(Session);
1289-
ActorIdToProto(jt->second.PipeClient, req.MutablePipeClient());
1290+
ActorIdToProto(jt->second->PipeClient, req.MutablePipeClient());
12901291
req.SetClientId(InternalClientId);
12911292
req.SetTopic(it->first.first);
12921293
req.SetPartition(it->first.second);
12931294

12941295
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " released: " << it->first.first << ":" << it->first.second);
12951296

1296-
NTabletPipe::SendData(ctx, jt->second.PipeClient, request.Release());
1297+
NTabletPipe::SendData(ctx, jt->second->PipeClient, request.Release());
12971298
}
12981299

12991300

@@ -1428,7 +1429,7 @@ bool TReadSessionActor::ProcessReleasePartition(const THashMap<std::pair<TString
14281429

14291430
bool TReadSessionActor::ProcessBalancerDead(const ui64 tablet, const TActorContext& ctx) {
14301431
for (auto& t : Topics) {
1431-
if (t.second.TabletID == tablet) {
1432+
if (t.second->TabletID == tablet) {
14321433
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " balancer for topic " << t.first << " is dead, restarting all from this topic");
14331434

14341435
//Drop all partitions from this topic
@@ -1451,14 +1452,14 @@ bool TReadSessionActor::ProcessBalancerDead(const ui64 tablet, const TActorConte
14511452
NTabletPipe::TClientConfig clientConfig;
14521453
clientConfig.CheckAliveness = false;
14531454
clientConfig.RetryPolicy = RetryPolicyForPipes;
1454-
t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
1455+
t.second->PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second->TabletID, clientConfig));
14551456
if (InitDone) {
14561457
if (PipeReconnects) {
14571458
++(*PipeReconnects);
14581459
++(*Errors);
14591460
}
14601461

1461-
RegisterSession(t.second.PipeClient, t.first, ctx);
1462+
RegisterSession(t.second->PipeClient, t.first, ctx);
14621463
}
14631464
}
14641465
}
@@ -1833,7 +1834,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadingStarted::TPtr& ev, const TA
18331834
}
18341835

18351836
auto& topic = it->second;
1836-
NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionStartedRequest(InternalClientId, msg->PartitionId));
1837+
NTabletPipe::SendData(ctx, topic->PipeClient, new TEvPersQueue::TEvReadingPartitionStartedRequest(InternalClientId, msg->PartitionId));
18371838
}
18381839

18391840
void TReadSessionActor::Handle(TEvPQProxy::TEvReadingFinished::TPtr& ev, const TActorContext& ctx) {
@@ -1845,7 +1846,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadingFinished::TPtr& ev, const T
18451846
}
18461847

18471848
auto& topic = it->second;
1848-
NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(InternalClientId, msg->PartitionId, false, msg->FirstMessage));
1849+
NTabletPipe::SendData(ctx, topic->PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(InternalClientId, msg->PartitionId, false, msg->FirstMessage));
18491850
}
18501851

18511852

@@ -1854,7 +1855,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadingFinished::TPtr& ev, const T
18541855
TPartitionActor::TPartitionActor(
18551856
const TActorId& parentId, const TString& internalClientId, const ui64 cookie, const TString& session,
18561857
const ui32 generation, const ui32 step, const NPersQueue::TTopicConverterPtr& topic, const TString& database, const ui32 partition,
1857-
const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, std::set<NPQ::TPartitionGraph::Node*> parents
1858+
const ui64 tabletID, const TReadSessionActor::TTopicCounters& counters, const TString& clientDC, const TTopicHolder::TPtr& topicHolder
18581859
)
18591860
: ParentId(parentId)
18601861
, InternalClientId(internalClientId)
@@ -1891,7 +1892,7 @@ TPartitionActor::TPartitionActor(
18911892
, Counters(counters)
18921893
, FirstRead(true)
18931894
, ReadingFinishedSent(false)
1894-
, Parents(parents)
1895+
, TopicHolder(topicHolder)
18951896
{
18961897
}
18971898

@@ -1922,11 +1923,21 @@ void TPartitionActor::CheckRelease(const TActorContext& ctx) {
19221923
}
19231924
}
19241925

1926+
const std::set<NPQ::TPartitionGraph::Node*>& TPartitionActor::GetParents() const {
1927+
const auto* partition = TopicHolder->PartitionGraph->GetPartition(Partition);
1928+
if (partition) {
1929+
return partition->AllParents;
1930+
}
1931+
1932+
static std::set<NPQ::TPartitionGraph::Node*> empty;
1933+
return empty;
1934+
}
19251935

19261936
void TPartitionActor::SendCommit(const ui64 readId, const ui64 offset, const TActorContext& ctx) {
1927-
if (!ClientHasAnyCommits && Parents.size() != 0) {
1937+
const auto& parents = GetParents();
1938+
if (!ClientHasAnyCommits && parents.size() != 0) {
19281939
std::vector<NKikimr::NGRpcProxy::V1::TDistributedCommitHelper::TCommitInfo> commits;
1929-
for (auto& parent: Parents) {
1940+
for (auto& parent: parents) {
19301941
NKikimr::NGRpcProxy::V1::TDistributedCommitHelper::TCommitInfo commit {.PartitionId = parent->Id, .Offset = Max<i64>(), .KillReadSession = false, .OnlyCheckCommitedToFinish = true, .ReadSessionId = Session};
19311942
commits.push_back(commit);
19321943
}

0 commit comments

Comments
 (0)