Skip to content

Commit 4805600

Browse files
authored
Support read from timestamp for topics autopartitioning (#11882)
1 parent e92e224 commit 4805600

21 files changed

+347
-65
lines changed

ydb/core/persqueue/blob.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ void TBatch::Pack() {
399399
Header.SetPayloadSize(PackedData.size());
400400
}
401401

402+
for (auto& b : Blobs) {
403+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
404+
}
405+
402406

403407
TVector<TClientBlob> tmp;
404408
Blobs.swap(tmp);
@@ -414,11 +418,14 @@ void TBatch::Unpack() {
414418
UnpackTo(&Blobs);
415419
Y_ABORT_UNLESS(InternalPartsPos.empty());
416420
for (ui32 i = 0; i < Blobs.size(); ++i) {
417-
if (!Blobs[i].IsLastPart())
421+
auto& b = Blobs[i];
422+
if (!b.IsLastPart()) {
418423
InternalPartsPos.push_back(i);
424+
}
425+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
419426
}
420427
Y_ABORT_UNLESS(InternalPartsPos.size() == GetInternalPartsCount());
421-
428+
422429
PackedData.Clear();
423430
}
424431

@@ -978,4 +985,3 @@ bool TPartitionedBlob::IsNextPart(const TString& sourceId, const ui64 seqNo, con
978985

979986
}// NPQ
980987
}// NKikimr
981-

ydb/core/persqueue/blob.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ struct TClientBlob {
101101
void SerializeTo(TBuffer& buffer) const;
102102
static TClientBlob Deserialize(const char *data, ui32 size);
103103

104-
static void CheckBlob(const TKey& key, const TString& blob);
104+
static void CheckBlob(const TKey& key, const TString& blob);
105105
};
106106

107107
static constexpr const ui32 MAX_BLOB_SIZE = 8_MB;
@@ -121,6 +121,7 @@ struct TBatch {
121121
TVector<ui32> InternalPartsPos;
122122
NKikimrPQ::TBatchHeader Header;
123123
TBuffer PackedData;
124+
TInstant EndWriteTimestamp;
124125

125126
TBatch()
126127
: Packed(false)
@@ -162,27 +163,42 @@ struct TBatch {
162163
Header.SetUnpackedSize(unpackedSize);
163164
Header.SetCount(count);
164165
Header.SetInternalPartsCount(InternalPartsPos.size());
166+
167+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
165168
}
166169

167170
ui64 GetOffset() const {
168171
return Header.GetOffset();
169172
}
173+
170174
ui16 GetPartNo() const {
171175
return Header.GetPartNo();
172176
}
177+
173178
ui32 GetUnpackedSize() const {
174179
return Header.GetUnpackedSize();
175180
}
181+
176182
ui32 GetCount() const {
177183
return Header.GetCount();
178184
}
185+
179186
ui16 GetInternalPartsCount() const {
180187
return Header.GetInternalPartsCount();
181188
}
189+
182190
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
183191
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
184192
}
185193

194+
bool Empty() const {
195+
return Blobs.empty();
196+
}
197+
198+
TInstant GetEndWriteTimestamp() const {
199+
return EndWriteTimestamp;
200+
}
201+
186202
TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
187203
: Packed(true)
188204
, Header(header)
@@ -239,7 +255,7 @@ struct THead {
239255
private:
240256
std::deque<TBatch> Batches;
241257
ui16 InternalPartsCount = 0;
242-
258+
243259
friend class TPartitionedBlob;
244260

245261
class TBatchAccessor {

ydb/core/persqueue/partition.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,14 @@ ui64 TPartition::ImportantClientsMinOffset() const {
313313
return minOffset;
314314
}
315315

316+
TInstant TPartition::GetEndWriteTimestamp() const {
317+
return EndWriteTimestamp;
318+
}
319+
320+
THead& TPartition::GetHead() {
321+
return Head;
322+
}
323+
316324
void TPartition::HandleWakeup(const TActorContext& ctx) {
317325
FilterDeadlinedWrites(ctx);
318326

@@ -356,6 +364,7 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
356364
meta.SetStartOffset(StartOffset);
357365
meta.SetEndOffset(Max(NewHead.GetNextOffset(), EndOffset));
358366
meta.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
367+
meta.SetEndWriteTimestamp(PendingWriteTimestamp.MilliSeconds());
359368

360369
if (IsSupportive()) {
361370
auto* counterData = meta.MutableCounterData();

ydb/core/persqueue/partition.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
116116
friend TInitInfoRangeStep;
117117
friend TInitDataRangeStep;
118118
friend TInitDataStep;
119+
friend TInitEndWriteTimestampStep;
119120

120121
friend TPartitionSourceManager;
121122

@@ -440,6 +441,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
440441
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
441442
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
442443

444+
ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const;
443445

444446
public:
445447
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -470,6 +472,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
470472
// Minimal offset, the data from which cannot be deleted, because it is required by an important consumer
471473
ui64 ImportantClientsMinOffset() const;
472474

475+
TInstant GetEndWriteTimestamp() const; // For tests only
476+
THead& GetHead(); // For tests only
473477

474478
//Bootstrap sends kvRead
475479
//Become StateInit
@@ -666,6 +670,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
666670
// [DataKeysBody ][DataKeysHead ]
667671
ui64 StartOffset;
668672
ui64 EndOffset;
673+
TInstant EndWriteTimestamp;
674+
TInstant PendingWriteTimestamp;
669675

670676
ui64 WriteInflightSize;
671677
TActorId Tablet;

ydb/core/persqueue/partition_init.cpp

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ TInitializer::TInitializer(TPartition* partition)
3030
Steps.push_back(MakeHolder<TInitInfoRangeStep>(this));
3131
Steps.push_back(MakeHolder<TInitDataRangeStep>(this));
3232
Steps.push_back(MakeHolder<TInitDataStep>(this));
33+
Steps.push_back(MakeHolder<TInitEndWriteTimestampStep>(this));
3334

3435
CurrentStep = Steps.begin();
3536
}
@@ -308,14 +309,14 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
308309
bool res = meta.ParseFromString(response.GetValue());
309310
Y_ABORT_UNLESS(res);
310311

311-
/* Bring back later, when switch to 21-2 will be unable
312-
StartOffset = meta.GetStartOffset();
313-
EndOffset = meta.GetEndOffset();
314-
if (StartOffset == EndOffset) {
315-
NewHead.Offset = Head.Offset = EndOffset;
316-
}
317-
*/
312+
Partition()->StartOffset = meta.GetStartOffset();
313+
Partition()->EndOffset = meta.GetEndOffset();
314+
if (Partition()->StartOffset == Partition()->EndOffset) {
315+
Partition()->NewHead.Offset = Partition()->Head.Offset = Partition()->EndOffset;
316+
}
318317
Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
318+
Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetEndWriteTimestamp());
319+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
319320
if (Partition()->IsSupportive()) {
320321
const auto& counterData = meta.GetCounterData();
321322
Partition()->BytesWrittenGrpc.SetSavedValue(counterData.GetBytesWrittenGrpc());
@@ -495,7 +496,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
495496
if (k.GetPartNo() > 0) ++startOffset;
496497
head.PartNo = 0;
497498
} else {
498-
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%s", pair.GetKey().c_str());
499+
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%" PRIu64 " <= %" PRIu64 " %s", endOffset, k.GetOffset(), pair.GetKey().c_str());
499500
if (endOffset < k.GetOffset()) {
500501
gapOffsets.push_back(std::make_pair(endOffset, k.GetOffset()));
501502
gapSize += k.GetOffset() - endOffset;
@@ -619,7 +620,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
619620

620621
Y_ABORT_UNLESS(offset + 1 >= Partition()->StartOffset);
621622
Y_ABORT_UNLESS(offset < Partition()->EndOffset);
622-
Y_ABORT_UNLESS(size == read.GetValue().size());
623+
Y_ABORT_UNLESS(size == read.GetValue().size(), "size=%d == read.GetValue().size() = %d", size, read.GetValue().size());
623624

624625
for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
625626
head.AddBatch(it.GetBatch());
@@ -653,6 +654,41 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
653654
}
654655

655656

657+
//
658+
// TInitEndWriteTimestampStep
659+
//
660+
661+
TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer)
662+
: TInitializerStep(initializer, "TInitEndWriteTimestampStep", true) {
663+
}
664+
665+
void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
666+
if (Partition()->EndWriteTimestamp != TInstant::Zero() || (Partition()->HeadKeys.empty() && Partition()->DataKeysBody.empty())) {
667+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
668+
<< "' partition " << Partition()->Partition
669+
<< " skiped because already initialized.");
670+
return Done(ctx);
671+
}
672+
673+
TDataKey* lastKey = nullptr;
674+
if (!Partition()->HeadKeys.empty()) {
675+
lastKey = &Partition()->HeadKeys.back();
676+
} else if (!Partition()->DataKeysBody.empty()) {
677+
lastKey = &Partition()->DataKeysBody.back();
678+
}
679+
680+
if (lastKey) {
681+
Partition()->EndWriteTimestamp = lastKey->Timestamp;
682+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
683+
}
684+
685+
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
686+
<< "' partition " << Partition()->Partition
687+
<< " from keys completed. Value " << Partition()->EndWriteTimestamp);
688+
689+
return Done(ctx);
690+
}
691+
656692
//
657693
// TPartition
658694
//

ydb/core/persqueue/partition_init.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,11 @@ class TInitDataStep: public TBaseKVStep {
152152
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
153153
};
154154

155+
class TInitEndWriteTimestampStep: public TInitializerStep {
156+
public:
157+
TInitEndWriteTimestampStep(TInitializer* initializer);
158+
159+
void Execute(const TActorContext& ctx) override;
160+
};
161+
155162
} // NKikimr::NPQ

ydb/core/persqueue/partition_monitoring.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
5959
PROPERTY("StartOffset", StartOffset);
6060
PROPERTY("EndOffset", EndOffset);
6161
PROPERTY("LastOffset", Head.GetNextOffset());
62+
PROPERTY("Last message WriteTimestamp", EndWriteTimestamp.ToRfc822String());
6263
PROPERTY("HeadOffset", Head.Offset << ", count: " << Head.GetCount());
6364
}
6465
}

ydb/core/persqueue/partition_read.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,24 @@ namespace NKikimr::NPQ {
3030

3131
static const ui32 MAX_USER_ACTS = 1000;
3232

33+
TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
34+
if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) {
35+
return {};
36+
}
37+
38+
TInstant timestamp = maxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(maxTimeLagMs) : TInstant::Zero();
39+
timestamp = Max(timestamp, TInstant::MilliSeconds(readTimestampMs));
40+
timestamp = Max(timestamp, consumerReadFromTimestamp);
41+
return timestamp;
42+
}
43+
44+
ui64 TPartition::GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const {
45+
if (!readTimestamp) {
46+
return offset;
47+
}
48+
return Max(GetOffsetEstimate(DataKeysBody, *readTimestamp, Min(Head.Offset, EndOffset - 1)), offset);
49+
}
50+
3351
void TPartition::SendReadingFinished(const TString& consumer) {
3452
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie));
3553
}
@@ -133,7 +151,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
133151
};
134152

135153
for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
136-
if (request->Offset < EndOffset) {
154+
if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
137155
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
138156
ctx.Send(request->Sender, response.Release());
139157
} else if (!IsActive()) {
@@ -170,16 +188,18 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
170188

171189
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
172190

191+
auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx);
192+
173193
TActorId sender = ActorIdFromProto(record.GetSender());
174-
if (InitDone && EndOffset > (ui64)record.GetOffset()) { //already has data, answer right now
194+
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
175195
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
176196
ctx.Send(sender, response.Release());
177197
} else if (InitDone && !IsActive()) {
178198
auto response = MakeHasDataInfoResponse(0, cookie, true);
179199
ctx.Send(sender, response.Release());
180200
} else {
181201
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
182-
record.HasClientId() && InitDone ? record.GetClientId() : ""};
202+
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
183203
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
184204
auto res = HasDataRequests.insert(req);
185205
HasDataDeadlines.insert(dl);
@@ -763,11 +783,10 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
763783
}
764784
userInfo->ReadsInQuotaQueue--;
765785
ui64 offset = read->Offset;
766-
if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) {
767-
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
768-
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
769-
timestamp = Max(timestamp, userInfo->ReadFromTimestamp);
770-
offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset);
786+
787+
auto readTimestamp = GetReadFrom(read->MaxTimeLagMs, read->ReadTimestampMs, userInfo->ReadFromTimestamp, ctx);
788+
if (read->PartNo == 0 && readTimestamp) {
789+
offset = GetReadOffset(offset, readTimestamp);
771790
userInfo->ReadOffsetRewindSum += offset - read->Offset;
772791
}
773792

ydb/core/persqueue/partition_util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ struct TPartition::THasDataReq {
113113
TActorId Sender;
114114
TMaybe<ui64> Cookie;
115115
TString ClientId;
116+
TMaybe<TInstant> ReadTimestamp;
116117

117118
bool operator < (const THasDataReq& req) const {
118119
return Num < req.Num;

ydb/core/persqueue/partition_write.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,9 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
381381
void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
382382
PQ_LOG_T("TPartition::SyncMemoryStateWithKVState.");
383383

384-
if (!CompactedKeys.empty())
384+
if (!CompactedKeys.empty()) {
385385
HeadKeys.clear();
386+
}
386387

387388
if (NewHeadKey.Size > 0) {
388389
while (!HeadKeys.empty() &&
@@ -437,6 +438,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
437438
}
438439

439440
EndOffset = Head.GetNextOffset();
441+
EndWriteTimestamp = PendingWriteTimestamp;
442+
440443
NewHead.Clear();
441444
NewHead.Offset = EndOffset;
442445

@@ -1397,12 +1400,15 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
13971400
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
13981401
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
13991402
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
1400-
Head.GetBatch(pp).SerializeTo(valueD);
1403+
auto& batch = Head.GetBatch(pp);
1404+
batch.SerializeTo(valueD);
1405+
PendingWriteTimestamp = std::max(PendingWriteTimestamp, batch.GetEndWriteTimestamp());
14011406
}
14021407
}
14031408
for (auto& b : NewHead.GetBatches()) {
14041409
Y_ABORT_UNLESS(b.Packed);
14051410
b.SerializeTo(valueD);
1411+
PendingWriteTimestamp = std::max(PendingWriteTimestamp, b.GetEndWriteTimestamp());
14061412
}
14071413

14081414
Y_ABORT_UNLESS(res.second >= valueD.size());

0 commit comments

Comments
 (0)