@@ -108,14 +108,12 @@ namespace NKikimr {
108
108
};
109
109
110
110
111
- // //////////////////////////////////////////////////////////////////////////
112
- // TCompactRecordMergerBase
113
111
// //////////////////////////////////////////////////////////////////////////
114
112
// Valid call sequence:
115
113
// Clear(); Add(); ... Add(); Finish()
116
114
// GetMemRec(); GetData();
117
115
template <class TKey , class TMemRec >
118
- class TCompactRecordMergerBase : public TRecordMergerBase <TKey, TMemRec> {
116
+ class TCompactRecordMerger : public TRecordMergerBase <TKey, TMemRec> {
119
117
protected:
120
118
using TBase = TRecordMergerBase<TKey, TMemRec>;
121
119
using TBase::MemRec;
@@ -132,18 +130,16 @@ namespace NKikimr {
132
130
};
133
131
134
132
public:
135
- TCompactRecordMergerBase (const TBlobStorageGroupType &gtype, bool addHeader)
133
+ TCompactRecordMerger (const TBlobStorageGroupType &gtype, bool addHeader)
136
134
: TBase(gtype, true )
137
- , MemRecs()
138
- , ProducingSmallBlob(false )
139
- , NeedToLoadData(ELoadData::NotSet)
140
135
, AddHeader(addHeader)
141
136
{}
142
137
143
138
void Clear () {
144
139
TBase::Clear ();
145
140
MemRecs.clear ();
146
141
ProducingSmallBlob = false ;
142
+ ProducingHugeBlob = false ;
147
143
NeedToLoadData = ELoadData::NotSet;
148
144
DataMerger.Clear ();
149
145
}
@@ -156,51 +152,51 @@ namespace NKikimr {
156
152
}
157
153
158
154
void AddFromSegment (const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
159
- Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
160
- AddBasic (memRec, key);
161
- switch (memRec.GetType ()) {
162
- case TBlobType::DiskBlob: {
163
- if (memRec.HasData () && NeedToLoadData == ELoadData::LoadData) {
164
- MemRecs.push_back (memRec);
165
- ProducingSmallBlob = true ;
166
- }
167
- break ;
168
- }
169
- case TBlobType::HugeBlob:
170
- case TBlobType::ManyHugeBlobs: {
171
- TDiskDataExtractor extr;
172
- memRec.GetDiskData (&extr, outbound);
173
- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
174
- DataMerger.AddHugeBlob (extr.Begin , extr.End , v, circaLsn);
175
- break ;
176
- }
177
- default :
178
- Y_ABORT (" Impossible case" );
179
- }
180
- VerifyConsistency (memRec, outbound);
155
+ Add (memRec, nullptr , outbound, key, circaLsn);
181
156
}
182
157
183
158
void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
159
+ Add (memRec, data, nullptr , key, lsn);
160
+ }
161
+
162
+ void Add (const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
184
163
Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
185
164
AddBasic (memRec, key);
186
- if (memRec.HasData ()) {
187
- if (data) {
188
- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob || memRec.GetType () == TBlobType::DiskBlob);
189
- if (NeedToLoadData == ELoadData::LoadData) {
190
- DataMerger.AddBlob (TDiskBlob (data, memRec.GetLocalParts (GType), GType, key.LogoBlobID ()));
191
- ProducingSmallBlob = true ;
192
- } else {
193
- // intentionally do nothing: don't add any data to DataMerger, because we don't need it
194
- }
195
- } else {
196
- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob);
197
- TDiskDataExtractor extr;
198
- memRec.GetDiskData (&extr, nullptr );
199
- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
200
- DataMerger.AddHugeBlob (extr.Begin , extr.End , v, lsn);
165
+ if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
166
+ TDiskDataExtractor extr;
167
+ switch (memRec.GetType ()) {
168
+ case TBlobType::MemBlob:
169
+ case TBlobType::DiskBlob:
170
+ if (NeedToLoadData == ELoadData::LoadData) {
171
+ if (data) {
172
+ // we have some data in-memory
173
+ DataMerger.AddBlob (TDiskBlob (data, local, GType, key.LogoBlobID ()));
174
+ }
175
+ if (memRec.GetType () == TBlobType::DiskBlob) {
176
+ if (memRec.HasData ()) { // there is something to read from the disk
177
+ MemRecs.push_back (memRec);
178
+ } else { // headerless metadata stored
179
+ static TRope emptyRope;
180
+ DataMerger.AddBlob (TDiskBlob (&emptyRope, local, GType, key.LogoBlobID ()));
181
+ }
182
+ }
183
+ Y_DEBUG_ABORT_UNLESS (!ProducingHugeBlob);
184
+ ProducingSmallBlob = true ;
185
+ }
186
+ break ;
187
+
188
+ case TBlobType::ManyHugeBlobs:
189
+ Y_ABORT_UNLESS (outbound);
190
+ [[fallthrough]];
191
+ case TBlobType::HugeBlob:
192
+ memRec.GetDiskData (&extr, outbound);
193
+ DataMerger.AddHugeBlob (extr.Begin , extr.End , local, lsn);
194
+ Y_DEBUG_ABORT_UNLESS (!ProducingSmallBlob);
195
+ ProducingHugeBlob = true ;
196
+ break ;
201
197
}
202
198
}
203
- VerifyConsistency (memRec, nullptr );
199
+ VerifyConsistency (memRec, outbound );
204
200
}
205
201
206
202
void VerifyConsistency (const TMemRec& memRec, const TDiskPart *outbound) {
@@ -239,6 +235,13 @@ namespace NKikimr {
239
235
}
240
236
241
237
void Finish () {
238
+ if (NeedToLoadData == ELoadData::DontLoadData) {
239
+ Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
240
+ // if we have huge blobs for the record, then we set TBlobType::HugeBlob or
241
+ // TBlobType::ManyHugeBlobs a few lines below
242
+ MemRec.SetNoBlob ();
243
+ }
244
+
242
245
Y_DEBUG_ABORT_UNLESS (!Empty ());
243
246
VerifyConsistency ();
244
247
@@ -263,118 +266,22 @@ namespace NKikimr {
263
266
return &DataMerger;
264
267
}
265
268
266
- protected:
267
- TStackVec<TMemRec, 16 > MemRecs;
268
- bool ProducingSmallBlob;
269
- ELoadData NeedToLoadData;
270
- TDataMerger DataMerger;
271
- const bool AddHeader;
272
- };
273
-
274
- // //////////////////////////////////////////////////////////////////////////
275
- // TCompactRecordMergerIndexPass
276
- // //////////////////////////////////////////////////////////////////////////
277
- template <typename TKey, typename TMemRec>
278
- class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase <TKey, TMemRec> {
279
- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
280
-
281
- using ELoadData = typename TBase::ELoadData;
282
-
283
- using TBase::MemRecs;
284
- using TBase::ProducingSmallBlob;
285
- using TBase::NeedToLoadData;
286
- using TBase::DataMerger;
287
- using TBase::MemRec;
288
-
289
- public:
290
- TCompactRecordMergerIndexPass (const TBlobStorageGroupType &gtype, bool addHeader)
291
- : TBase(gtype, addHeader)
292
- {}
293
-
294
- void Finish () {
295
- if (NeedToLoadData == ELoadData::DontLoadData) {
296
- Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
297
- // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
298
- // TBlobType::ManyHugeBlobs a few lines below
299
- MemRec.SetNoBlob ();
300
- }
301
-
302
- TBase::Finish ();
303
- }
304
-
305
269
template <typename TCallback>
306
270
void ForEachSmallDiskBlob (TCallback&& callback) {
307
271
for (const auto & memRec : MemRecs) {
308
272
callback (memRec);
309
273
}
310
274
}
311
- };
312
-
313
- // //////////////////////////////////////////////////////////////////////////
314
- // TCompactRecordMergerDataPass
315
- // //////////////////////////////////////////////////////////////////////////
316
- template <typename TKey, typename TMemRec>
317
- class TCompactRecordMergerDataPass : public TCompactRecordMergerBase <TKey, TMemRec> {
318
- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
319
-
320
- using TBase::ProducingSmallBlob;
321
- using TBase::MemRecs;
322
- using TBase::MemRec;
323
- using TBase::DataMerger;
324
- using TBase::GType;
325
- using TBase::SetLoadDataMode;
326
-
327
- public:
328
- TCompactRecordMergerDataPass (const TBlobStorageGroupType &gtype, bool addHeader)
329
- : TBase(gtype, addHeader)
330
- {
331
- SetLoadDataMode (true );
332
- }
333
275
334
- void Clear () {
335
- TBase::Clear ();
336
- ReadSmallBlobs.clear ();
337
- SetLoadDataMode (true );
338
- }
339
-
340
- // add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
341
- void AddReadSmallBlob (TString data) {
342
- Y_ABORT_UNLESS (ProducingSmallBlob);
343
- ReadSmallBlobs.push_back (std::move (data));
344
- }
345
-
346
- void Finish () {
347
- // ensure we are producing small blobs; otherwise this merger should never be created
348
- Y_ABORT_UNLESS (ProducingSmallBlob);
349
-
350
- // add all read small blobs into blob merger
351
- const size_t count = ReadSmallBlobs.size ();
352
- Y_ABORT_UNLESS (count == +MemRecs, " count# %zu +MemRecs# %zu" , count, +MemRecs);
353
- for (size_t i = 0 ; i < count; ++i) {
354
- const TMemRec& memRec = MemRecs[i]->GetMemRec ();
355
- const TString& buffer = ReadSmallBlobs[i];
356
- Y_ABORT_UNLESS (buffer.size () == memRec.DataSize ());
357
- DataMerger.AddBlob (TDiskBlob (buffer.data (), buffer.size (), memRec.GetLocalParts (GType)));
358
- }
359
-
360
-
361
- // ensure that data merger has small blob
362
- Y_ABORT_UNLESS (DataMerger.HasSmallBlobs ());
363
-
364
- // finalize base class logic; it also generates blob record
365
- TBase::Finish ();
366
-
367
- // ensure that we have generated correct DiskBlob with full set of declared parts
368
- const TDiskBlob& blob = DataMerger.GetDiskBlobMerger ().GetDiskBlob ();
369
- Y_ABORT_UNLESS (blob.GetParts () == MemRec.GetLocalParts (GType));
370
- Y_ABORT_UNLESS (MemRec.GetType () == TBlobType::DiskBlob);
371
- }
372
-
373
- private:
374
- TVector<TString> ReadSmallBlobs;
276
+ protected:
277
+ TStackVec<TMemRec, 16 > MemRecs;
278
+ bool ProducingSmallBlob = false ;
279
+ bool ProducingHugeBlob = false ;
280
+ ELoadData NeedToLoadData = ELoadData::NotSet;
281
+ TDataMerger DataMerger;
282
+ const bool AddHeader;
375
283
};
376
284
377
-
378
285
// //////////////////////////////////////////////////////////////////////////
379
286
// TRecordMergerCallback
380
287
// //////////////////////////////////////////////////////////////////////////
@@ -412,9 +319,8 @@ namespace NKikimr {
412
319
case 1 : {
413
320
if (memRec.GetType () == TBlobType::DiskBlob) {
414
321
// don't deduplicate inplaced data
415
- const TDiskPart &data = extr.SwearOne ();
416
- if (data.ChunkIdx && data.Size ) {
417
- (*Callback)(data, v);
322
+ if (!v.Empty ()) {
323
+ (*Callback)(extr.SwearOne (), v);
418
324
}
419
325
} else if (memRec.GetType () == TBlobType::HugeBlob) {
420
326
Y_ABORT_UNLESS (v.CountBits () == 1u );
@@ -446,20 +352,31 @@ namespace NKikimr {
446
352
447
353
void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
448
354
AddBasic (memRec, key);
449
- if (memRec.HasData ()) {
450
- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
451
- if (data) {
452
- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob);
453
- // we have in-memory data in a rope, it always wins among other data,
454
- // so we call Callback immediately and remove any data for this local part
455
- // from LastWriteWinsMerger
456
- (*Callback)(TDiskBlob (data, v, GType, key.LogoBlobID ()));
457
- } else {
458
- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob && v.CountBits () == 1u );
459
- TDiskDataExtractor extr;
460
- memRec.GetDiskData (&extr, nullptr );
461
- // deduplicate huge blob
462
- LastWriteWinsMerger.Add (extr.SwearOne (), v, lsn);
355
+ if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
356
+ TDiskDataExtractor extr;
357
+ static TRope rope;
358
+ switch (memRec.GetType ()) {
359
+ case TBlobType::MemBlob:
360
+ // we have in-memory data in a rope, it always wins among other data,
361
+ // so we call Callback immediately and remove any data for this local part
362
+ // from LastWriteWinsMerger
363
+ Y_ABORT_UNLESS (data); // HaveToMergeData is true, so data must be present
364
+ (*Callback)(TDiskBlob (data, local, GType, key.LogoBlobID ()));
365
+ break ;
366
+
367
+ case TBlobType::DiskBlob:
368
+ Y_ABORT_UNLESS (!memRec.HasData ());
369
+ (*Callback)(TDiskBlob (&rope, local, GType, key.LogoBlobID ())); // pure metadata parts only
370
+ break ;
371
+
372
+ case TBlobType::HugeBlob:
373
+ Y_ABORT_UNLESS (local.CountBits () == 1 );
374
+ memRec.GetDiskData (&extr, nullptr );
375
+ LastWriteWinsMerger.Add (extr.SwearOne (), local, lsn);
376
+ break ;
377
+
378
+ case TBlobType::ManyHugeBlobs:
379
+ Y_ABORT (" unexpected case" );
463
380
}
464
381
}
465
382
}
0 commit comments