@@ -108,12 +108,14 @@ namespace NKikimr {
108
108
};
109
109
110
110
111
+ // //////////////////////////////////////////////////////////////////////////
112
+ // TCompactRecordMergerBase
111
113
// //////////////////////////////////////////////////////////////////////////
112
114
// Valid call sequence:
113
115
// Clear(); Add(); ... Add(); Finish()
114
116
// GetMemRec(); GetData();
115
117
template <class TKey , class TMemRec >
116
- class TCompactRecordMerger : public TRecordMergerBase <TKey, TMemRec> {
118
+ class TCompactRecordMergerBase : public TRecordMergerBase <TKey, TMemRec> {
117
119
protected:
118
120
using TBase = TRecordMergerBase<TKey, TMemRec>;
119
121
using TBase::MemRec;
@@ -130,16 +132,18 @@ namespace NKikimr {
130
132
};
131
133
132
134
public:
133
- TCompactRecordMerger (const TBlobStorageGroupType >ype, bool addHeader)
135
+ TCompactRecordMergerBase (const TBlobStorageGroupType >ype, bool addHeader)
134
136
: TBase(gtype, true )
137
+ , MemRecs()
138
+ , ProducingSmallBlob(false )
139
+ , NeedToLoadData(ELoadData::NotSet)
135
140
, AddHeader(addHeader)
136
141
{}
137
142
138
143
void Clear () {
139
144
TBase::Clear ();
140
145
MemRecs.clear ();
141
146
ProducingSmallBlob = false ;
142
- ProducingHugeBlob = false ;
143
147
NeedToLoadData = ELoadData::NotSet;
144
148
DataMerger.Clear ();
145
149
}
@@ -152,47 +156,51 @@ namespace NKikimr {
152
156
}
153
157
154
158
void AddFromSegment (const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
155
- Add (memRec, nullptr , outbound, key, circaLsn);
159
+ Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
160
+ AddBasic (memRec, key);
161
+ switch (memRec.GetType ()) {
162
+ case TBlobType::DiskBlob: {
163
+ if (memRec.HasData () && NeedToLoadData == ELoadData::LoadData) {
164
+ MemRecs.push_back (memRec);
165
+ ProducingSmallBlob = true ;
166
+ }
167
+ break ;
168
+ }
169
+ case TBlobType::HugeBlob:
170
+ case TBlobType::ManyHugeBlobs: {
171
+ TDiskDataExtractor extr;
172
+ memRec.GetDiskData (&extr, outbound);
173
+ const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
174
+ DataMerger.AddHugeBlob (extr.Begin , extr.End , v, circaLsn);
175
+ break ;
176
+ }
177
+ default :
178
+ Y_ABORT (" Impossible case" );
179
+ }
180
+ VerifyConsistency (memRec, outbound);
156
181
}
157
182
158
183
void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
159
- Add (memRec, data, nullptr , key, lsn);
160
- }
161
-
162
- void Add (const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
163
184
Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
164
185
AddBasic (memRec, key);
165
- if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
166
- TDiskDataExtractor extr;
167
- switch (memRec.GetType ()) {
168
- case TBlobType::MemBlob:
169
- case TBlobType::DiskBlob:
170
- if (NeedToLoadData == ELoadData::LoadData) {
171
- if (data) {
172
- // we have some data in-memory
173
- DataMerger.AddBlob (TDiskBlob (data, local, GType, key.LogoBlobID ()));
174
- }
175
- if (memRec.HasData () && memRec.GetType () == TBlobType::DiskBlob) {
176
- // there is something to read from the disk
177
- MemRecs.push_back (memRec);
178
- }
179
- Y_DEBUG_ABORT_UNLESS (!ProducingHugeBlob);
180
- ProducingSmallBlob = true ;
181
- }
182
- break ;
183
-
184
- case TBlobType::ManyHugeBlobs:
185
- Y_ABORT_UNLESS (outbound);
186
- [[fallthrough]];
187
- case TBlobType::HugeBlob:
188
- memRec.GetDiskData (&extr, outbound);
189
- DataMerger.AddHugeBlob (extr.Begin , extr.End , local, lsn);
190
- Y_DEBUG_ABORT_UNLESS (!ProducingSmallBlob);
191
- ProducingHugeBlob = true ;
192
- break ;
186
+ if (memRec.HasData ()) {
187
+ if (data) {
188
+ Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob || memRec.GetType () == TBlobType::DiskBlob);
189
+ if (NeedToLoadData == ELoadData::LoadData) {
190
+ DataMerger.AddBlob (TDiskBlob (data, memRec.GetLocalParts (GType), GType, key.LogoBlobID ()));
191
+ ProducingSmallBlob = true ;
192
+ } else {
193
+ // intentionally do nothing: don't add any data to DataMerger, because we don't need it
194
+ }
195
+ } else {
196
+ Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob);
197
+ TDiskDataExtractor extr;
198
+ memRec.GetDiskData (&extr, nullptr );
199
+ const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
200
+ DataMerger.AddHugeBlob (extr.Begin , extr.End , v, lsn);
193
201
}
194
202
}
195
- VerifyConsistency (memRec, outbound );
203
+ VerifyConsistency (memRec, nullptr );
196
204
}
197
205
198
206
void VerifyConsistency (const TMemRec& memRec, const TDiskPart *outbound) {
@@ -231,13 +239,6 @@ namespace NKikimr {
231
239
}
232
240
233
241
void Finish () {
234
- if (NeedToLoadData == ELoadData::DontLoadData) {
235
- Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
236
- // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
237
- // TBlobType::ManyHugeBlobs a few lines below
238
- MemRec.SetNoBlob ();
239
- }
240
-
241
242
Y_DEBUG_ABORT_UNLESS (!Empty ());
242
243
VerifyConsistency ();
243
244
@@ -262,22 +263,118 @@ namespace NKikimr {
262
263
return &DataMerger;
263
264
}
264
265
266
+ protected:
267
+ TStackVec<TMemRec, 16 > MemRecs;
268
+ bool ProducingSmallBlob;
269
+ ELoadData NeedToLoadData;
270
+ TDataMerger DataMerger;
271
+ const bool AddHeader;
272
+ };
273
+
274
+ // //////////////////////////////////////////////////////////////////////////
275
+ // TCompactRecordMergerIndexPass
276
+ // //////////////////////////////////////////////////////////////////////////
277
+ template <typename TKey, typename TMemRec>
278
+ class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase <TKey, TMemRec> {
279
+ using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
280
+
281
+ using ELoadData = typename TBase::ELoadData;
282
+
283
+ using TBase::MemRecs;
284
+ using TBase::ProducingSmallBlob;
285
+ using TBase::NeedToLoadData;
286
+ using TBase::DataMerger;
287
+ using TBase::MemRec;
288
+
289
+ public:
290
+ TCompactRecordMergerIndexPass (const TBlobStorageGroupType >ype, bool addHeader)
291
+ : TBase(gtype, addHeader)
292
+ {}
293
+
294
+ void Finish () {
295
+ if (NeedToLoadData == ELoadData::DontLoadData) {
296
+ Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
297
+ // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
298
+ // TBlobType::ManyHugeBlobs a few lines below
299
+ MemRec.SetNoBlob ();
300
+ }
301
+
302
+ TBase::Finish ();
303
+ }
304
+
265
305
template <typename TCallback>
266
306
void ForEachSmallDiskBlob (TCallback&& callback) {
267
307
for (const auto & memRec : MemRecs) {
268
308
callback (memRec);
269
309
}
270
310
}
311
+ };
271
312
272
- protected:
273
- TStackVec<TMemRec, 16 > MemRecs;
274
- bool ProducingSmallBlob = false ;
275
- bool ProducingHugeBlob = false ;
276
- ELoadData NeedToLoadData = ELoadData::NotSet;
277
- TDataMerger DataMerger;
278
- const bool AddHeader;
313
+ // //////////////////////////////////////////////////////////////////////////
314
+ // TCompactRecordMergerDataPass
315
+ // //////////////////////////////////////////////////////////////////////////
316
+ template <typename TKey, typename TMemRec>
317
+ class TCompactRecordMergerDataPass : public TCompactRecordMergerBase <TKey, TMemRec> {
318
+ using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
319
+
320
+ using TBase::ProducingSmallBlob;
321
+ using TBase::MemRecs;
322
+ using TBase::MemRec;
323
+ using TBase::DataMerger;
324
+ using TBase::GType;
325
+ using TBase::SetLoadDataMode;
326
+
327
+ public:
328
+ TCompactRecordMergerDataPass (const TBlobStorageGroupType >ype, bool addHeader)
329
+ : TBase(gtype, addHeader)
330
+ {
331
+ SetLoadDataMode (true );
332
+ }
333
+
334
+ void Clear () {
335
+ TBase::Clear ();
336
+ ReadSmallBlobs.clear ();
337
+ SetLoadDataMode (true );
338
+ }
339
+
340
+ // add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
341
+ void AddReadSmallBlob (TString data) {
342
+ Y_ABORT_UNLESS (ProducingSmallBlob);
343
+ ReadSmallBlobs.push_back (std::move (data));
344
+ }
345
+
346
+ void Finish () {
347
+ // ensure we are producing small blobs; otherwise this merger should never be created
348
+ Y_ABORT_UNLESS (ProducingSmallBlob);
349
+
350
+ // add all read small blobs into blob merger
351
+ const size_t count = ReadSmallBlobs.size ();
352
+ Y_ABORT_UNLESS (count == +MemRecs, " count# %zu +MemRecs# %zu" , count, +MemRecs);
353
+ for (size_t i = 0 ; i < count; ++i) {
354
+ const TMemRec& memRec = MemRecs[i]->GetMemRec ();
355
+ const TString& buffer = ReadSmallBlobs[i];
356
+ Y_ABORT_UNLESS (buffer.size () == memRec.DataSize ());
357
+ DataMerger.AddBlob (TDiskBlob (buffer.data (), buffer.size (), memRec.GetLocalParts (GType)));
358
+ }
359
+
360
+
361
+ // ensure that data merger has small blob
362
+ Y_ABORT_UNLESS (DataMerger.HasSmallBlobs ());
363
+
364
+ // finalize base class logic; it also generates blob record
365
+ TBase::Finish ();
366
+
367
+ // ensure that we have generated correct DiskBlob with full set of declared parts
368
+ const TDiskBlob& blob = DataMerger.GetDiskBlobMerger ().GetDiskBlob ();
369
+ Y_ABORT_UNLESS (blob.GetParts () == MemRec.GetLocalParts (GType));
370
+ Y_ABORT_UNLESS (MemRec.GetType () == TBlobType::DiskBlob);
371
+ }
372
+
373
+ private:
374
+ TVector<TString> ReadSmallBlobs;
279
375
};
280
376
377
+
281
378
// //////////////////////////////////////////////////////////////////////////
282
379
// TRecordMergerCallback
283
380
// //////////////////////////////////////////////////////////////////////////
@@ -315,8 +412,9 @@ namespace NKikimr {
315
412
case 1 : {
316
413
if (memRec.GetType () == TBlobType::DiskBlob) {
317
414
// don't deduplicate inplaced data
318
- if (!v.Empty ()) {
319
- (*Callback)(extr.SwearOne (), v);
415
+ const TDiskPart &data = extr.SwearOne ();
416
+ if (data.ChunkIdx && data.Size ) {
417
+ (*Callback)(data, v);
320
418
}
321
419
} else if (memRec.GetType () == TBlobType::HugeBlob) {
322
420
Y_ABORT_UNLESS (v.CountBits () == 1u );
@@ -348,31 +446,20 @@ namespace NKikimr {
348
446
349
447
void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
350
448
AddBasic (memRec, key);
351
- if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
352
- TDiskDataExtractor extr;
353
- static TRope rope;
354
- switch (memRec.GetType ()) {
355
- case TBlobType::MemBlob:
356
- // we have in-memory data in a rope, it always wins among other data,
357
- // so we call Callback immediately and remove any data for this local part
358
- // from LastWriteWinsMerger
359
- Y_ABORT_UNLESS (data); // HaveToMergeData is true, so data must be present
360
- (*Callback)(TDiskBlob (data, local, GType, key.LogoBlobID ()));
361
- break ;
362
-
363
- case TBlobType::DiskBlob:
364
- Y_ABORT_UNLESS (!memRec.HasData ());
365
- (*Callback)(TDiskBlob (&rope, local, GType, key.LogoBlobID ())); // pure metadata parts only
366
- break ;
367
-
368
- case TBlobType::HugeBlob:
369
- Y_ABORT_UNLESS (local.CountBits () == 1 );
370
- memRec.GetDiskData (&extr, nullptr );
371
- LastWriteWinsMerger.Add (extr.SwearOne (), local, lsn);
372
- break ;
373
-
374
- case TBlobType::ManyHugeBlobs:
375
- Y_ABORT (" unexpected case" );
449
+ if (memRec.HasData ()) {
450
+ const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
451
+ if (data) {
452
+ Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob);
453
+ // we have in-memory data in a rope, it always wins among other data,
454
+ // so we call Callback immediately and remove any data for this local part
455
+ // from LastWriteWinsMerger
456
+ (*Callback)(TDiskBlob (data, v, GType, key.LogoBlobID ()));
457
+ } else {
458
+ Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob && v.CountBits () == 1u );
459
+ TDiskDataExtractor extr;
460
+ memRec.GetDiskData (&extr, nullptr );
461
+ // deduplicate huge blob
462
+ LastWriteWinsMerger.Add (extr.SwearOne (), v, lsn);
376
463
}
377
464
}
378
465
}
0 commit comments