Skip to content

Commit c9f4f5a

Browse files
committed
add local cache (for BSC data) in viewer handlers (#15863)
1 parent af3c4bb commit c9f4f5a

File tree

12 files changed

+461
-131
lines changed

12 files changed

+461
-131
lines changed

ydb/core/viewer/json_pipe_req.cpp

Lines changed: 246 additions & 62 deletions
Large diffs are not rendered by default.

ydb/core/viewer/json_pipe_req.h

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,13 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
4343
TString SharedDatabase;
4444
bool Direct = false;
4545
bool NeedRedirect = true;
46-
ui32 Requests = 0;
46+
i32 DataRequests = 0; // how many requests we wait to process data
4747
bool PassedAway = false;
48-
ui32 MaxRequestsInFlight = 200;
48+
bool ReplySent = false;
49+
bool UseCache = false;
50+
TDuration CachedDataMaxAge;
51+
TString Error;
52+
i32 MaxRequestsInFlight = 200;
4953
NWilson::TSpan Span;
5054
IViewer* Viewer = nullptr;
5155
NMon::TEvHttpInfo::TPtr Event;
@@ -57,7 +61,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
5761

5862
struct TPipeInfo {
5963
TActorId PipeClient;
60-
ui32 Requests = 0;
64+
i32 Requests = 0;
6165
};
6266

6367
std::unordered_map<TTabletId, TPipeInfo> PipeInfo;
@@ -71,7 +75,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
7175

7276
template<typename T>
7377
struct TRequestResponse {
74-
std::variant<std::monostate, std::unique_ptr<T>, TString> Response;
78+
std::variant<std::monostate, std::shared_ptr<T>, TString> Response;
7579
NWilson::TSpan Span;
7680

7781
TRequestResponse() = default;
@@ -80,27 +84,41 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
8084
{}
8185

8286
TRequestResponse(const TRequestResponse&) = delete;
87+
TRequestResponse& operator =(const TRequestResponse& other) = delete;
8388
TRequestResponse(TRequestResponse&&) = default;
84-
TRequestResponse& operator =(const TRequestResponse&) = delete;
8589
TRequestResponse& operator =(TRequestResponse&&) = default;
8690

87-
bool Set(std::unique_ptr<T>&& response) {
88-
if (IsDone()) {
89-
return false;
90-
}
91-
constexpr bool hasErrorCheck = requires(const std::unique_ptr<T>& r) {TViewerPipeClient::IsSuccess(r);};
91+
TRequestResponse(std::shared_ptr<T>&& response)
92+
: Response(std::move(response))
93+
{}
94+
95+
void SetInternal(std::shared_ptr<T>&& response) {
96+
Response = std::move(response);
97+
}
98+
99+
bool Set(std::shared_ptr<T>&& response) {
100+
constexpr bool hasErrorCheck = requires(const T& r) {TViewerPipeClient::IsSuccess(r);};
101+
constexpr bool hasUpdateCache = requires(std::shared_ptr<T>&& r) {TEvViewer::TEvUpdateSharedCacheTabletResponse(r);};
92102
if constexpr (hasErrorCheck) {
93-
if (!TViewerPipeClient::IsSuccess(response)) {
94-
return Error(TViewerPipeClient::GetError(response));
103+
if (!TViewerPipeClient::IsSuccess(*response)) {
104+
return Error(TViewerPipeClient::GetError(*response));
95105
}
96106
}
97-
Span.EndOk();
107+
if (Span) {
108+
Span.EndOk();
109+
}
110+
if constexpr (hasUpdateCache) {
111+
TActivationContext::Send(MakeViewerID(TActivationContext::ActorSystem()->NodeId), std::make_unique<TEvViewer::TEvUpdateSharedCacheTabletResponse>(response));
112+
}
113+
if (IsDone()) {
114+
return false;
115+
}
98116
Response = std::move(response);
99117
return true;
100118
}
101119

102120
bool Set(TAutoPtr<TEventHandle<T>>&& response) {
103-
return Set(std::unique_ptr<T>(response->Release().Release()));
121+
return Set(std::shared_ptr<T>(response->Release().Release()));
104122
}
105123

106124
bool Error(const TString& error) {
@@ -113,7 +131,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
113131
}
114132

115133
bool IsOk() const {
116-
return std::holds_alternative<std::unique_ptr<T>>(Response);
134+
return std::holds_alternative<std::shared_ptr<T>>(Response);
117135
}
118136

119137
bool IsError() const {
@@ -129,11 +147,11 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
129147
}
130148

131149
T* Get() {
132-
return std::get<std::unique_ptr<T>>(Response).get();
150+
return std::get<std::shared_ptr<T>>(Response).get();
133151
}
134152

135153
const T* Get() const {
136-
return std::get<std::unique_ptr<T>>(Response).get();
154+
return std::get<std::shared_ptr<T>>(Response).get();
137155
}
138156

139157
T& GetRef() {
@@ -246,14 +264,16 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
246264
static TPathId GetPathId(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
247265
static TString GetPath(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
248266

249-
static bool IsSuccess(const std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySetResult>& ev);
250-
static TString GetError(const std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySetResult>& ev);
267+
static bool IsSuccess(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev);
268+
static TString GetError(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev);
269+
270+
static bool IsSuccess(const TEvStateStorage::TEvBoardInfo& ev);
271+
static TString GetError(const TEvStateStorage::TEvBoardInfo& ev);
251272

252-
static bool IsSuccess(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev);
253-
static TString GetError(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev);
273+
static bool IsSuccess(const NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult& ev);
274+
static TString GetError(const NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult& ev);
254275

255-
static bool IsSuccess(const std::unique_ptr<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>& ev);
256-
static TString GetError(const std::unique_ptr<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>& ev);
276+
void UpdateSharedCacheTablet(TTabletId tabletId, std::unique_ptr<IEventBase> request);
257277

258278
TRequestResponse<TEvHive::TEvResponseHiveDomainStats> MakeRequestHiveDomainStats(TTabletId hiveId);
259279
TRequestResponse<TEvHive::TEvResponseHiveStorageStats> MakeRequestHiveStorageStats(TTabletId hiveId);
@@ -279,6 +299,11 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
279299
TRequestResponse<NSysView::TEvSysView::TEvGetVSlotsResponse> RequestBSControllerVSlots();
280300
TRequestResponse<NSysView::TEvSysView::TEvGetPDisksResponse> RequestBSControllerPDisks();
281301
TRequestResponse<NSysView::TEvSysView::TEvGetStorageStatsResponse> RequestBSControllerStorageStats();
302+
TRequestResponse<NSysView::TEvSysView::TEvGetGroupsResponse> MakeCachedRequestBSControllerGroups();
303+
TRequestResponse<NSysView::TEvSysView::TEvGetStoragePoolsResponse> MakeCachedRequestBSControllerPools();
304+
TRequestResponse<NSysView::TEvSysView::TEvGetVSlotsResponse> MakeCachedRequestBSControllerVSlots();
305+
TRequestResponse<NSysView::TEvSysView::TEvGetPDisksResponse> MakeCachedRequestBSControllerPDisks();
306+
TRequestResponse<NSysView::TEvSysView::TEvGetStorageStatsResponse> MakeCachedRequestBSControllerStorageStats();
282307
void RequestBSControllerPDiskUpdateStatus(const NKikimrBlobStorage::TUpdateDriveStatus& driveStatus, bool force = false);
283308

284309
THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheNavigateRequestBuilder(NSchemeCache::TSchemeCacheNavigate::TEntry&& entry);
@@ -315,18 +340,18 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
315340
}
316341

317342
void ClosePipes();
318-
ui32 FailPipeConnect(TTabletId tabletId);
343+
i32 FailPipeConnect(TTabletId tabletId);
319344

320345
bool IsLastRequest() const {
321-
return Requests == 1;
346+
return DataRequests == 1;
322347
}
323348

324349
bool WaitingForResponse() const {
325-
return Requests != 0;
350+
return DataRequests != 0;
326351
}
327352

328-
bool NoMoreRequests(ui32 requestsDone = 0) const {
329-
return Requests == requestsDone;
353+
bool NoMoreRequests(i32 requestsDone = 0) const {
354+
return DataRequests == requestsDone;
330355
}
331356

332357
TRequestState GetRequest() const;
@@ -343,10 +368,12 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
343368
TString GetHTTPFORBIDDEN(TString contentType = {}, TString response = {});
344369
TString MakeForward(const std::vector<ui32>& nodes);
345370

346-
void RequestDone(ui32 requests = 1);
371+
void RequestDone(i32 requests = 1);
372+
void CacheRequestDone();
347373
void CancelAllRequests();
348374
void AddEvent(const TString& name);
349375
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev);
376+
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev);
350377
void HandleResolveDatabase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
351378
void HandleResolveResource(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
352379
void HandleResolve(TEvStateStorage::TEvBoardInfo::TPtr& ev);

ydb/core/viewer/json_storage_base.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ class TJsonStorageBase : public TViewerPipeClient {
159159
SendNodeRequests(nodeId);
160160
}
161161
}
162-
if (Requests == 0) {
162+
if (!WaitingForResponse()) {
163163
ReplyAndPassAway();
164164
return;
165165
}
@@ -439,7 +439,7 @@ class TJsonStorageBase : public TViewerPipeClient {
439439
}
440440
}
441441
}
442-
return Requests != 0;
442+
return WaitingForResponse();
443443
}
444444

445445
void CollectDiskInfo(bool needDonors) {

ydb/core/viewer/protos/viewer.proto

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ message TClusterInfo {
319319
string Name = 2;
320320
string Domain = 3;
321321
EFlag Overall = 5;
322+
optional uint64 CachedDataMaxAge = 8;
322323
repeated string Problems = 9;
323324
uint32 NodesTotal = 10;
324325
uint32 NodesAlive = 11;
@@ -516,9 +517,10 @@ message TStorageGroupsInfo {
516517
optional bool NeedGroup = 7;
517518
optional bool NeedSort = 8;
518519
optional bool NeedLimit = 9;
519-
repeated string Problems = 10;
520-
repeated TStorageGroupInfo StorageGroups = 11;
521-
repeated TStorageGroupGroup StorageGroupGroups = 12;
520+
optional uint64 CachedDataMaxAge = 10;
521+
repeated string Problems = 11;
522+
repeated TStorageGroupInfo StorageGroups = 12;
523+
repeated TStorageGroupGroup StorageGroupGroups = 13;
522524
}
523525

524526
message TStorageUsageStats {
@@ -578,6 +580,7 @@ message TNodesInfo {
578580
optional bool NeedSort = 8;
579581
optional bool NeedLimit = 9;
580582
repeated string Problems = 10;
583+
optional uint64 CachedDataMaxAge = 11;
581584
repeated TNodeInfo Nodes = 20;
582585
repeated TNodeGroup NodeGroups = 30;
583586
optional uint64 MaximumDisksPerNode = 50;

ydb/core/viewer/storage_groups.h

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#pragma once
2-
#include "json_handlers.h"
32
#include "json_pipe_req.h"
43
#include "log.h"
54
#include "viewer_helper.h"
@@ -860,7 +859,7 @@ class TStorageGroups : public TViewerPipeClient {
860859

861860
public:
862861
void Bootstrap() override {
863-
if (TBase::NeedToRedirect()) {
862+
if (NeedToRedirect()) {
864863
return;
865864
}
866865
if (Database) {
@@ -877,25 +876,26 @@ class TStorageGroups : public TViewerPipeClient {
877876
RequestWhiteboard();
878877
} else {
879878
if (FieldsNeeded(FieldsBsGroups)) {
880-
GetGroupsResponse = RequestBSControllerGroups();
879+
GetGroupsResponse = MakeCachedRequestBSControllerGroups();
881880
}
882881
if (FieldsNeeded(FieldsBsPools)) {
883-
GetStoragePoolsResponse = RequestBSControllerPools();
882+
GetStoragePoolsResponse = MakeCachedRequestBSControllerPools();
884883
}
885884
if (FieldsNeeded(FieldsBsVSlots)) {
886-
GetVSlotsResponse = RequestBSControllerVSlots();
885+
GetVSlotsResponse = MakeCachedRequestBSControllerVSlots();
887886
}
888887
if (FieldsNeeded(FieldsBsPDisks)) {
889-
GetPDisksResponse = RequestBSControllerPDisks();
888+
GetPDisksResponse = MakeCachedRequestBSControllerPDisks();
890889
}
891890
}
892-
893-
if (Requests == 0) {
894-
return ReplyAndPassAway();
895-
}
896891
TBase::Become(&TThis::StateWork);
897-
Schedule(TDuration::MilliSeconds(Timeout * 50 / 100), new TEvents::TEvWakeup(TimeoutBSC)); // 50% timeout (for bsc)
898-
Schedule(TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup(TimeoutFinal)); // timeout for the rest
892+
ProcessResponses(); // to process cached data
893+
if (WaitingForResponse()) {
894+
Schedule(TDuration::MilliSeconds(Timeout * 50 / 100), new TEvents::TEvWakeup(TimeoutBSC)); // 50% timeout (for bsc)
895+
Schedule(TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup(TimeoutFinal)); // timeout for the rest
896+
} else {
897+
ReplyAndPassAway();
898+
}
899899
}
900900

901901
void ApplyFilter() {
@@ -1980,7 +1980,9 @@ class TStorageGroups : public TViewerPipeClient {
19801980
AddProblem("wb-incomplete-disks");
19811981
ProcessWhiteboardDisks();
19821982
}
1983-
ReplyAndPassAway();
1983+
if (!ReplySent) {
1984+
ReplyAndPassAway();
1985+
}
19841986
break;
19851987
}
19861988
}
@@ -2062,6 +2064,9 @@ class TStorageGroups : public TViewerPipeClient {
20622064
if (NeedLimit) {
20632065
json.SetNeedLimit(true);
20642066
}
2067+
if (CachedDataMaxAge) {
2068+
json.SetCachedDataMaxAge(CachedDataMaxAge.MilliSeconds());
2069+
}
20652070
json.SetTotalGroups(TotalGroups);
20662071
json.SetFoundGroups(FoundGroups);
20672072
for (auto problem : Problems) {

ydb/core/viewer/viewer.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ class TViewer : public TActorBootstrapped<TViewer>, public IViewer {
193193
}
194194
}
195195
}
196+
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
196197
}
197198

198199
const TKikimrRunConfig& GetKikimrRunConfig() const override {
@@ -384,11 +385,68 @@ class TViewer : public TActorBootstrapped<TViewer>, public IViewer {
384385
ui32 CurrentMonitoringPort;
385386
TString CurrentWorkerName;
386387

388+
void Handle(TEvents::TEvWakeup::TPtr&) {
389+
DeleteOldSharedCacheData();
390+
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
391+
}
392+
393+
std::unordered_map<TTabletId, TActorId> TabletPipes;
394+
395+
static NTabletPipe::TClientConfig GetPipeClientConfig() {
396+
return {
397+
.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries()
398+
};
399+
}
400+
401+
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
402+
if (ev->Get()->Status != NKikimrProto::OK) {
403+
TabletPipes.erase(ev->Get()->TabletId);
404+
}
405+
}
406+
407+
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
408+
TabletPipes.erase(ev->Get()->TabletId);
409+
}
410+
411+
void Handle(TEvViewer::TEvUpdateSharedCacheTabletRequest::TPtr& ev) {
412+
auto itPipe = TabletPipes.find(ev->Get()->TabletId);
413+
if (itPipe == TabletPipes.end()) {
414+
auto pipe = NTabletPipe::CreateClient(SelfId(), ev->Get()->TabletId, GetPipeClientConfig());
415+
itPipe = TabletPipes.emplace(ev->Get()->TabletId, RegisterWithSameMailbox(pipe)).first;
416+
}
417+
NTabletPipe::SendData(SelfId(), itPipe->second, ev->Get()->Request.release());
418+
}
419+
420+
void Handle(TEvViewer::TEvUpdateSharedCacheTabletResponse::TPtr& ev) {
421+
UpdateSharedCacheData(std::unique_ptr<TEvViewer::TEvUpdateSharedCacheTabletResponse>(ev->Release().Release()));
422+
}
423+
424+
template<typename TEvent>
425+
void HandleForUpdateSharedCacheData(TAutoPtr<TEventHandle<TEvent>>& ev) {
426+
UpdateSharedCacheData(std::make_unique<TEvViewer::TEvUpdateSharedCacheTabletResponse>(std::shared_ptr<TEvent>(ev->Release().Release())));
427+
}
428+
429+
void PassAway() override {
430+
for (const auto& [tabletId, pipe] : TabletPipes) {
431+
NTabletPipe::CloseClient(SelfId(), pipe);
432+
}
433+
}
434+
387435
STFUNC(StateWork) {
388436
switch (ev->GetTypeRewrite()) {
389437
HFunc(NMon::TEvHttpInfo, Handle);
390438
hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
391439
hFunc(TEvViewer::TEvViewerRequest, Handle);
440+
hFunc(TEvViewer::TEvUpdateSharedCacheTabletRequest, Handle);
441+
hFunc(TEvViewer::TEvUpdateSharedCacheTabletResponse, Handle);
442+
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
443+
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
444+
hFunc(NSysView::TEvSysView::TEvGetStoragePoolsResponse, HandleForUpdateSharedCacheData);
445+
hFunc(NSysView::TEvSysView::TEvGetGroupsResponse, HandleForUpdateSharedCacheData);
446+
hFunc(NSysView::TEvSysView::TEvGetVSlotsResponse, HandleForUpdateSharedCacheData);
447+
hFunc(NSysView::TEvSysView::TEvGetPDisksResponse, HandleForUpdateSharedCacheData);
448+
hFunc(NSysView::TEvSysView::TEvGetStorageStatsResponse, HandleForUpdateSharedCacheData);
449+
hFunc(TEvents::TEvWakeup, Handle);
392450
}
393451
}
394452

0 commit comments

Comments
 (0)