Skip to content

Commit e8e060a

Browse files
authored
Disable blob header by default in VDisk (#9491)
1 parent 185dd43 commit e8e060a

File tree

10 files changed

+104
-187
lines changed

10 files changed

+104
-187
lines changed

ydb/core/blobstorage/ut_blobstorage/defrag.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ static TIntrusivePtr<TBlobStorageGroupInfo> PrepareEnv(TEnvironmentSetup& env, T
1111
const TIntrusivePtr<TBlobStorageGroupInfo> info = env.GetGroupInfo(groups.front());
1212
env.Sim(TDuration::Minutes(5));
1313

14-
const ui32 dataLen = 512 * 1024;
14+
const ui32 dataLen = 512 * 1024 + 1;
1515
const TString data(dataLen, 'x');
1616
ui32 index = 0;
1717

@@ -257,7 +257,7 @@ Y_UNIT_TEST_SUITE(Defragmentation) {
257257

258258
const TEvDefragRewritten* msg = ev->Get<TEvDefragRewritten>();
259259
UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenRecs, 18);
260-
UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenBytes, 9961567);
260+
UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenBytes, 9961491);
261261
}
262262
return true;
263263
case TEvBlobStorage::EvRestoreCorruptedBlob:

ydb/core/blobstorage/vdisk/common/disk_part.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,7 @@ namespace NKikimr {
131131
}
132132

133133
ui64 Hash() const {
134-
ui64 x = 0;
135-
x |= (ui64)ChunkIdx;
136-
x <<= 32u;
137-
x |= (ui64)Offset;
138-
return x;
134+
return MultiHash(ChunkIdx, Offset);
139135
}
140136

141137
inline bool operator <(const TDiskPart &x) const {

ydb/core/blobstorage/vdisk/common/vdisk_config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ namespace NKikimr {
4343
HullCompMaxInFlightReads = 20;
4444
HullCompReadBatchEfficiencyThreshold = 0.5; // don't issue reads if there are more gaps than the useful data
4545
AnubisOsirisMaxInFly = 1000;
46-
AddHeader = true;
46+
AddHeader = false;
4747

4848
RecoveryLogCutterFirstDuration = TDuration::Seconds(10);
4949
RecoveryLogCutterRegularDuration = TDuration::Seconds(30);

ydb/core/blobstorage/vdisk/common/vdisk_events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,6 +1252,7 @@ namespace NKikimr {
12521252
if (record.GetIndexOnly())
12531253
str << " IndexOnly";
12541254
if (record.HasMsgQoS()) {
1255+
str << ' ';
12551256
TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
12561257
}
12571258
str << " Notify# " << record.GetNotifyIfNotReady()

ydb/core/blobstorage/vdisk/hulldb/fresh/fresh_segment_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ namespace NKikimr {
379379
template <class TRecordMerger>
380380
void PutToMerger(const TMemRec &memRec, ui64 lsn, TRecordMerger *merger) {
381381
TKey key = It.GetValue().Key;
382-
if (merger->HaveToMergeData() && memRec.HasData() && memRec.GetType() == TBlobType::MemBlob) {
382+
if (merger->HaveToMergeData() && memRec.GetType() == TBlobType::MemBlob) {
383383
const TMemPart p = memRec.GetMemData();
384384
const TRope& rope = Seg->GetLogoBlobData(p);
385385
merger->AddFromFresh(memRec, &rope, key, lsn);

ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h

Lines changed: 78 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,12 @@ namespace NKikimr {
108108
};
109109

110110

111-
////////////////////////////////////////////////////////////////////////////
112-
// TCompactRecordMergerBase
113111
////////////////////////////////////////////////////////////////////////////
114112
// Valid call sequence:
115113
// Clear(); Add(); ... Add(); Finish()
116114
// GetMemRec(); GetData();
117115
template <class TKey, class TMemRec>
118-
class TCompactRecordMergerBase : public TRecordMergerBase<TKey, TMemRec> {
116+
class TCompactRecordMerger : public TRecordMergerBase<TKey, TMemRec> {
119117
protected:
120118
using TBase = TRecordMergerBase<TKey, TMemRec>;
121119
using TBase::MemRec;
@@ -132,18 +130,16 @@ namespace NKikimr {
132130
};
133131

134132
public:
135-
TCompactRecordMergerBase(const TBlobStorageGroupType &gtype, bool addHeader)
133+
TCompactRecordMerger(const TBlobStorageGroupType &gtype, bool addHeader)
136134
: TBase(gtype, true)
137-
, MemRecs()
138-
, ProducingSmallBlob(false)
139-
, NeedToLoadData(ELoadData::NotSet)
140135
, AddHeader(addHeader)
141136
{}
142137

143138
void Clear() {
144139
TBase::Clear();
145140
MemRecs.clear();
146141
ProducingSmallBlob = false;
142+
ProducingHugeBlob = false;
147143
NeedToLoadData = ELoadData::NotSet;
148144
DataMerger.Clear();
149145
}
@@ -156,51 +152,47 @@ namespace NKikimr {
156152
}
157153

158154
void AddFromSegment(const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
159-
Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
160-
AddBasic(memRec, key);
161-
switch (memRec.GetType()) {
162-
case TBlobType::DiskBlob: {
163-
if (memRec.HasData() && NeedToLoadData == ELoadData::LoadData) {
164-
MemRecs.push_back(memRec);
165-
ProducingSmallBlob = true;
166-
}
167-
break;
168-
}
169-
case TBlobType::HugeBlob:
170-
case TBlobType::ManyHugeBlobs: {
171-
TDiskDataExtractor extr;
172-
memRec.GetDiskData(&extr, outbound);
173-
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
174-
DataMerger.AddHugeBlob(extr.Begin, extr.End, v, circaLsn);
175-
break;
176-
}
177-
default:
178-
Y_ABORT("Impossible case");
179-
}
180-
VerifyConsistency(memRec, outbound);
155+
Add(memRec, nullptr, outbound, key, circaLsn);
181156
}
182157

183158
void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
159+
Add(memRec, data, nullptr, key, lsn);
160+
}
161+
162+
void Add(const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
184163
Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
185164
AddBasic(memRec, key);
186-
if (memRec.HasData()) {
187-
if (data) {
188-
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob || memRec.GetType() == TBlobType::DiskBlob);
189-
if (NeedToLoadData == ELoadData::LoadData) {
190-
DataMerger.AddBlob(TDiskBlob(data, memRec.GetLocalParts(GType), GType, key.LogoBlobID()));
191-
ProducingSmallBlob = true;
192-
} else {
193-
// intentionally do nothing: don't add any data to DataMerger, because we don't need it
194-
}
195-
} else {
196-
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob);
197-
TDiskDataExtractor extr;
198-
memRec.GetDiskData(&extr, nullptr);
199-
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
200-
DataMerger.AddHugeBlob(extr.Begin, extr.End, v, lsn);
165+
if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
166+
TDiskDataExtractor extr;
167+
switch (memRec.GetType()) {
168+
case TBlobType::MemBlob:
169+
case TBlobType::DiskBlob:
170+
if (NeedToLoadData == ELoadData::LoadData) {
171+
if (data) {
172+
// we have some data in-memory
173+
DataMerger.AddBlob(TDiskBlob(data, local, GType, key.LogoBlobID()));
174+
}
175+
if (memRec.HasData() && memRec.GetType() == TBlobType::DiskBlob) {
176+
// there is something to read from the disk
177+
MemRecs.push_back(memRec);
178+
}
179+
Y_DEBUG_ABORT_UNLESS(!ProducingHugeBlob);
180+
ProducingSmallBlob = true;
181+
}
182+
break;
183+
184+
case TBlobType::ManyHugeBlobs:
185+
Y_ABORT_UNLESS(outbound);
186+
[[fallthrough]];
187+
case TBlobType::HugeBlob:
188+
memRec.GetDiskData(&extr, outbound);
189+
DataMerger.AddHugeBlob(extr.Begin, extr.End, local, lsn);
190+
Y_DEBUG_ABORT_UNLESS(!ProducingSmallBlob);
191+
ProducingHugeBlob = true;
192+
break;
201193
}
202194
}
203-
VerifyConsistency(memRec, nullptr);
195+
VerifyConsistency(memRec, outbound);
204196
}
205197

206198
void VerifyConsistency(const TMemRec& memRec, const TDiskPart *outbound) {
@@ -239,6 +231,13 @@ namespace NKikimr {
239231
}
240232

241233
void Finish() {
234+
if (NeedToLoadData == ELoadData::DontLoadData) {
235+
Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
236+
// if we have huge blobs for the record, than we set TBlobType::HugeBlob or
237+
// TBlobType::ManyHugeBlobs a few lines below
238+
MemRec.SetNoBlob();
239+
}
240+
242241
Y_DEBUG_ABORT_UNLESS(!Empty());
243242
VerifyConsistency();
244243

@@ -263,118 +262,22 @@ namespace NKikimr {
263262
return &DataMerger;
264263
}
265264

266-
protected:
267-
TStackVec<TMemRec, 16> MemRecs;
268-
bool ProducingSmallBlob;
269-
ELoadData NeedToLoadData;
270-
TDataMerger DataMerger;
271-
const bool AddHeader;
272-
};
273-
274-
////////////////////////////////////////////////////////////////////////////
275-
// TCompactRecordMergerIndexPass
276-
////////////////////////////////////////////////////////////////////////////
277-
template<typename TKey, typename TMemRec>
278-
class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase<TKey, TMemRec> {
279-
using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
280-
281-
using ELoadData = typename TBase::ELoadData;
282-
283-
using TBase::MemRecs;
284-
using TBase::ProducingSmallBlob;
285-
using TBase::NeedToLoadData;
286-
using TBase::DataMerger;
287-
using TBase::MemRec;
288-
289-
public:
290-
TCompactRecordMergerIndexPass(const TBlobStorageGroupType &gtype, bool addHeader)
291-
: TBase(gtype, addHeader)
292-
{}
293-
294-
void Finish() {
295-
if (NeedToLoadData == ELoadData::DontLoadData) {
296-
Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
297-
// if we have huge blobs for the record, than we set TBlobType::HugeBlob or
298-
// TBlobType::ManyHugeBlobs a few lines below
299-
MemRec.SetNoBlob();
300-
}
301-
302-
TBase::Finish();
303-
}
304-
305265
template<typename TCallback>
306266
void ForEachSmallDiskBlob(TCallback&& callback) {
307267
for (const auto& memRec : MemRecs) {
308268
callback(memRec);
309269
}
310270
}
311-
};
312-
313-
////////////////////////////////////////////////////////////////////////////
314-
// TCompactRecordMergerDataPass
315-
////////////////////////////////////////////////////////////////////////////
316-
template<typename TKey, typename TMemRec>
317-
class TCompactRecordMergerDataPass : public TCompactRecordMergerBase<TKey, TMemRec> {
318-
using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
319-
320-
using TBase::ProducingSmallBlob;
321-
using TBase::MemRecs;
322-
using TBase::MemRec;
323-
using TBase::DataMerger;
324-
using TBase::GType;
325-
using TBase::SetLoadDataMode;
326-
327-
public:
328-
TCompactRecordMergerDataPass(const TBlobStorageGroupType &gtype, bool addHeader)
329-
: TBase(gtype, addHeader)
330-
{
331-
SetLoadDataMode(true);
332-
}
333271

334-
void Clear() {
335-
TBase::Clear();
336-
ReadSmallBlobs.clear();
337-
SetLoadDataMode(true);
338-
}
339-
340-
// add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
341-
void AddReadSmallBlob(TString data) {
342-
Y_ABORT_UNLESS(ProducingSmallBlob);
343-
ReadSmallBlobs.push_back(std::move(data));
344-
}
345-
346-
void Finish() {
347-
// ensure we are producing small blobs; otherwise this merger should never be created
348-
Y_ABORT_UNLESS(ProducingSmallBlob);
349-
350-
// add all read small blobs into blob merger
351-
const size_t count = ReadSmallBlobs.size();
352-
Y_ABORT_UNLESS(count == +MemRecs, "count# %zu +MemRecs# %zu", count, +MemRecs);
353-
for (size_t i = 0; i < count; ++i) {
354-
const TMemRec& memRec = MemRecs[i]->GetMemRec();
355-
const TString& buffer = ReadSmallBlobs[i];
356-
Y_ABORT_UNLESS(buffer.size() == memRec.DataSize());
357-
DataMerger.AddBlob(TDiskBlob(buffer.data(), buffer.size(), memRec.GetLocalParts(GType)));
358-
}
359-
360-
361-
// ensure that data merger has small blob
362-
Y_ABORT_UNLESS(DataMerger.HasSmallBlobs());
363-
364-
// finalize base class logic; it also generates blob record
365-
TBase::Finish();
366-
367-
// ensure that we have generated correct DiskBlob with full set of declared parts
368-
const TDiskBlob& blob = DataMerger.GetDiskBlobMerger().GetDiskBlob();
369-
Y_ABORT_UNLESS(blob.GetParts() == MemRec.GetLocalParts(GType));
370-
Y_ABORT_UNLESS(MemRec.GetType() == TBlobType::DiskBlob);
371-
}
372-
373-
private:
374-
TVector<TString> ReadSmallBlobs;
272+
protected:
273+
TStackVec<TMemRec, 16> MemRecs;
274+
bool ProducingSmallBlob = false;
275+
bool ProducingHugeBlob = false;
276+
ELoadData NeedToLoadData = ELoadData::NotSet;
277+
TDataMerger DataMerger;
278+
const bool AddHeader;
375279
};
376280

377-
378281
////////////////////////////////////////////////////////////////////////////
379282
// TRecordMergerCallback
380283
////////////////////////////////////////////////////////////////////////////
@@ -412,9 +315,8 @@ namespace NKikimr {
412315
case 1: {
413316
if (memRec.GetType() == TBlobType::DiskBlob) {
414317
// don't deduplicate inplaced data
415-
const TDiskPart &data = extr.SwearOne();
416-
if (data.ChunkIdx && data.Size) {
417-
(*Callback)(data, v);
318+
if (!v.Empty()) {
319+
(*Callback)(extr.SwearOne(), v);
418320
}
419321
} else if (memRec.GetType() == TBlobType::HugeBlob) {
420322
Y_ABORT_UNLESS(v.CountBits() == 1u);
@@ -446,20 +348,31 @@ namespace NKikimr {
446348

447349
void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
448350
AddBasic(memRec, key);
449-
if (memRec.HasData()) {
450-
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
451-
if (data) {
452-
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob);
453-
// we have in-memory data in a rope, it always wins among other data,
454-
// so we call Callback immediately and remove any data for this local part
455-
// from LastWriteWinsMerger
456-
(*Callback)(TDiskBlob(data, v, GType, key.LogoBlobID()));
457-
} else {
458-
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob && v.CountBits() == 1u);
459-
TDiskDataExtractor extr;
460-
memRec.GetDiskData(&extr, nullptr);
461-
// deduplicate huge blob
462-
LastWriteWinsMerger.Add(extr.SwearOne(), v, lsn);
351+
if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
352+
TDiskDataExtractor extr;
353+
static TRope rope;
354+
switch (memRec.GetType()) {
355+
case TBlobType::MemBlob:
356+
// we have in-memory data in a rope, it always wins among other data,
357+
// so we call Callback immediately and remove any data for this local part
358+
// from LastWriteWinsMerger
359+
Y_ABORT_UNLESS(data); // HaveToMergeData is true, so data must be present
360+
(*Callback)(TDiskBlob(data, local, GType, key.LogoBlobID()));
361+
break;
362+
363+
case TBlobType::DiskBlob:
364+
Y_ABORT_UNLESS(!memRec.HasData());
365+
(*Callback)(TDiskBlob(&rope, local, GType, key.LogoBlobID())); // pure metadata parts only
366+
break;
367+
368+
case TBlobType::HugeBlob:
369+
Y_ABORT_UNLESS(local.CountBits() == 1);
370+
memRec.GetDiskData(&extr, nullptr);
371+
LastWriteWinsMerger.Add(extr.SwearOne(), local, lsn);
372+
break;
373+
374+
case TBlobType::ManyHugeBlobs:
375+
Y_ABORT("unexpected case");
463376
}
464377
}
465378
}

ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ namespace NKikimr {
2020
////////////////////////////////////////////////////////////////////////////////////////
2121
typedef TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob> TSstLogoBlob;
2222
typedef TSstLogoBlob::TWriter TWriterLogoBlob;
23-
typedef TCompactRecordMergerIndexPass<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;
23+
typedef TCompactRecordMerger<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;
2424

2525
typedef TLevelSegment<TKeyBlock, TMemRecBlock> TSstBlock;
2626
typedef TSstBlock::TWriter TWriterBlock;
27-
typedef TCompactRecordMergerIndexPass<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
27+
typedef TCompactRecordMerger<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
2828
TTestContexts TestCtx;
2929

3030

0 commit comments

Comments
 (0)