7
7
#include < ydb/core/persqueue/events/internal.h>
8
8
#include < ydb/core/persqueue/map_subrange.h>
9
9
10
- namespace NKikimr {
11
- namespace NPQ {
12
-
10
+ namespace NKikimr ::NPQ {
13
11
struct TBlobId {
14
12
TPartitionId Partition;
15
13
ui64 Offset;
@@ -26,10 +24,6 @@ namespace NPQ {
26
24
{
27
25
}
28
26
29
- bool operator ==(const TBlobId& r) const {
30
- return Partition.IsEqual (r.Partition ) && Offset == r.Offset && PartNo == r.PartNo ;
31
- }
32
-
33
27
bool operator <(const TBlobId& r) const {
34
28
auto makeTuple = [](const TBlobId& v) {
35
29
return std::make_tuple (v.Partition , v.Offset , v.PartNo , v.Count , v.InternalPartsCount );
@@ -38,11 +32,22 @@ namespace NPQ {
38
32
return makeTuple (*this ) < makeTuple (r);
39
33
}
40
34
35
+ bool operator ==(const TBlobId& r) const {
36
+ auto makeTuple = [](const TBlobId& v) {
37
+ return std::make_tuple (v.Partition , v.Offset , v.PartNo , v.Count , v.InternalPartsCount );
38
+ };
39
+
40
+ return makeTuple (*this ) == makeTuple (r);
41
+ }
42
+
41
43
ui64 Hash () const {
42
- return Hash128to32 ((ui64 (Partition.InternalPartitionId ) << 17 ) + (Partition.IsSupportivePartition () ? 0 : (1 << 16 )) + PartNo, Offset);
44
+ ui64 hash = Hash128to32 ((ui64 (Partition.InternalPartitionId ) << 17 ) + (Partition.IsSupportivePartition () ? 0 : (1 << 16 )) + PartNo, Offset);
45
+ hash = Hash128to32 (hash, Count);
46
+ hash = Hash128to32 (hash, InternalPartsCount);
47
+ return hash;
43
48
}
44
49
};
45
- }}
50
+ }
46
51
47
52
template <>
48
53
struct THash <NKikimr::NPQ::TBlobId> {
@@ -51,8 +56,10 @@ struct THash<NKikimr::NPQ::TBlobId> {
51
56
}
52
57
};
53
58
54
- namespace NKikimr {
55
- namespace NPQ {
59
+ namespace NKikimr ::NPQ {
60
+ inline TBlobId MakeBlobId (const TPartitionId& partitionId, const TRequestedBlob& blob) {
61
+ return {partitionId, blob.Offset , blob.PartNo , blob.Count , blob.InternalPartsCount };
62
+ }
56
63
57
64
struct TKvRequest {
58
65
enum ERequestType {
@@ -89,7 +96,9 @@ namespace NPQ {
89
96
, MetadataWritesCount(0 )
90
97
{}
91
98
92
- TBlobId GetBlobId (ui32 pos) const { return TBlobId (Partition, Blobs[pos].Offset , Blobs[pos].PartNo , Blobs[pos].Count , Blobs[pos].InternalPartsCount ); }
99
+ TBlobId GetBlobId (ui32 pos) const {
100
+ return NPQ::MakeBlobId (Partition, Blobs[pos]);
101
+ }
93
102
94
103
THolder<TEvKeyValue::TEvRequest> MakeKvRequest () const
95
104
{
@@ -262,7 +271,7 @@ namespace NPQ {
262
271
for (const auto & blob : kvReq.Blobs ) {
263
272
// Touching blobs in L2. We don't need data here
264
273
auto & blobs = blob.Cached ? reqData->RequestedBlobs : reqData->MissedBlobs ;
265
- blobs.emplace_back (kvReq.Partition , blob.Offset , blob.PartNo , nullptr );
274
+ blobs.emplace_back (kvReq.Partition , blob.Offset , blob.PartNo , blob. Count , blob. InternalPartsCount , nullptr );
266
275
}
267
276
268
277
auto l2Request = MakeHolder<TEvPqCache::TEvCacheL2Request>(reqData.Release ());
@@ -285,11 +294,11 @@ namespace NPQ {
285
294
void SaveBlobs (const TKvRequest& kvReq, TCacheL2Request& reqData, const TActorContext& ctx)
286
295
{
287
296
for (const TRequestedBlob& reqBlob : kvReq.Blobs ) {
288
- TBlobId blob (kvReq.Partition , reqBlob. Offset , reqBlob. PartNo , reqBlob. Count , reqBlob. InternalPartsCount );
297
+ TBlobId blob = NPQ::MakeBlobId (kvReq.Partition , reqBlob);
289
298
290
299
// there could be a new blob with same id (for big messages)
291
300
if (RemoveExists (ctx, blob)) {
292
- reqData.RemovedBlobs .emplace_back (kvReq.Partition , reqBlob.Offset , reqBlob.PartNo , nullptr );
301
+ reqData.RemovedBlobs .emplace_back (kvReq.Partition , reqBlob.Offset , reqBlob.PartNo , reqBlob. Count , reqBlob. InternalPartsCount , nullptr );
293
302
}
294
303
295
304
auto cached = std::make_shared<TCacheValue>(reqBlob.Value , ctx.SelfID , TAppData::TimeProvider->Now ());
@@ -299,15 +308,15 @@ namespace NPQ {
299
308
if (L1Strategy)
300
309
L1Strategy->SaveHeadBlob (blob);
301
310
302
- reqData.StoredBlobs .emplace_back (kvReq.Partition , reqBlob.Offset , reqBlob.PartNo , cached);
311
+ reqData.StoredBlobs .emplace_back (kvReq.Partition , reqBlob.Offset , reqBlob.PartNo , blob. Count , blob. InternalPartsCount , cached);
303
312
304
313
LOG_DEBUG_S (ctx, NKikimrServices::PERSQUEUE, " Caching head blob in L1. Partition "
305
314
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count
306
315
<< " size " << reqBlob.Value .size () << " actorID " << ctx.SelfID );
307
316
}
308
317
}
309
318
310
- TBlobId MakeBlobId (const TString& s)
319
+ static TBlobId MakeBlobId (const TString& s)
311
320
{
312
321
if (s.length () == TKeyPrefix::MarkPosition ()) {
313
322
TPartitionId partitionId;
@@ -327,8 +336,8 @@ namespace NPQ {
327
336
TBlobId newBlob = MakeBlobId (newKey);
328
337
if (RenameExists (ctx, oldBlob, newBlob)) {
329
338
reqData.RenamedBlobs .emplace_back (std::piecewise_construct,
330
- std::make_tuple (oldBlob.Partition , oldBlob.Offset , oldBlob.PartNo , nullptr ),
331
- std::make_tuple (newBlob.Partition , newBlob.Offset , newBlob.PartNo , nullptr ));
339
+ std::make_tuple (oldBlob.Partition , oldBlob.Offset , oldBlob.PartNo , oldBlob. Count , oldBlob. InternalPartsCount , nullptr ),
340
+ std::make_tuple (newBlob.Partition , newBlob.Offset , newBlob.PartNo , newBlob. Count , newBlob. InternalPartsCount , nullptr ));
332
341
333
342
LOG_DEBUG_S (ctx, NKikimrServices::PERSQUEUE, " Renaming head blob in L1. Old partition "
334
343
<< oldBlob.Partition << " old offset " << oldBlob.Offset << " old count " << oldBlob.Count
@@ -348,7 +357,7 @@ namespace NPQ {
348
357
for (auto i = lowerBound; i != upperBound; ++i) {
349
358
const auto & [blob, value] = *i;
350
359
351
- reqData.RemovedBlobs .emplace_back (blob.Partition , blob.Offset , blob.PartNo , nullptr );
360
+ reqData.RemovedBlobs .emplace_back (blob.Partition , blob.Offset , blob.PartNo , blob. Count , blob. InternalPartsCount , nullptr );
352
361
Counters.Dec (value);
353
362
354
363
LOG_DEBUG_S (ctx, NKikimrServices::PERSQUEUE, " Deleting head blob in L1. Partition "
@@ -372,7 +381,7 @@ namespace NPQ {
372
381
continue ;
373
382
374
383
const TRequestedBlob& reqBlob = kvReq.Blobs [i];
375
- TBlobId blob (kvReq.Partition , reqBlob. Offset , reqBlob. PartNo , reqBlob. Count , reqBlob. InternalPartsCount );
384
+ TBlobId blob = NPQ::MakeBlobId (kvReq.Partition , reqBlob);
376
385
{
377
386
TValueL1 value;
378
387
if (CheckExists (ctx, blob, value)) {
@@ -386,7 +395,7 @@ namespace NPQ {
386
395
Cache[blob] = valL1; // weak
387
396
Counters.Inc (valL1);
388
397
389
- reqData->StoredBlobs .emplace_back (kvReq.Partition , reqBlob.Offset , reqBlob.PartNo , cached);
398
+ reqData->StoredBlobs .emplace_back (kvReq.Partition , reqBlob.Offset , reqBlob.PartNo , reqBlob. Count , reqBlob. InternalPartsCount , cached);
390
399
391
400
LOG_DEBUG_S (ctx, NKikimrServices::PERSQUEUE, " Prefetched blob in L1. Partition "
392
401
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count
@@ -441,7 +450,7 @@ namespace NPQ {
441
450
void PrepareTouch (const TActorContext& ctx, THolder<TCacheL2Request>& reqData, const TDeque<TBlobId>& used)
442
451
{
443
452
for (auto & blob : used) {
444
- reqData->ExpectedBlobs .emplace_back (blob.Partition , blob.Offset , blob.PartNo , nullptr );
453
+ reqData->ExpectedBlobs .emplace_back (blob.Partition , blob.Offset , blob.PartNo , blob. Count , blob. InternalPartsCount , nullptr );
445
454
446
455
LOG_DEBUG_S (ctx, NKikimrServices::PERSQUEUE, " Touching blob. Partition "
447
456
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count );
@@ -484,7 +493,7 @@ namespace NPQ {
484
493
++numCached;
485
494
continue ;
486
495
}
487
- TBlobId blobId (kvReq.Partition , blob. Offset , blob. PartNo , blob. Count , blob. InternalPartsCount );
496
+ TBlobId blobId = NPQ::MakeBlobId (kvReq.Partition , blob);
488
497
TCacheValue::TPtr cached = GetValue (ctx, blobId);
489
498
if (cached) {
490
499
++numCached;
@@ -566,5 +575,4 @@ namespace NPQ {
566
575
}
567
576
};
568
577
569
- } // NPQ
570
- } // NKikimr
578
+ } // NKikimr::NPQ
0 commit comments