Skip to content

Commit cb76c1d

Browse files
authored
support CheckIntegrity request in blob depot (#20588)
1 parent 429161e commit cb76c1d

16 files changed

+292
-29
lines changed

ydb/core/base/blobstorage.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,21 +1482,20 @@ struct TEvBlobStorage {
14821482

14831483
enum EPlacementStatus {
14841484
PS_OK = 1, // blob parts are placed according to fail model
1485-
PS_BLOB_IS_LOST = 2, // blob is lost/unrecoverable
1485+
PS_REPLICATION_IN_PROGRESS = 2, // there are missing parts but status may become OK after replication
14861486
PS_UNKNOWN = 3, // status is unknown because of missing disks or network problems
1487-
PS_REPLICATION_IN_PROGRESS = 4, // there are missing parts but status may become OK after replication
1488-
PS_BLOB_IS_RECOVERABLE = 5, // blob parts are definitely placed incorrectly or there are missing parts
1489-
// but blob may be recovered
1487+
PS_BLOB_IS_RECOVERABLE = 4, // blob parts are definitely placed incorrectly or there are missing parts but blob may be recovered
1488+
PS_BLOB_IS_LOST = 5, // blob is lost/unrecoverable
14901489
};
14911490
EPlacementStatus PlacementStatus;
14921491

14931492
enum EDataStatus {
14941493
DS_OK = 1, // all data parts contain valid data
1495-
DS_ERROR = 2, // some parts definitely contain invalid data
1496-
DS_UNKNOWN = 3, // status is unknown because of missing disks or network problems
1494+
DS_UNKNOWN = 2, // status is unknown because of missing disks or network problems
1495+
DS_ERROR = 3, // some parts definitely contain invalid data
14971496
};
14981497
EDataStatus DataStatus;
1499-
TString DataErrorInfo; // textual info about errors in blob data
1498+
TString DataInfo; // textual info about checks in blob data
15001499

15011500
std::shared_ptr<TExecutionRelay> ExecutionRelay;
15021501

@@ -1512,7 +1511,7 @@ struct TEvBlobStorage {
15121511
<< " ErrorReason# " << ErrorReason
15131512
<< " PlacementStatus# " << (int)PlacementStatus
15141513
<< " DataStatus# " << (int)DataStatus
1515-
<< " DataErrorInfo# " << DataErrorInfo
1514+
<< " DataInfo# " << DataInfo
15161515
<< " }";
15171516
return str.Str();
15181517
}

ydb/core/blob_depot/agent/agent_impl.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace NKikimr::NBlobDepot {
1717
XX(EvCollectGarbage) \
1818
XX(EvStatus) \
1919
XX(EvPatch) \
20+
XX(EvCheckIntegrity) \
2021
// END
2122

2223
class TBlobDepotAgent;
@@ -58,6 +59,7 @@ namespace NKikimr::NBlobDepot {
5859
bool Error() const { return std::holds_alternative<TError>(Outcome); }
5960
bool Success() const { return std::holds_alternative<TSuccess>(Outcome); }
6061
const TResolvedValue *GetResolvedValue() const { return std::get<TSuccess>(Outcome).Value; }
62+
TString GetErrorReason() const { return std::get<TError>(Outcome).ErrorReason; }
6163

6264
void Output(IOutputStream& s) const {
6365
if (auto *success = std::get_if<TSuccess>(&Outcome)) {
@@ -133,7 +135,8 @@ namespace NKikimr::NBlobDepot {
133135

134136
// underlying DS proxy responses
135137
TEvBlobStorage::TEvGetResult*,
136-
TEvBlobStorage::TEvPutResult*
138+
TEvBlobStorage::TEvPutResult*,
139+
TEvBlobStorage::TEvCheckIntegrityResult*
137140
>;
138141

139142
static TString ToString(const TResponse& response);
@@ -181,6 +184,14 @@ namespace NKikimr::NBlobDepot {
181184
}
182185
};
183186

187+
struct TCheckOutcome {
188+
std::unique_ptr<TEvBlobStorage::TEvCheckIntegrityResult> Result;
189+
190+
TString ToString() const {
191+
return TStringBuilder() << "{Result# " << (Result ? Result->ToString() : "") << "}";
192+
}
193+
};
194+
184195
class TBlobDepotAgent
185196
: public TActorBootstrapped<TBlobDepotAgent>
186197
, public TRequestSender
@@ -270,6 +281,7 @@ namespace NKikimr::NBlobDepot {
270281

271282
hFunc(TEvBlobStorage::TEvGetResult, HandleOtherResponse);
272283
hFunc(TEvBlobStorage::TEvPutResult, HandleOtherResponse);
284+
hFunc(TEvBlobStorage::TEvCheckIntegrityResult, HandleOtherResponse);
273285

274286
ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY)
275287
fFunc(TEvBlobStorage::EvAssimilate, HandleAssimilate);
@@ -429,6 +441,7 @@ namespace NKikimr::NBlobDepot {
429441
virtual void OnIdAllocated(bool /*success*/) {}
430442
virtual void OnDestroy(bool /*success*/) {}
431443
virtual void OnPutS3ObjectResponse(std::optional<TString>&& /*error*/) { Y_ABORT(); }
444+
virtual void OnCheckIntegrity(TCheckOutcome&& /*outcome*/) {}
432445

433446
NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, std::optional<ui32> generation,
434447
ui32 *blockedGeneration = nullptr);
@@ -450,11 +463,15 @@ namespace NKikimr::NBlobDepot {
450463
std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData;
451464
TString Key; // the key we are reading -- this is used for retries when we are getting NODATA
452465
};
466+
struct TCheckContext;
453467

454468
bool IssueRead(TReadArg&& arg, TString& error);
455469
void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg);
456470
void HandleResolveResult(const TRequestContext::TPtr& context, TEvBlobDepot::TEvResolveResult& msg);
457471

472+
void IssueCheckIntegrity(TReadArg&& arg);
473+
void HandleCheckIntegrityResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvCheckIntegrityResult& msg);
474+
458475
public:
459476
struct TDeleter {
460477
static void Destroy(TQuery *query) { delete query; }

ydb/core/blob_depot/agent/proxy.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ namespace NKikimr::NBlobDepot {
1414
case TEvBlobStorage::EvGet:
1515
static_cast<TEvBlobStorage::TEvGet&>(*event).ExecutionRelay = executionRelay;
1616
break;
17+
18+
case TEvBlobStorage::EvCheckIntegrity:
19+
static_cast<TEvBlobStorage::TEvCheckIntegrity&>(*event).ExecutionRelay = executionRelay;
20+
break;
1721
}
1822

1923
const ui64 id = NextOtherRequestId++;

ydb/core/blob_depot/agent/read.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,24 @@ namespace NKikimr::NBlobDepot {
5858
};
5959
};
6060

61+
struct TBlobDepotAgent::TQuery::TCheckContext
62+
: TRequestContext
63+
, std::enable_shared_from_this<TCheckContext>
64+
{
65+
TReadArg ReadArg;
66+
ui32 NumPartsPending = 0;
67+
std::unique_ptr<TEvBlobStorage::TEvCheckIntegrityResult> Result;
68+
69+
TCheckContext(TReadArg&& readArg)
70+
: ReadArg(std::move(readArg))
71+
, Result(new TEvBlobStorage::TEvCheckIntegrityResult(NKikimrProto::OK))
72+
{}
73+
74+
void End(TQuery *query) {
75+
query->OnCheckIntegrity(TCheckOutcome{std::move(Result)});
76+
}
77+
};
78+
6179
bool TBlobDepotAgent::TQuery::IssueRead(TReadArg&& arg, TString& error) {
6280
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
6381
(ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
@@ -277,4 +295,66 @@ namespace NKikimr::NBlobDepot {
277295
}
278296
}
279297

298+
void TBlobDepotAgent::TQuery::IssueCheckIntegrity(TReadArg&& arg) {
299+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA62, "IssueCheckIntegrity", (AgentId, Agent.LogId),
300+
(QueryId, GetQueryId()), (Key, Agent.PrettyKey(arg.Key)), (Value, arg.Value));
301+
302+
auto checkContext = std::make_shared<TCheckContext>(std::move(arg));
303+
304+
for (const auto& value : checkContext->ReadArg.Value.Chain) {
305+
if (value.Blob) {
306+
const auto& [blobId, groupId] = *value.Blob;
307+
auto event = std::make_unique<TEvBlobStorage::TEvCheckIntegrity>(
308+
blobId, TInstant::Max(), checkContext->ReadArg.GetHandleClass);
309+
310+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA63, "issuing TEvCheckIntegrity", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
311+
(Key, Agent.PrettyKey(checkContext->ReadArg.Key)), (GroupId, groupId), (Msg, *event));
312+
313+
Agent.SendToProxy(groupId, std::move(event), this, checkContext);
314+
++checkContext->NumPartsPending;
315+
}
316+
}
317+
}
318+
319+
void TBlobDepotAgent::TQuery::HandleCheckIntegrityResult(const TRequestContext::TPtr& context,
320+
TEvBlobStorage::TEvCheckIntegrityResult& msg) {
321+
auto& checkContext = context->Obtain<TCheckContext>();
322+
323+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA64, "HandleCheckIntegrityResult", (AgentId, Agent.LogId),
324+
(QueryId, GetQueryId()), (Key, Agent.PrettyKey(checkContext.ReadArg.Key)), (Msg, msg));
325+
326+
auto& result = checkContext.Result;
327+
switch (msg.Status) {
328+
case NKikimrProto::NODATA:
329+
if (result->Status == NKikimrProto::OK) {
330+
result->Status = NKikimrProto::NODATA;
331+
}
332+
break;
333+
case NKikimrProto::ERROR:
334+
result->Status = NKikimrProto::ERROR;
335+
break;
336+
default:
337+
break;
338+
}
339+
340+
if (msg.ErrorReason) {
341+
if (result->ErrorReason) {
342+
result->ErrorReason += "; ";
343+
}
344+
result->ErrorReason += msg.Id.ToString() + " " + msg.ErrorReason;
345+
}
346+
347+
result->PlacementStatus = std::max(msg.PlacementStatus, result->PlacementStatus);
348+
result->DataStatus = std::max(msg.DataStatus, result->DataStatus);
349+
350+
if (result->DataInfo) {
351+
result->DataInfo += "; ";
352+
}
353+
result->DataInfo += msg.Id.ToString() + " " + msg.DataInfo;
354+
355+
if (!--checkContext.NumPartsPending) {
356+
checkContext.End(this);
357+
}
358+
}
359+
280360
} // NKikimr::NBlobDepot

ydb/core/blob_depot/agent/request.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ namespace NKikimr::NBlobDepot {
109109

110110
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev);
111111
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev);
112+
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvCheckIntegrityResult::TPtr ev);
112113

113114
void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map,
114115
std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) {
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
#include "agent_impl.h"
2+
#include "blob_mapping_cache.h"
3+
4+
namespace NKikimr::NBlobDepot {
5+
6+
template<>
7+
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvCheckIntegrity>(
8+
std::unique_ptr<IEventHandle> ev, TMonotonic received) {
9+
10+
class TCheckIntegrityQuery : public TBlobStorageQuery<TEvBlobStorage::TEvCheckIntegrity> {
11+
public:
12+
using TBlobStorageQuery::TBlobStorageQuery;
13+
14+
void Initiate() override {
15+
if (IS_LOG_PRIORITY_ENABLED(NLog::PRI_TRACE, NKikimrServices::BLOB_DEPOT_EVENTS)) {
16+
BDEV_QUERY(BDEV25, "TEvCheckIntegrity_new", (U.BlobId, Request.Id));
17+
}
18+
19+
TString blobId = Request.Id.AsBinaryString();
20+
21+
if (const TResolvedValue *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
22+
std::make_shared<TRequestContext>(), false)) {
23+
ProcessResolveResult(value);
24+
} else {
25+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA58, "resolve pending", (AgentId, Agent.LogId),
26+
(QueryId, GetQueryId()), (BlobId, Request.Id));
27+
}
28+
}
29+
30+
void ProcessResolveResult(const TKeyResolved& result) {
31+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA59, "ProcessResolveResult", (AgentId, Agent.LogId),
32+
(QueryId, GetQueryId()), (Result, result));
33+
34+
if (result.Error()) {
35+
EndWithError(NKikimrProto::ERROR, result.GetErrorReason());
36+
} else if (const TResolvedValue *value = result.GetResolvedValue(); !value) {
37+
EndWithError(NKikimrProto::NODATA, "no data");
38+
} else {
39+
TReadArg arg{
40+
*value,
41+
Request.GetHandleClass,
42+
false,
43+
0,
44+
Request.Id.BlobSize(),
45+
0,
46+
std::nullopt,
47+
Request.Id.AsBinaryString(),
48+
};
49+
IssueCheckIntegrity(std::move(arg));
50+
}
51+
}
52+
53+
void OnCheckIntegrity(TCheckOutcome&& outcome) override {
54+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA60, "OnCheckIntegrity", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
55+
(Outcome, outcome));
56+
TraceResponse(outcome.Result->Status);
57+
TBlobStorageQuery::EndWithSuccess(std::move(outcome.Result));
58+
}
59+
60+
void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) {
61+
TraceResponse(status);
62+
TBlobStorageQuery::EndWithError(status, errorReason);
63+
}
64+
65+
void TraceResponse(NKikimrProto::EReplyStatus status) {
66+
if (IS_LOG_PRIORITY_ENABLED(NLog::PRI_TRACE, NKikimrServices::BLOB_DEPOT_EVENTS)) {
67+
BDEV_QUERY(BDEV26, "TEvCheckIntegrity_end", (BlobId, Request.Id), (Status, status));
68+
}
69+
}
70+
71+
void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
72+
if (auto *p = std::get_if<TKeyResolved>(&response)) {
73+
ProcessResolveResult(*p);
74+
} else if (auto *p = std::get_if<TEvBlobStorage::TEvCheckIntegrityResult*>(&response)) {
75+
TQuery::HandleCheckIntegrityResult(context, **p);
76+
} else if (std::holds_alternative<TTabletDisconnected>(response)) {
77+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA61, "TTabletDisconnected",
78+
(AgentId, Agent.LogId), (QueryId, GetQueryId()));
79+
EndWithError(NKikimrProto::ERROR, "Tablet disconnected");
80+
} else {
81+
Y_ABORT();
82+
}
83+
}
84+
85+
ui64 GetTabletId() const override {
86+
return Request.Id.TabletID();
87+
}
88+
};
89+
90+
return new TCheckIntegrityQuery(*this, std::move(ev), received);
91+
}
92+
93+
} // NKikimr::NBlobDepot

ydb/core/blob_depot/agent/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ LIBRARY()
3636
storage_get.cpp
3737
storage_get_block.cpp
3838
storage_block.cpp
39+
storage_check_integrity.cpp
3940
storage_discover.cpp
4041
storage_range.cpp
4142
storage_collect_garbage.cpp

ydb/core/blobstorage/dsproxy/dsproxy_check_integrity_get.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ class TBlobStorageGroupCheckIntegrityRequest : public TBlobStorageGroupRequestAc
188188
str << diskIdx << ": " << Info->CreateVDiskID(vDiskIdShort) << Endl;
189189
}
190190

191-
PendingResult->DataErrorInfo = str.Str();
192-
PendingResult->DataErrorInfo += partsState.DataErrorInfo;
191+
PendingResult->DataInfo = str.Str();
192+
PendingResult->DataInfo += partsState.DataInfo;
193193

194194
ReplyAndDie(NKikimrProto::OK);
195195
}

ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class TBlobStorageGroupInfo : public TThrRefBase {
169169

170170
struct TPartsState {
171171
bool IsOk = true;
172-
TString DataErrorInfo;
172+
TString DataInfo;
173173
};
174174

175175
virtual TPartsState GetDataState(const TLogoBlobID& id, const TPartsData& partsData) const = 0;

ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_data_check.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class TDataIntegrityCheckerBlock42 : public TDataIntegrityCheckerBase {
8888
partsState.IsOk = false;
8989
layoutReport << "ERROR: There are unequal parts" << Endl;
9090
}
91-
partsState.DataErrorInfo = layoutReport.Str();
91+
partsState.DataInfo = layoutReport.Str();
9292

9393
// checking erasure
9494
TStringStream erasureReport;
@@ -170,7 +170,7 @@ class TDataIntegrityCheckerBlock42 : public TDataIntegrityCheckerBase {
170170
if (!hasUnequalParts) {
171171
checkCombination();
172172
if (!erasureError) {
173-
partsState.DataErrorInfo += erasureReport.Str();
173+
partsState.DataInfo += erasureReport.Str();
174174
return partsState;
175175
}
176176
}
@@ -202,7 +202,7 @@ class TDataIntegrityCheckerBlock42 : public TDataIntegrityCheckerBase {
202202
erasureReport << "ERROR: There are erasure restore fails" << Endl;
203203
}
204204

205-
partsState.DataErrorInfo += erasureReport.Str();
205+
partsState.DataInfo += erasureReport.Str();
206206
return partsState;
207207
}
208208
};
@@ -267,7 +267,7 @@ class TDataIntegrityCheckerMirror : public TDataIntegrityCheckerBase {
267267
partsState.IsOk = false;
268268
layoutReport << "ERROR: There are unequal parts" << Endl;
269269
}
270-
partsState.DataErrorInfo = layoutReport.Str();
270+
partsState.DataInfo = layoutReport.Str();
271271

272272
return partsState;
273273
}

0 commit comments

Comments
 (0)