Skip to content

Commit fde4ec9

Browse files
[-] fix bugs
1 parent fd5ab83 commit fde4ec9

File tree

5 files changed

+114
-42
lines changed

5 files changed

+114
-42
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
4141
static constexpr ui32 MAX_BYTES = 25_MB;
4242
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
4343
static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
44+
static constexpr ui32 MAX_TXS = 1000;
4445

4546
struct TChangeNotification {
4647
TChangeNotification(const TActorId& actor, const ui64 txId)
@@ -699,17 +700,17 @@ void TPersQueue::HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp,
699700
(resp.ReadRangeResultSize() == 1) &&
700701
(resp.HasSetExecutorFastLogPolicyResult()) &&
701702
(resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
703+
if (!ok) {
704+
PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
705+
return;
706+
}
702707
const auto& result = resp.GetReadRangeResult(0);
703708
auto status = result.GetStatus();
704709
if (status != NKikimrProto::OVERRUN &&
705710
status != NKikimrProto::OK &&
706711
status != NKikimrProto::NODATA) {
707712
ok = false;
708713
}
709-
if (!ok) {
710-
PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
711-
return;
712-
}
713714

714715
TransactionsReadResults.emplace_back(std::move(result));
715716

@@ -2972,8 +2973,10 @@ void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
29722973
continue;
29732974
}
29742975

2975-
for (auto& message : tx->GetBindedMsgs(tabletId)) {
2976-
PipeClientCache->Send(ctx, tabletId, message.Type, message.Data);
2976+
for (const auto& message : tx->GetBindedMsgs(tabletId)) {
2977+
auto event = std::make_unique<TEvTxProcessing::TEvReadSet>();
2978+
event->Record = message;
2979+
PipeClientCache->Send(ctx, tabletId, event.release());
29772980
}
29782981
}
29792982
}
@@ -3286,10 +3289,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
32863289
return;
32873290
}
32883291

3289-
//
3290-
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
3291-
//
3292-
32933292
if (txBody.OperationsSize() <= 0) {
32943293
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
32953294
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
@@ -3358,7 +3357,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33583357
return;
33593358
}
33603359

3361-
33623360
if (txBody.GetImmediate()) {
33633361
PQ_LOG_D("immediate transaction");
33643362
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
@@ -3372,6 +3370,15 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33723370

33733371
ctx.Send(partition.Actor, ev.Release());
33743372
} else {
3373+
if ((EvProposeTransactionQueue.size() + Txs.size()) >= MAX_TXS) {
3374+
SendProposeTransactionOverloaded(ActorIdFromProto(event.GetSourceActor()),
3375+
event.GetTxId(),
3376+
NKikimrPQ::TError::ERROR,
3377+
"too many transactions",
3378+
ctx);
3379+
return;
3380+
}
3381+
33753382
PQ_LOG_D("distributed transaction");
33763383
EvProposeTransactionQueue.emplace_back(ev.Release());
33773384

@@ -4159,7 +4166,7 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
41594166

41604167
void TPersQueue::SendToPipe(ui64 tabletId,
41614168
TDistributedTransaction& tx,
4162-
std::unique_ptr<IEventBase> event,
4169+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
41634170
const TActorContext& ctx)
41644171
{
41654172
Y_ABORT_UNLESS(event);
@@ -4678,16 +4685,17 @@ bool TPersQueue::AllTransactionsHaveBeenProcessed() const
46784685
return EvProposeTransactionQueue.empty() && Txs.empty();
46794686
}
46804687

4681-
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4682-
ui64 txId,
4683-
NKikimrPQ::TError::EKind kind,
4684-
const TString& reason,
4685-
const TActorContext& ctx)
4688+
void TPersQueue::SendProposeTransactionResult(const TActorId& target,
4689+
ui64 txId,
4690+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
4691+
NKikimrPQ::TError::EKind kind,
4692+
const TString& reason,
4693+
const TActorContext& ctx)
46864694
{
46874695
auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
46884696

46894697
event->Record.SetOrigin(TabletID());
4690-
event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED);
4698+
event->Record.SetStatus(status);
46914699
event->Record.SetTxId(txId);
46924700

46934701
if (kind != NKikimrPQ::TError::OK) {
@@ -4703,6 +4711,34 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
47034711
ctx.Send(target, std::move(event));
47044712
}
47054713

4714+
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4715+
ui64 txId,
4716+
NKikimrPQ::TError::EKind kind,
4717+
const TString& reason,
4718+
const TActorContext& ctx)
4719+
{
4720+
SendProposeTransactionResult(target,
4721+
txId,
4722+
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
4723+
kind,
4724+
reason,
4725+
ctx);
4726+
}
4727+
4728+
void TPersQueue::SendProposeTransactionOverloaded(const TActorId& target,
4729+
ui64 txId,
4730+
NKikimrPQ::TError::EKind kind,
4731+
const TString& reason,
4732+
const TActorContext& ctx)
4733+
{
4734+
SendProposeTransactionResult(target,
4735+
txId,
4736+
NKikimrPQ::TEvProposeTransactionResult::OVERLOADED,
4737+
kind,
4738+
reason,
4739+
ctx);
4740+
}
4741+
47064742
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
47074743
TDistributedTransaction& tx)
47084744
{

ydb/core/persqueue/pq_impl.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,11 +359,22 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
359359
void EndWriteTabletState(const NKikimrClient::TResponse& resp,
360360
const TActorContext& ctx);
361361

362+
void SendProposeTransactionResult(const TActorId& target,
363+
ui64 txId,
364+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
365+
NKikimrPQ::TError::EKind kind,
366+
const TString& reason,
367+
const TActorContext& ctx);
362368
void SendProposeTransactionAbort(const TActorId& target,
363369
ui64 txId,
364370
NKikimrPQ::TError::EKind kind,
365371
const TString& reason,
366372
const TActorContext& ctx);
373+
void SendProposeTransactionOverloaded(const TActorId& target,
374+
ui64 txId,
375+
NKikimrPQ::TError::EKind kind,
376+
const TString& reason,
377+
const TActorContext& ctx);
367378

368379
void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx);
369380
void HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event,
@@ -407,7 +418,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
407418

408419
void SendToPipe(ui64 tabletId,
409420
TDistributedTransaction& tx,
410-
std::unique_ptr<IEventBase> event,
421+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
411422
const TActorContext& ctx);
412423

413424
void InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,

ydb/core/persqueue/transaction.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -454,28 +454,23 @@ TString TDistributedTransaction::GetKey() const
454454
return GetTxKey(TxId);
455455
}
456456

457-
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event)
457+
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event)
458458
{
459-
Y_ABORT_UNLESS(event.IsSerializable());
460-
461-
TAllocChunkSerializer serializer;
462-
Y_ABORT_UNLESS(event.SerializeToArcadiaStream(&serializer));
463-
auto data = serializer.Release(event.CreateSerializationInfo());
464-
OutputMsgs[tabletId].emplace_back(event.Type(), std::move(data));
459+
OutputMsgs[tabletId].push_back(event.Record);
465460
}
466461

467462
void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId)
468463
{
469464
OutputMsgs.erase(tabletId);
470465
}
471466

472-
auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
467+
const TVector<NKikimrTx::TEvReadSet>& TDistributedTransaction::GetBindedMsgs(ui64 tabletId)
473468
{
474469
if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) {
475470
return p->second;
476471
}
477472

478-
static TVector<TSerializedMessage> empty;
473+
static TVector<NKikimrTx::TEvReadSet> empty;
479474

480475
return empty;
481476
}

ydb/core/persqueue/transaction.h

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,22 +98,11 @@ struct TDistributedTransaction {
9898

9999
TString LogPrefix() const;
100100

101-
struct TSerializedMessage {
102-
ui32 Type;
103-
TIntrusivePtr<TEventSerializedData> Data;
101+
THashMap<ui64, TVector<NKikimrTx::TEvReadSet>> OutputMsgs;
104102

105-
TSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data) :
106-
Type(type),
107-
Data(data)
108-
{
109-
}
110-
};
111-
112-
THashMap<ui64, TVector<TSerializedMessage>> OutputMsgs;
113-
114-
void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
103+
void BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event);
115104
void UnbindMsgsFromPipe(ui64 tabletId);
116-
const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
105+
const TVector<NKikimrTx::TEvReadSet>& GetBindedMsgs(ui64 tabletId);
117106

118107
bool HasWriteOperations = false;
119108
size_t PredicateAcksCount = 0;

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,6 +2107,47 @@ Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
21072107
WaitForTheTransactionToBeDeleted(txId);
21082108
}
21092109

2110+
Y_UNIT_TEST_F(Limit_On_The_Number_Of_Transactons, TPQTabletFixture)
2111+
{
2112+
const ui64 mockTabletId = MakeTabletID(false, 22222);
2113+
const ui64 txId = 67890;
2114+
2115+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
2116+
2117+
for (ui64 i = 0; i < 1002; ++i) {
2118+
SendProposeTransactionRequest({.TxId=txId + i,
2119+
.Senders={mockTabletId}, .Receivers={mockTabletId},
2120+
.TxOps={
2121+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
2122+
}});
2123+
}
2124+
2125+
size_t preparedCount = 0;
2126+
size_t overloadedCount = 0;
2127+
2128+
for (ui64 i = 0; i < 1002; ++i) {
2129+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvProposeTransactionResult>();
2130+
UNIT_ASSERT(event != nullptr);
2131+
2132+
UNIT_ASSERT(event->Record.HasStatus());
2133+
2134+
const auto status = event->Record.GetStatus();
2135+
switch (status) {
2136+
case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
2137+
++preparedCount;
2138+
break;
2139+
case NKikimrPQ::TEvProposeTransactionResult::OVERLOADED:
2140+
++overloadedCount;
2141+
break;
2142+
default:
2143+
UNIT_FAIL("unexpected transaction status " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(status));
2144+
}
2145+
}
2146+
2147+
UNIT_ASSERT_EQUAL(preparedCount, 1000);
2148+
UNIT_ASSERT_EQUAL(overloadedCount, 2);
2149+
}
2150+
21102151
}
21112152

21122153
}

0 commit comments

Comments
 (0)