Skip to content

Commit 44aff91

Browse files
Messages are written to the partition in two stages (#18118)
1 parent 9016403 commit 44aff91

36 files changed

+1892
-560
lines changed

ydb/core/http_proxy/ut/internal_counters.json

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@
611611
"folder_id": "folder4",
612612
"database": "/Root"
613613
},
614-
"value":243,
614+
"value":245,
615615
"kind":"RATE"
616616
},
617617
{
@@ -768,6 +768,45 @@
768768
}
769769
,"value": 0,
770770
"kind":"RATE"
771-
}
771+
},
772+
{
773+
"kind":"GAUGE",
774+
"labels":
775+
{
776+
"cloud_id":"cloud4",
777+
"database":"/Root",
778+
"database_id":"database4",
779+
"folder_id":"folder4",
780+
"name":"topic.compaction.unprocessed_count_max",
781+
"topic":"test-counters-stream"
782+
},
783+
"value":0
784+
},
785+
{
786+
"kind":"GAUGE",
787+
"labels":
788+
{
789+
"cloud_id":"cloud4",
790+
"database":"/Root",
791+
"database_id":"database4",
792+
"folder_id":"folder4",
793+
"name":"topic.compaction.unprocessed_bytes_max",
794+
"topic":"test-counters-stream"
795+
},
796+
"value":0
797+
},
798+
{
799+
"kind":"GAUGE",
800+
"labels":
801+
{
802+
"cloud_id":"cloud4",
803+
"database":"/Root",
804+
"database_id":"database4",
805+
"folder_id":"folder4",
806+
"name":"topic.compaction.lag_milliseconds_max",
807+
"topic":"test-counters-stream"
808+
},
809+
"value":0
810+
}
772811
]
773812
}

ydb/core/keyvalue/keyvalue_state.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2122,7 +2122,7 @@ bool TKeyValueState::PrepareCmdRead(const TActorContext &ctx, NKikimrClient::TKe
21222122
auto it = Index.find(request.GetKey());
21232123
if (it == Index.end()) {
21242124
response.Status = NKikimrProto::NODATA;
2125-
response.Message = "No such key Marker# KV48";
2125+
response.Message = "No such key Marker# KV48 (" + request.GetKey() + ")";
21262126
} else {
21272127
bool isOverrun = PrepareOneRead<NKikimrClient::TKeyValueRequest>(it->first, it->second, offset, size,
21282128
priority, 0, intermediate, response, outIsInlineOnly);

ydb/core/persqueue/blob.cpp

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ TBlobIterator::TBlobIterator(const TKey& key, const TString& blob)
1616
, Count(0)
1717
, InternalPartsCount(0)
1818
{
19-
Y_ABORT_UNLESS(Data != End);
19+
Y_ABORT_UNLESS(Data != End,
20+
"Key=%s, blob.size=%" PRISZT,
21+
Key.ToString().data(), blob.size());
2022
ParseBatch();
2123
Y_ABORT_UNLESS(Header.GetPartNo() == Key.GetPartNo());
2224
}
@@ -666,7 +668,9 @@ ui32 THead::GetCount() const
666668
return 0;
667669

668670
//how much offsets before last batch and how much offsets in last batch
669-
Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset);
671+
Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset,
672+
"front.Offset=%" PRIu64 ", offset=%" PRIu64,
673+
Batches.front().GetOffset(), Offset);
670674

671675
return Batches.back().GetOffset() - Offset + Batches.back().GetCount();
672676
}
@@ -778,6 +782,7 @@ TPartitionedBlob& TPartitionedBlob::operator=(const TPartitionedBlob& x)
778782
GlueNewHead = x.GlueNewHead;
779783
NeedCompactHead = x.NeedCompactHead;
780784
MaxBlobSize = x.MaxBlobSize;
785+
FastWrite = x.FastWrite;
781786
return *this;
782787
}
783788

@@ -803,11 +808,12 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x)
803808
, GlueNewHead(x.GlueNewHead)
804809
, NeedCompactHead(x.NeedCompactHead)
805810
, MaxBlobSize(x.MaxBlobSize)
811+
, FastWrite(x.FastWrite)
806812
{}
807813

808814
TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
809815
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize,
810-
const ui16 nextPartNo)
816+
const ui16 nextPartNo, const bool fastWrite)
811817
: Partition(partition)
812818
, Offset(offset)
813819
, InternalPartsCount(0)
@@ -827,6 +833,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off
827833
, GlueNewHead(true)
828834
, NeedCompactHead(needCompactHead)
829835
, MaxBlobSize(maxBlobSize)
836+
, FastWrite(fastWrite)
830837
{
831838
Y_ABORT_UNLESS(NewHead.Offset == Head.GetNextOffset() && NewHead.PartNo == 0 || headCleared || needCompactHead || Head.PackedSize == 0); // if head not cleared, then NewHead is going after Head
832839
if (!headCleared) {
@@ -883,8 +890,15 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio
883890

884891
Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));
885892

886-
auto tmpKey = TKey::ForBody(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
887-
auto dataKey = TKey::ForBody(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
893+
TKey tmpKey, dataKey;
894+
895+
if (FastWrite) {
896+
tmpKey = TKey::ForFastWrite(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
897+
dataKey = TKey::ForFastWrite(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
898+
} else {
899+
tmpKey = TKey::ForBody(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
900+
dataKey = TKey::ForBody(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
901+
}
888902

889903
StartOffset = Offset;
890904
StartPartNo = NextPartNo;
@@ -915,7 +929,9 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio
915929

916930
auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>
917931
{
918-
Y_ABORT_UNLESS(NewHead.Offset >= Head.Offset);
932+
Y_ABORT_UNLESS(NewHead.Offset >= Head.Offset,
933+
"Head.Offset=%" PRIu64 ", NewHead.Offset=%" PRIu64,
934+
Head.Offset, NewHead.Offset);
919935
ui32 size = blob.GetBlobSize();
920936
Y_ABORT_UNLESS(InternalPartsCount < 1000); //just check for future packing
921937
if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize) {
@@ -956,6 +972,7 @@ auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TForm
956972
}
957973

958974
auto newKey = TKey::FromKey(oldKey, TKeyPrefix::TypeData, Partition, StartOffset);
975+
newKey.SetFastWrite();
959976

960977
FormedBlobs.emplace_back(oldKey, newKey, size);
961978

ydb/core/persqueue/blob.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ class TPartitionedBlob {
323323

324324
TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
325325
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize,
326-
ui16 nextPartNo = 0);
326+
ui16 nextPartNo = 0, bool fastWrite = true);
327327

328328
struct TFormedBlobInfo {
329329
TKey Key;
@@ -382,6 +382,7 @@ class TPartitionedBlob {
382382
bool GlueNewHead;
383383
bool NeedCompactHead;
384384
ui32 MaxBlobSize;
385+
bool FastWrite = true;
385386
};
386387

387388
}// NPQ

ydb/core/persqueue/cache_eviction.h

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,29 @@ namespace NKikimr::NPQ {
1414
ui16 PartNo;
1515
ui32 Count; // have to be unique for {Partition, Offset, partNo}
1616
ui16 InternalPartsCount; // have to be unique for {Partition, Offset, partNo}
17+
char Suffix;
1718

18-
TBlobId(const TPartitionId& partition, ui64 offset, ui16 partNo, ui32 count, ui16 internalPartsCount)
19+
TBlobId(const TPartitionId& partition, ui64 offset, ui16 partNo, ui32 count, ui16 internalPartsCount, TMaybe<char> suffix)
1920
: Partition(partition)
2021
, Offset(offset)
2122
, PartNo(partNo)
2223
, Count(count)
2324
, InternalPartsCount(internalPartsCount)
25+
, Suffix(suffix ? *suffix : '\0')
2426
{
2527
}
2628

2729
bool operator<(const TBlobId& r) const {
2830
auto makeTuple = [](const TBlobId& v) {
29-
return std::make_tuple(v.Partition, v.Offset, v.PartNo, v.Count, v.InternalPartsCount);
31+
return std::make_tuple(v.Partition, v.Offset, v.PartNo, v.Count, v.InternalPartsCount, v.Suffix);
3032
};
3133

3234
return makeTuple(*this) < makeTuple(r);
3335
}
3436

3537
bool operator==(const TBlobId& r) const {
3638
auto makeTuple = [](const TBlobId& v) {
37-
return std::make_tuple(v.Partition, v.Offset, v.PartNo, v.Count, v.InternalPartsCount);
39+
return std::make_tuple(v.Partition, v.Offset, v.PartNo, v.Count, v.InternalPartsCount, v.Suffix);
3840
};
3941

4042
return makeTuple(*this) == makeTuple(r);
@@ -44,6 +46,7 @@ namespace NKikimr::NPQ {
4446
ui64 hash = Hash128to32((ui64(Partition.InternalPartitionId) << 17) + (Partition.IsSupportivePartition() ? 0 : (1 << 16)) + PartNo, Offset);
4547
hash = Hash128to32(hash, Count);
4648
hash = Hash128to32(hash, InternalPartsCount);
49+
hash = Hash128to32(hash, Suffix);
4750
return hash;
4851
}
4952
};
@@ -58,7 +61,7 @@ struct THash<NKikimr::NPQ::TBlobId> {
5861

5962
namespace NKikimr::NPQ {
6063
inline TBlobId MakeBlobId(const TPartitionId& partitionId, const TRequestedBlob& blob) {
61-
return {partitionId, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount};
64+
return {partitionId, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, blob.Key.GetSuffix()};
6265
}
6366

6467
struct TKvRequest {
@@ -106,7 +109,7 @@ namespace NKikimr::NPQ {
106109
for (auto& blob : Blobs) {
107110
if (blob.Value.empty()) {
108111
// add reading command
109-
auto key = TKey::ForBody(TKeyPrefix::TypeData, Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
112+
const auto& key = blob.Key;
110113
auto read = request->Record.AddCmdRead();
111114
read->SetKey(key.Data(), key.Size());
112115
}
@@ -142,9 +145,10 @@ namespace NKikimr::NPQ {
142145
}
143146

144147
void Verify(const TRequestedBlob& blob) const {
145-
auto key = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
146-
Y_ABORT_UNLESS(blob.Value.size() == blob.Size);
147-
TClientBlob::CheckBlob(key, blob.Value);
148+
Y_ABORT_UNLESS(blob.Value.size() == blob.Size,
149+
"\nblob.Value.size=%" PRISZT ", blob.Size=%" PRIu64 "\nblob.Key=%s",
150+
blob.Value.size(), blob.Size, blob.Key.ToString().data());
151+
TClientBlob::CheckBlob(blob.Key, blob.Value);
148152
}
149153
};
150154

@@ -271,7 +275,7 @@ namespace NKikimr::NPQ {
271275
for (const auto& blob : kvReq.Blobs) {
272276
// Touching blobs in L2. We don't need data here
273277
auto& blobs = blob.Cached ? reqData->RequestedBlobs : reqData->MissedBlobs;
274-
blobs.emplace_back(kvReq.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, nullptr);
278+
blobs.emplace_back(kvReq.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, blob.Key.GetSuffix(), nullptr);
275279
}
276280

277281
auto l2Request = MakeHolder<TEvPqCache::TEvCacheL2Request>(reqData.Release());
@@ -298,7 +302,7 @@ namespace NKikimr::NPQ {
298302

299303
// there could be a new blob with same id (for big messages)
300304
if (RemoveExists(ctx, blob)) {
301-
reqData.RemovedBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount, nullptr);
305+
reqData.RemovedBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount, reqBlob.Key.GetSuffix(), nullptr);
302306
}
303307

304308
auto cached = std::make_shared<TCacheValue>(reqBlob.Value, ctx.SelfID, TAppData::TimeProvider->Now());
@@ -308,7 +312,7 @@ namespace NKikimr::NPQ {
308312
if (L1Strategy)
309313
L1Strategy->SaveHeadBlob(blob);
310314

311-
reqData.StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, blob.Count, blob.InternalPartsCount, cached);
315+
reqData.StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, blob.Count, blob.InternalPartsCount, blob.Suffix, cached);
312316

313317
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition "
314318
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count
@@ -322,10 +326,10 @@ namespace NKikimr::NPQ {
322326
TPartitionId partitionId;
323327
partitionId.OriginalPartitionId = FromString<ui32>(s.data() + 1, 10);
324328
partitionId.InternalPartitionId = partitionId.OriginalPartitionId;
325-
return {partitionId, 0, 0, 0, 0};
329+
return {partitionId, 0, 0, 0, 0, Nothing()};
326330
} else {
327331
auto key = TKey::FromString(s);
328-
return {key.GetPartition(), key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount()};
332+
return {key.GetPartition(), key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount(), key.GetSuffix()};
329333
}
330334
}
331335

@@ -336,8 +340,8 @@ namespace NKikimr::NPQ {
336340
TBlobId newBlob = MakeBlobId(newKey);
337341
if (RenameExists(ctx, oldBlob, newBlob)) {
338342
reqData.RenamedBlobs.emplace_back(std::piecewise_construct,
339-
std::make_tuple(oldBlob.Partition, oldBlob.Offset, oldBlob.PartNo, oldBlob.Count, oldBlob.InternalPartsCount, nullptr),
340-
std::make_tuple(newBlob.Partition, newBlob.Offset, newBlob.PartNo, newBlob.Count, newBlob.InternalPartsCount, nullptr));
343+
std::make_tuple(oldBlob.Partition, oldBlob.Offset, oldBlob.PartNo, oldBlob.Count, oldBlob.InternalPartsCount, oldBlob.Suffix, nullptr),
344+
std::make_tuple(newBlob.Partition, newBlob.Offset, newBlob.PartNo, newBlob.Count, newBlob.InternalPartsCount, newBlob.Suffix, nullptr));
341345

342346
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Renaming head blob in L1. Old partition "
343347
<< oldBlob.Partition << " old offset " << oldBlob.Offset << " old count " << oldBlob.Count
@@ -357,7 +361,7 @@ namespace NKikimr::NPQ {
357361
for (auto i = lowerBound; i != upperBound; ++i) {
358362
const auto& [blob, value] = *i;
359363

360-
reqData.RemovedBlobs.emplace_back(blob.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, nullptr);
364+
reqData.RemovedBlobs.emplace_back(blob.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, blob.Suffix, nullptr);
361365
Counters.Dec(value);
362366

363367
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Deleting head blob in L1. Partition "
@@ -395,7 +399,7 @@ namespace NKikimr::NPQ {
395399
Cache[blob] = valL1; // weak
396400
Counters.Inc(valL1);
397401

398-
reqData->StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount, cached);
402+
reqData->StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount, reqBlob.Key.GetSuffix(), cached);
399403

400404
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Prefetched blob in L1. Partition "
401405
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count
@@ -450,7 +454,7 @@ namespace NKikimr::NPQ {
450454
void PrepareTouch(const TActorContext& ctx, THolder<TCacheL2Request>& reqData, const TDeque<TBlobId>& used)
451455
{
452456
for (auto& blob : used) {
453-
reqData->ExpectedBlobs.emplace_back(blob.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, nullptr);
457+
reqData->ExpectedBlobs.emplace_back(blob.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, blob.Suffix, nullptr);
454458

455459
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Touching blob. Partition "
456460
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count);
@@ -462,14 +466,18 @@ namespace NKikimr::NPQ {
462466
const auto it = Cache.find(blobId);
463467
if (it == Cache.end()) {
464468
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "No blob in L1. Partition "
465-
<< blobId.Partition << " offset " << blobId.Offset << " actorID " << ctx.SelfID);
469+
<< blobId.Partition << " offset " << blobId.Offset <<
470+
" partno " << blobId.PartNo << " count " << blobId.Count << " parts_count " << blobId.InternalPartsCount <<
471+
" actorID " << ctx.SelfID);
466472
return nullptr;
467473
}
468474

469475
TCacheValue::TPtr data = it->second.GetBlob();
470476
if (!data) {
471477
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Evicted blob in L1. Partition "
472-
<< blobId.Partition << " offset " << blobId.Offset << " actorID " << ctx.SelfID);
478+
<< blobId.Partition << " offset " << blobId.Offset <<
479+
" partno " << blobId.PartNo << " count " << blobId.Count << " parts_count " << blobId.InternalPartsCount <<
480+
" actorID " << ctx.SelfID);
473481
RemoveBlob(it);
474482
return nullptr;
475483
}
@@ -478,7 +486,8 @@ namespace NKikimr::NPQ {
478486

479487
const TBlobId& blob = it->first;
480488
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Got data from cache. Partition "
481-
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count
489+
<< blob.Partition << " offset " << blob.Offset <<
490+
" partno " << blob.PartNo << " count " << blob.Count << " parts_count " << blob.InternalPartsCount
482491
<< " source " << (ui32)it->second.Source << " size " << data->DataSize()
483492
<< " accessed " << data->GetAccessCount() << " times before, last time " << data->GetAccessTime());
484493

0 commit comments

Comments
 (0)