Skip to content

Commit 14bf58d

Browse files
Fix seqNo conflict on concurrent TXs (ydb-platform#15281) (ydb-platform#16847)
1 parent cf62ce8 commit 14bf58d

File tree

5 files changed

+201
-28
lines changed

5 files changed

+201
-28
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,16 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
12771277
}
12781278
txSourceIds.insert(s.first);
12791279
}
1280+
auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first);
1281+
1282+
if (!inFlightIter.IsEnd()) {
1283+
if (s.second.MinSeqNo <= inFlightIter->second) {
1284+
tx.Predicate = false;
1285+
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
1286+
tx.WriteInfoApplied = true;
1287+
break;
1288+
}
1289+
}
12801290

12811291
auto existing = knownSourceIds.find(s.first);
12821292
if (existing.IsEnd())
@@ -2385,6 +2395,13 @@ void TPartition::CommitWriteOperations(TTransaction& t)
23852395
if (!t.WriteInfo) {
23862396
return;
23872397
}
2398+
for (const auto& s : t.WriteInfo->SrcIdInfo) {
2399+
auto [iter, ins] = TxInflightMaxSeqNoPerSourceId.emplace(s.first, s.second.SeqNo);
2400+
if (!ins) {
2401+
Y_ABORT_UNLESS(iter->second < s.second.SeqNo);
2402+
iter->second = s.second.SeqNo;
2403+
}
2404+
}
23882405
const auto& ctx = ActorContext();
23892406

23902407
if (!HaveWriteMsg) {
@@ -2400,6 +2417,8 @@ void TPartition::CommitWriteOperations(TTransaction& t)
24002417
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
24012418
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
24022419

2420+
auto oldHeadOffset = NewHead.Offset;
2421+
24032422
if (!t.WriteInfo->BodyKeys.empty()) {
24042423
bool needCompactHead =
24052424
(Parameters->FirstCommitWriteOperations ? Head : NewHead).PackedSize != 0;
@@ -2486,12 +2505,16 @@ void TPartition::CommitWriteOperations(TTransaction& t)
24862505

24872506
WriteInflightSize += msg.Msg.Data.size();
24882507
ExecRequest(msg, *Parameters, PersistRequest.Get());
2489-
2490-
auto& info = TxSourceIdForPostPersist[blob.SourceId];
2491-
info.SeqNo = blob.SeqNo;
2492-
info.Offset = NewHead.Offset;
24932508
}
24942509
}
2510+
for (const auto& [srcId, info] : t.WriteInfo->SrcIdInfo) {
2511+
auto& sourceIdBatch = Parameters->SourceIdBatch;
2512+
auto sourceId = sourceIdBatch.GetSource(srcId);
2513+
sourceId.Update(info.SeqNo, info.Offset + oldHeadOffset, CurrentTimestamp);
2514+
auto& persistInfo = TxSourceIdForPostPersist[srcId];
2515+
persistInfo.SeqNo = info.SeqNo;
2516+
persistInfo.Offset = info.Offset + oldHeadOffset;
2517+
}
24952518

24962519
Parameters->FirstCommitWriteOperations = false;
24972520

ydb/core/persqueue/partition.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
665665
THashSet<TString> TxAffectedConsumers;
666666
THashSet<TString> SetOffsetAffectedConsumers;
667667
THashMap<TString, TSourceIdPostPersistInfo> TxSourceIdForPostPersist;
668+
THashMap<TString, ui64> TxInflightMaxSeqNoPerSourceId;
669+
668670

669671
ui32 MaxBlobSize;
670672
const ui32 TotalLevels = 4;

ydb/core/persqueue/partition_write.cpp

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
315315
already ? maxOffset : offset, CurrentTimestamp, already, maxSeqNo,
316316
PartitionQuotaWaitTimeForCurrentBlob, TopicQuotaWaitTimeForCurrentBlob, queueTime, writeTime, response.Span
317317
);
318+
318319
PQ_LOG_D("Answering for message sourceid: '" << EscapeC(s)
319320
<< "', Topic: '" << TopicName()
320321
<< "', Partition: " << Partition
@@ -521,6 +522,7 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
521522
SourceIdCounter.Use(sourceId, now);
522523
}
523524
TxSourceIdForPostPersist.clear();
525+
TxInflightMaxSeqNoPerSourceId.clear();
524526

525527
TxAffectedSourcesIds.clear();
526528
WriteAffectedSourcesIds.clear();
@@ -1042,6 +1044,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
10421044
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
10431045
return EProcessResult::ContinueDrop;
10441046
}
1047+
10451048
if (DiskIsFull) {
10461049
ScheduleReplyError(p.Cookie,
10471050
NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL,
@@ -1051,6 +1054,13 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
10511054
if (TxAffectedSourcesIds.contains(p.Msg.SourceId)) {
10521055
return EProcessResult::Blocked;
10531056
}
1057+
auto inflightMaxSeqNo = TxInflightMaxSeqNoPerSourceId.find(p.Msg.SourceId);
1058+
1059+
if (!inflightMaxSeqNo.IsEnd()) {
1060+
if (p.Msg.SeqNo <= inflightMaxSeqNo->second) {
1061+
return EProcessResult::Blocked;
1062+
}
1063+
}
10541064
WriteAffectedSourcesIds.insert(p.Msg.SourceId);
10551065
return EProcessResult::Continue;
10561066
}
@@ -1173,12 +1183,11 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
11731183
<< ". Writing seqNo: " << sourceId.UpdatedSeqNo()
11741184
<< ". EndOffset: " << EndOffset << ". CurOffset: " << curOffset << ". Offset: " << poffset
11751185
);
1176-
if (!p.Internal) {
1177-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
1178-
MsgsDiscarded.Inc();
1179-
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
1180-
BytesDiscarded.Inc(p.Msg.Data.size());
1181-
}
1186+
Y_ENSURE(!p.Internal); // No Already for transactions;
1187+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
1188+
MsgsDiscarded.Inc();
1189+
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
1190+
BytesDiscarded.Inc(p.Msg.Data.size());
11821191
} else {
11831192
if (!p.Internal) {
11841193
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
@@ -1225,6 +1234,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
12251234
", must be at least " << curOffset,
12261235
p,
12271236
NPersQueue::NErrorCode::EErrorCode::WRITE_ERROR_BAD_OFFSET);
1237+
12281238
return false;
12291239
}
12301240

@@ -1279,6 +1289,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
12791289
// this must not happen - client sends gaps, fail this client till the end
12801290
//now no changes will leak
12811291
ctx.Send(Tablet, new TEvents::TEvPoisonPill());
1292+
12821293
return false;
12831294
}
12841295
WriteNewSizeFull += p.Msg.SourceId.size() + p.Msg.Data.size();
@@ -1380,7 +1391,6 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
13801391
++curOffset;
13811392
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
13821393
}
1383-
13841394
return true;
13851395
}
13861396

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ struct TTestContext {
127127

128128
static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration duration, TInstant& deadline) {
129129
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
130-
Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
131130
if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") {
132131
return true;
133132
}

0 commit comments

Comments
 (0)