Skip to content

Commit f9f2c5c

Browse files
The serialized TEvReadSet takes up a lot of memory (#18301)
2 parents fd5ab83 + 627838d commit f9f2c5c

File tree

5 files changed

+115
-42
lines changed

5 files changed

+115
-42
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
4141
static constexpr ui32 MAX_BYTES = 25_MB;
4242
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
4343
static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
44+
static constexpr ui32 MAX_TXS = 1000;
4445

4546
struct TChangeNotification {
4647
TChangeNotification(const TActorId& actor, const ui64 txId)
@@ -699,17 +700,18 @@ void TPersQueue::HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp,
699700
(resp.ReadRangeResultSize() == 1) &&
700701
(resp.HasSetExecutorFastLogPolicyResult()) &&
701702
(resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
703+
if (!ok) {
704+
PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
705+
return;
706+
}
707+
702708
const auto& result = resp.GetReadRangeResult(0);
703709
auto status = result.GetStatus();
704710
if (status != NKikimrProto::OVERRUN &&
705711
status != NKikimrProto::OK &&
706712
status != NKikimrProto::NODATA) {
707713
ok = false;
708714
}
709-
if (!ok) {
710-
PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
711-
return;
712-
}
713715

714716
TransactionsReadResults.emplace_back(std::move(result));
715717

@@ -2972,8 +2974,10 @@ void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
29722974
continue;
29732975
}
29742976

2975-
for (auto& message : tx->GetBindedMsgs(tabletId)) {
2976-
PipeClientCache->Send(ctx, tabletId, message.Type, message.Data);
2977+
for (const auto& message : tx->GetBindedMsgs(tabletId)) {
2978+
auto event = std::make_unique<TEvTxProcessing::TEvReadSet>();
2979+
event->Record = message;
2980+
PipeClientCache->Send(ctx, tabletId, event.release());
29772981
}
29782982
}
29792983
}
@@ -3286,10 +3290,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
32863290
return;
32873291
}
32883292

3289-
//
3290-
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
3291-
//
3292-
32933293
if (txBody.OperationsSize() <= 0) {
32943294
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
32953295
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
@@ -3358,7 +3358,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33583358
return;
33593359
}
33603360

3361-
33623361
if (txBody.GetImmediate()) {
33633362
PQ_LOG_D("immediate transaction");
33643363
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
@@ -3372,6 +3371,15 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33723371

33733372
ctx.Send(partition.Actor, ev.Release());
33743373
} else {
3374+
if ((EvProposeTransactionQueue.size() + Txs.size()) >= MAX_TXS) {
3375+
SendProposeTransactionOverloaded(ActorIdFromProto(event.GetSourceActor()),
3376+
event.GetTxId(),
3377+
NKikimrPQ::TError::ERROR,
3378+
"too many transactions",
3379+
ctx);
3380+
return;
3381+
}
3382+
33753383
PQ_LOG_D("distributed transaction");
33763384
EvProposeTransactionQueue.emplace_back(ev.Release());
33773385

@@ -4159,7 +4167,7 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
41594167

41604168
void TPersQueue::SendToPipe(ui64 tabletId,
41614169
TDistributedTransaction& tx,
4162-
std::unique_ptr<IEventBase> event,
4170+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
41634171
const TActorContext& ctx)
41644172
{
41654173
Y_ABORT_UNLESS(event);
@@ -4678,16 +4686,17 @@ bool TPersQueue::AllTransactionsHaveBeenProcessed() const
46784686
return EvProposeTransactionQueue.empty() && Txs.empty();
46794687
}
46804688

4681-
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4682-
ui64 txId,
4683-
NKikimrPQ::TError::EKind kind,
4684-
const TString& reason,
4685-
const TActorContext& ctx)
4689+
void TPersQueue::SendProposeTransactionResult(const TActorId& target,
4690+
ui64 txId,
4691+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
4692+
NKikimrPQ::TError::EKind kind,
4693+
const TString& reason,
4694+
const TActorContext& ctx)
46864695
{
46874696
auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
46884697

46894698
event->Record.SetOrigin(TabletID());
4690-
event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED);
4699+
event->Record.SetStatus(status);
46914700
event->Record.SetTxId(txId);
46924701

46934702
if (kind != NKikimrPQ::TError::OK) {
@@ -4703,6 +4712,34 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
47034712
ctx.Send(target, std::move(event));
47044713
}
47054714

4715+
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4716+
ui64 txId,
4717+
NKikimrPQ::TError::EKind kind,
4718+
const TString& reason,
4719+
const TActorContext& ctx)
4720+
{
4721+
SendProposeTransactionResult(target,
4722+
txId,
4723+
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
4724+
kind,
4725+
reason,
4726+
ctx);
4727+
}
4728+
4729+
void TPersQueue::SendProposeTransactionOverloaded(const TActorId& target,
4730+
ui64 txId,
4731+
NKikimrPQ::TError::EKind kind,
4732+
const TString& reason,
4733+
const TActorContext& ctx)
4734+
{
4735+
SendProposeTransactionResult(target,
4736+
txId,
4737+
NKikimrPQ::TEvProposeTransactionResult::OVERLOADED,
4738+
kind,
4739+
reason,
4740+
ctx);
4741+
}
4742+
47064743
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
47074744
TDistributedTransaction& tx)
47084745
{

ydb/core/persqueue/pq_impl.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,11 +359,22 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
359359
void EndWriteTabletState(const NKikimrClient::TResponse& resp,
360360
const TActorContext& ctx);
361361

362+
void SendProposeTransactionResult(const TActorId& target,
363+
ui64 txId,
364+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
365+
NKikimrPQ::TError::EKind kind,
366+
const TString& reason,
367+
const TActorContext& ctx);
362368
void SendProposeTransactionAbort(const TActorId& target,
363369
ui64 txId,
364370
NKikimrPQ::TError::EKind kind,
365371
const TString& reason,
366372
const TActorContext& ctx);
373+
void SendProposeTransactionOverloaded(const TActorId& target,
374+
ui64 txId,
375+
NKikimrPQ::TError::EKind kind,
376+
const TString& reason,
377+
const TActorContext& ctx);
367378

368379
void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx);
369380
void HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event,
@@ -407,7 +418,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
407418

408419
void SendToPipe(ui64 tabletId,
409420
TDistributedTransaction& tx,
410-
std::unique_ptr<IEventBase> event,
421+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
411422
const TActorContext& ctx);
412423

413424
void InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,

ydb/core/persqueue/transaction.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -454,28 +454,23 @@ TString TDistributedTransaction::GetKey() const
454454
return GetTxKey(TxId);
455455
}
456456

457-
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event)
457+
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event)
458458
{
459-
Y_ABORT_UNLESS(event.IsSerializable());
460-
461-
TAllocChunkSerializer serializer;
462-
Y_ABORT_UNLESS(event.SerializeToArcadiaStream(&serializer));
463-
auto data = serializer.Release(event.CreateSerializationInfo());
464-
OutputMsgs[tabletId].emplace_back(event.Type(), std::move(data));
459+
OutputMsgs[tabletId].push_back(event.Record);
465460
}
466461

467462
void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId)
468463
{
469464
OutputMsgs.erase(tabletId);
470465
}
471466

472-
auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
467+
const TVector<NKikimrTx::TEvReadSet>& TDistributedTransaction::GetBindedMsgs(ui64 tabletId)
473468
{
474469
if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) {
475470
return p->second;
476471
}
477472

478-
static TVector<TSerializedMessage> empty;
473+
static TVector<NKikimrTx::TEvReadSet> empty;
479474

480475
return empty;
481476
}

ydb/core/persqueue/transaction.h

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,22 +98,11 @@ struct TDistributedTransaction {
9898

9999
TString LogPrefix() const;
100100

101-
struct TSerializedMessage {
102-
ui32 Type;
103-
TIntrusivePtr<TEventSerializedData> Data;
101+
THashMap<ui64, TVector<NKikimrTx::TEvReadSet>> OutputMsgs;
104102

105-
TSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data) :
106-
Type(type),
107-
Data(data)
108-
{
109-
}
110-
};
111-
112-
THashMap<ui64, TVector<TSerializedMessage>> OutputMsgs;
113-
114-
void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
103+
void BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event);
115104
void UnbindMsgsFromPipe(ui64 tabletId);
116-
const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
105+
const TVector<NKikimrTx::TEvReadSet>& GetBindedMsgs(ui64 tabletId);
117106

118107
bool HasWriteOperations = false;
119108
size_t PredicateAcksCount = 0;

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,6 +2107,47 @@ Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
21072107
WaitForTheTransactionToBeDeleted(txId);
21082108
}
21092109

2110+
Y_UNIT_TEST_F(Limit_On_The_Number_Of_Transactons, TPQTabletFixture)
2111+
{
2112+
const ui64 mockTabletId = MakeTabletID(false, 22222);
2113+
const ui64 txId = 67890;
2114+
2115+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
2116+
2117+
for (ui64 i = 0; i < 1002; ++i) {
2118+
SendProposeTransactionRequest({.TxId=txId + i,
2119+
.Senders={mockTabletId}, .Receivers={mockTabletId},
2120+
.TxOps={
2121+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
2122+
}});
2123+
}
2124+
2125+
size_t preparedCount = 0;
2126+
size_t overloadedCount = 0;
2127+
2128+
for (ui64 i = 0; i < 1002; ++i) {
2129+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvProposeTransactionResult>();
2130+
UNIT_ASSERT(event != nullptr);
2131+
2132+
UNIT_ASSERT(event->Record.HasStatus());
2133+
2134+
const auto status = event->Record.GetStatus();
2135+
switch (status) {
2136+
case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
2137+
++preparedCount;
2138+
break;
2139+
case NKikimrPQ::TEvProposeTransactionResult::OVERLOADED:
2140+
++overloadedCount;
2141+
break;
2142+
default:
2143+
UNIT_FAIL("unexpected transaction status " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(status));
2144+
}
2145+
}
2146+
2147+
UNIT_ASSERT_EQUAL(preparedCount, 1000);
2148+
UNIT_ASSERT_EQUAL(overloadedCount, 2);
2149+
}
2150+
21102151
}
21112152

21122153
}

0 commit comments

Comments
 (0)