Skip to content

Commit c319be4

Browse files
TEvReadSet messages are being sent too often (#7482)
1 parent 3aa07ce commit c319be4

File tree

4 files changed

+50
-10
lines changed

4 files changed

+50
-10
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2938,6 +2938,10 @@ void TPartition::ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& ev
29382938
NKikimrPQ::TError::EKind kind,
29392939
const TString& reason)
29402940
{
2941+
PQ_LOG_D("schedule TEvPersQueue::TEvProposeTransactionResult(" <<
2942+
NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(statusCode) <<
2943+
")" <<
2944+
", reason=" << reason);
29412945
Replies.emplace_back(ActorIdFromProto(event.GetSourceActor()),
29422946
MakeReplyPropose(event,
29432947
statusCode,
@@ -3353,13 +3357,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
33533357

33543358
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
33553359
{
3360+
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");
3361+
33563362
Y_ABORT_UNLESS(IsSupportive());
33573363

33583364
PendingEvents.emplace_back(ev->ReleaseBase().Release());
33593365
}
33603366

33613367
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
33623368
{
3369+
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
3370+
33633371
Y_ABORT_UNLESS(IsSupportive());
33643372
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
33653373

ydb/core/persqueue/pq_impl.cpp

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,6 +1292,9 @@ TPartitionInfo& TPersQueue::GetPartitionInfo(const TPartitionId& partitionId)
12921292

12931293
void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorContext& ctx)
12941294
{
1295+
PQ_LOG_D("Handle TEvPQ::TEvPartitionCounters" <<
1296+
" PartitionId " << ev->Get()->Partition);
1297+
12951298
const auto& partitionId = ev->Get()->Partition;
12961299
auto& partition = GetPartitionInfo(partitionId);
12971300
auto diff = ev->Get()->Counters.MakeDiffForAggr(partition.Baseline);
@@ -2580,7 +2583,9 @@ const TPartitionInfo& TPersQueue::GetPartitionInfo(const NKikimrClient::TPersQue
25802583
const TWriteId writeId = GetWriteId(req);
25812584
ui32 originalPartitionId = req.GetPartition();
25822585

2583-
Y_ABORT_UNLESS(TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId));
2586+
Y_ABORT_UNLESS(TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId),
2587+
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}, Partition %" PRIu32,
2588+
TabletID(), writeId.NodeId, writeId.KeyId, originalPartitionId);
25842589

25852590
const TPartitionId& partitionId = TxWrites.at(writeId).Partitions.at(originalPartitionId);
25862591
Y_ABORT_UNLESS(Partitions.contains(partitionId));
@@ -3589,7 +3594,9 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
35893594

35903595
if (tx.WriteId.Defined()) {
35913596
const TWriteId& writeId = *tx.WriteId;
3592-
Y_ABORT_UNLESS(TxWrites.contains(writeId));
3597+
Y_ABORT_UNLESS(TxWrites.contains(writeId),
3598+
"PQ %" PRIu64 ", TxId %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
3599+
TabletID(), tx.TxId, writeId.NodeId, writeId.KeyId);
35933600
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
35943601
writeInfo.TxId = tx.TxId;
35953602
}
@@ -3901,7 +3908,9 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
39013908

39023909
if (tx.WriteId.Defined()) {
39033910
const TWriteId& writeId = *tx.WriteId;
3904-
Y_ABORT_UNLESS(TxWrites.contains(writeId));
3911+
Y_ABORT_UNLESS(TxWrites.contains(writeId),
3912+
"PQ %" PRIu64 ", TxId %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
3913+
TabletID(), tx.TxId, writeId.NodeId, writeId.KeyId);
39053914
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
39063915

39073916
for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) {
@@ -4146,9 +4155,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
41464155
PQ_LOG_D("TxId " << tx.TxId <<
41474156
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
41484157

4149-
[[fallthrough]];
4150-
4151-
case NKikimrPQ::TTransaction::WAIT_RS:
41524158
//
41534159
// the number of TEvReadSetAck sent should not be greater than the number of senders
41544160
// from TEvProposeTransaction
@@ -4160,6 +4166,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
41604166

41614167
SendEvReadSetToReceivers(ctx, tx);
41624168

4169+
[[fallthrough]];
4170+
4171+
case NKikimrPQ::TTransaction::WAIT_RS:
41634172
PQ_LOG_D("HaveParticipantsDecision " << tx.HaveParticipantsDecision());
41644173

41654174
if (tx.HaveParticipantsDecision()) {
@@ -4604,8 +4613,6 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
46044613
return;
46054614
}
46064615

4607-
PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << writeInfo.TxId);
4608-
46094616
if (!writeInfo.TxId.Defined()) {
46104617
PQ_LOG_D("delete write info for WriteId " << writeId);
46114618
// the message TEvProposeTransaction will not come anymore
@@ -4656,7 +4663,9 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
46564663
auto* event = ev->Get();
46574664
Y_ABORT_UNLESS(event->PartitionId.WriteId.Defined());
46584665
const TWriteId& writeId = *event->PartitionId.WriteId;
4659-
Y_ABORT_UNLESS(TxWrites.contains(writeId));
4666+
Y_ABORT_UNLESS(TxWrites.contains(writeId),
4667+
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
4668+
TabletID(), writeId.NodeId, writeId.KeyId);
46604669
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
46614670
Y_ABORT_UNLESS(writeInfo.Partitions.contains(event->PartitionId.OriginalPartitionId));
46624671
const TPartitionId& partitionId = writeInfo.Partitions.at(event->PartitionId.OriginalPartitionId);
@@ -4690,7 +4699,9 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
46904699
}
46914700

46924701
const TWriteId& writeId = *event->WriteId;
4693-
Y_ABORT_UNLESS(TxWrites.contains(writeId));
4702+
Y_ABORT_UNLESS(TxWrites.contains(writeId),
4703+
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
4704+
TabletID(), writeId.NodeId, writeId.KeyId);
46944705
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
46954706
Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1);
46964707

ydb/core/persqueue/transaction.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
6060
}
6161
}
6262

63+
TString TDistributedTransaction::LogPrefix() const
64+
{
65+
return TStringBuilder() << "[TxId: " << TxId << "] ";
66+
}
67+
6368
void TDistributedTransaction::InitDataTransaction(const NKikimrPQ::TTransaction& tx)
6469
{
6570
InitPartitions(tx.GetOperations());
@@ -208,12 +213,16 @@ void TDistributedTransaction::OnPlanStep(ui64 step)
208213

209214
void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event)
210215
{
216+
PQ_LOG_D("Handle TEvTxCalcPredicateResult");
217+
211218
OnPartitionResult(event,
212219
event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
213220
}
214221

215222
void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event)
216223
{
224+
PQ_LOG_D("Handle TEvProposePartitionConfigResult");
225+
217226
OnPartitionResult(event,
218227
NKikimrTx::TReadSetData::DECISION_COMMIT);
219228
}
@@ -229,12 +238,16 @@ void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decisi
229238
SetDecision(SelfDecision, decision);
230239

231240
++PartitionRepliesCount;
241+
242+
PQ_LOG_D("Partition responses " << PartitionRepliesCount << "/" << PartitionRepliesExpected);
232243
}
233244

234245
void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
235246
const TActorId& sender,
236247
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack)
237248
{
249+
PQ_LOG_D("Handle TEvReadSet");
250+
238251
Y_ABORT_UNLESS((Step == Max<ui64>()) || (event.HasStep() && (Step == event.GetStep())));
239252
Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId()));
240253

@@ -249,6 +262,8 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
249262
if (!p.HasPredicate()) {
250263
p.SetPredicate(data.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT);
251264
++ReadSetCount;
265+
266+
PQ_LOG_D("Predicates " << ReadSetCount << "/" << PredicatesReceived.size());
252267
}
253268
} else {
254269
Y_DEBUG_ABORT("unknown sender tablet %" PRIu64, event.GetTabletProducer());
@@ -257,12 +272,16 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
257272

258273
void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event)
259274
{
275+
PQ_LOG_D("Handle TEvReadSetAck");
276+
260277
Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep()));
261278
Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId()));
262279

263280
if (PredicateRecipients.contains(event.GetTabletConsumer())) {
264281
PredicateRecipients[event.GetTabletConsumer()] = true;
265282
++PredicateAcksCount;
283+
284+
PQ_LOG_D("Predicate acks " << PredicateAcksCount << "/" << PredicateRecipients.size());
266285
}
267286
}
268287

ydb/core/persqueue/transaction.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ struct TDistributedTransaction {
9393
template<class E>
9494
void OnPartitionResult(const E& event, EDecision decision);
9595

96+
TString LogPrefix() const;
97+
9698
struct TSerializedMessage {
9799
ui32 Type;
98100
TIntrusivePtr<TEventSerializedData> Data;

0 commit comments

Comments
 (0)