Skip to content

Commit 7fe0768

Browse files
The counter value for UserInfo is leaking (#15233)
1 parent 94077e5 commit 7fe0768

File tree

4 files changed

+29
-6
lines changed

4 files changed

+29
-6
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2969,6 +2969,7 @@ TPartition::EProcessResult TPartition::PreProcessUserAct(
29692969
void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
29702970
const bool strictCommitOffset = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.Strict);
29712971
const TString& user = act.ClientId;
2972+
RemoveUserAct(user);
29722973
const auto& ctx = ActorContext();
29732974
if (!PendingUsersInfo.contains(user) && AffectedUsers.contains(user)) {
29742975
switch (act.Type) {
@@ -3088,7 +3089,6 @@ void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
30883089
return;
30893090
}
30903091

3091-
RemoveUserAct(act.ClientId);
30923092
return EmulatePostProcessUserAct(act, userInfo, ActorContext());
30933093
}
30943094

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace NKikimr::NPQ {
2929

3030
static const ui32 MAX_BLOB_PART_SIZE = 500_KB;
3131
static const ui32 DEFAULT_BUCKET_COUNTER_MULTIPLIER = 20;
32+
static const ui32 MAX_USER_ACTS = 1000;
3233

3334
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
3435

ydb/core/persqueue/partition_read.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828

2929
namespace NKikimr::NPQ {
3030

31-
static const ui32 MAX_USER_ACTS = 1000;
32-
3331
TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
3432
if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) {
3533
return {};

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
225225
void SendSetOffset(ui64 cookie,
226226
const TString& clientId,
227227
ui64 offset,
228-
const TString& sessionId);
228+
const TString& sessionId,
229+
bool strict = false);
229230
void SendGetOffset(ui64 cookie,
230231
const TString& clientId);
231232
void WaitCmdWrite(const TCmdWriteMatcher& matcher = {});
@@ -488,7 +489,8 @@ void TPartitionFixture::SendCreateSession(ui64 cookie,
488489
void TPartitionFixture::SendSetOffset(ui64 cookie,
489490
const TString& clientId,
490491
ui64 offset,
491-
const TString& sessionId)
492+
const TString& sessionId,
493+
bool strict)
492494
{
493495
auto event = MakeHolder<TEvPQ::TEvSetClientInfo>(cookie,
494496
clientId,
@@ -498,6 +500,7 @@ void TPartitionFixture::SendSetOffset(ui64 cookie,
498500
0,
499501
0,
500502
TActorId{});
503+
event->Strict = strict;
501504
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
502505
}
503506

@@ -712,7 +715,6 @@ void TPartitionFixture::WaitErrorResponse(const TErrorMatcher& matcher)
712715
}
713716

714717
if (matcher.Error) {
715-
716718
UNIT_ASSERT_VALUES_EQUAL(*matcher.Error, event->Error);
717719
}
718720
}
@@ -1796,6 +1798,28 @@ void TPartitionTxTestHelper::WaitTxPredicateReplyImpl(ui64 userActId, bool statu
17961798
#endif
17971799
}
17981800

1801+
Y_UNIT_TEST_F(UserActCount, TPartitionFixture)
1802+
{
1803+
// In the test, we check that the reference count for `UserInfo` decreases in case of errors. To do this,
1804+
// we send a large number of requests to which the server will respond with an error.
1805+
1806+
CreatePartition();
1807+
1808+
SendCreateSession(1, "client", "session-id", 2, 3);
1809+
WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-id", .Offset=0, .Generation=2, .Step=3}}}});
1810+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
1811+
WaitProxyResponse({.Cookie=1});
1812+
1813+
for (ui64 k = 0; k <= MAX_USER_ACTS; ++k) {
1814+
const ui64 cookie = 2 + k;
1815+
// 1 > EndOffset
1816+
SendSetOffset(cookie, "client", 1, "session-id", true); // strict = true
1817+
WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-id", .Offset=0, .Generation=2, .Step=3}}}});
1818+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
1819+
WaitErrorResponse({.Cookie=cookie, .ErrorCode=NPersQueue::NErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE});
1820+
}
1821+
}
1822+
17991823
Y_UNIT_TEST_F(Batching, TPartitionFixture)
18001824
{
18011825
CreatePartition();

0 commit comments

Comments
 (0)