Skip to content

Commit 7d8473e

Browse files
authored
optimize nodes handler (#8111)
1 parent 70f66fb commit 7d8473e

23 files changed

+2367
-1021
lines changed

ydb/core/base/board_lookup.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ namespace NKikimr {
2525
class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
2626
const TString Path;
2727
const TActorId Owner;
28+
const ui64 Cookie;
2829
const EBoardLookupMode Mode;
2930
const bool Subscriber;
3031
TBoardRetrySettings BoardRetrySettings;
@@ -111,12 +112,12 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
111112
void NotAvailable() {
112113
if (CurrentStateFunc() != &TThis::StateSubscribe) {
113114
Send(Owner, new TEvStateStorage::TEvBoardInfo(
114-
TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path));
115+
TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path), 0, Cookie);
115116
} else {
116117
Send(Owner,
117118
new TEvStateStorage::TEvBoardInfoUpdate(
118119
TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path
119-
)
120+
), 0, Cookie
120121
);
121122
}
122123
return PassAway();
@@ -129,7 +130,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
129130
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>(
130131
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
131132
reply->InfoEntries = std::move(Info);
132-
Send(Owner, std::move(reply));
133+
Send(Owner, std::move(reply), 0, Cookie);
133134
if (Subscriber) {
134135
Become(&TThis::StateSubscribe);
135136
return;
@@ -240,7 +241,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
240241
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
241242
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
242243
reply->Updates = { { oid, std::move(update.value()) } };
243-
Send(Owner, std::move(reply));
244+
Send(Owner, std::move(reply), 0, Cookie);
244245
}
245246
} else {
246247
if (info.GetDropped()) {
@@ -308,7 +309,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
308309
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
309310
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
310311
reply->Updates = std::move(updates);
311-
Send(Owner, std::move(reply));
312+
Send(Owner, std::move(reply), 0, Cookie);
312313
}
313314
}
314315

@@ -484,7 +485,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
484485
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
485486
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
486487
reply->Updates = std::move(updates);
487-
Send(Owner, std::move(reply));
488+
Send(Owner, std::move(reply), 0, Cookie);
488489
}
489490
}
490491

@@ -495,9 +496,10 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
495496

496497
TBoardLookupActor(
497498
const TString &path, TActorId owner, EBoardLookupMode mode,
498-
TBoardRetrySettings boardRetrySettings)
499+
TBoardRetrySettings boardRetrySettings, ui64 cookie = 0)
499500
: Path(path)
500501
, Owner(owner)
502+
, Cookie(cookie)
501503
, Mode(mode)
502504
, Subscriber(Mode == EBoardLookupMode::Subscription)
503505
, BoardRetrySettings(std::move(boardRetrySettings))
@@ -545,8 +547,8 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
545547

546548
IActor* CreateBoardLookupActor(
547549
const TString &path, const TActorId &owner, EBoardLookupMode mode,
548-
TBoardRetrySettings boardRetrySettings) {
549-
return new TBoardLookupActor(path, owner, mode, std::move(boardRetrySettings));
550+
TBoardRetrySettings boardRetrySettings, ui64 cookie) {
551+
return new TBoardLookupActor(path, owner, mode, std::move(boardRetrySettings), cookie);
550552
}
551553

552554
}

ydb/core/base/statestorage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ IActor* CreateStateStorageBoardReplica(const TIntrusivePtr<TStateStorageInfo> &,
547547
IActor* CreateSchemeBoardReplica(const TIntrusivePtr<TStateStorageInfo>&, ui32);
548548
IActor* CreateBoardLookupActor(
549549
const TString &path, const TActorId &owner, EBoardLookupMode mode,
550-
TBoardRetrySettings boardRetrySettings = {});
550+
TBoardRetrySettings boardRetrySettings = {}, ui64 cookie = 0);
551551
IActor* CreateBoardPublishActor(
552552
const TString &path, const TString &payload, const TActorId &owner, ui32 ttlMs, bool reg,
553553
TBoardRetrySettings boardRetrySettings = {});

ydb/core/mon/async_http_mon.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,10 +554,26 @@ class THttpMonServiceNodeRequest : public TActorBootstrapped<THttpMonServiceNode
554554
}
555555
}
556556

557+
TString RewriteWithForwardedFromNode(const TString& response) {
558+
NHttp::THttpParser<NHttp::THttpRequest, NHttp::TSocketBuffer> parser(response);
559+
560+
NHttp::THeadersBuilder headers(parser.Headers);
561+
headers.Set("X-Forwarded-From-Node", TStringBuilder() << Event->Sender.NodeId());
562+
563+
NHttp::THttpRenderer<NHttp::THttpRequest, NHttp::TSocketBuffer> renderer;
564+
renderer.InitRequest(parser.Method, parser.URL, parser.Protocol, parser.Version);
565+
renderer.Set(headers);
566+
if (parser.HaveBody()) {
567+
renderer.SetBody(parser.Body); // it shouldn't be here, 30x with a body is a bad idea
568+
}
569+
renderer.Finish();
570+
return renderer.AsString();
571+
}
572+
557573
void Bootstrap() {
558574
NHttp::THttpConfig::SocketAddressType address;
559575
FromProto(address, Event->Get()->Record.GetAddress());
560-
NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(Event->Get()->Record.GetHttpRequest(), Endpoint, address);
576+
NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(RewriteWithForwardedFromNode(Event->Get()->Record.GetHttpRequest()), Endpoint, address);
561577
TStringBuilder prefix;
562578
prefix << "/node/" << TActivationContext::ActorSystem()->NodeId;
563579
if (request->URL.SkipPrefix(prefix)) {

ydb/core/node_whiteboard/node_whiteboard.h

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -375,12 +375,6 @@ struct TEvWhiteboard{
375375

376376
TEvSystemStateUpdate(const TNodeLocation& systemLocation) {
377377
systemLocation.Serialize(Record.MutableLocation(), false);
378-
const auto& x = systemLocation.GetLegacyValue();
379-
auto *pb = Record.MutableSystemLocation();
380-
pb->SetDataCenter(x.DataCenter);
381-
pb->SetRoom(x.Room);
382-
pb->SetRack(x.Rack);
383-
pb->SetBody(x.Body);
384378
}
385379

386380
TEvSystemStateUpdate(const NKikimrWhiteboard::TSystemStateInfo& systemStateInfo) {
@@ -503,5 +497,38 @@ inline TActorId MakeNodeWhiteboardServiceId(ui32 node) {
503497

504498
IActor* CreateNodeWhiteboardService();
505499

506-
} // NTabletState
500+
template<typename TRequestType>
501+
struct WhiteboardResponse {};
502+
503+
template<>
504+
struct WhiteboardResponse<TEvWhiteboard::TEvTabletStateRequest> {
505+
using Type = TEvWhiteboard::TEvTabletStateResponse;
506+
};
507+
508+
template<>
509+
struct WhiteboardResponse<TEvWhiteboard::TEvPDiskStateRequest> {
510+
using Type = TEvWhiteboard::TEvPDiskStateResponse;
511+
};
512+
513+
template<>
514+
struct WhiteboardResponse<TEvWhiteboard::TEvVDiskStateRequest> {
515+
using Type = TEvWhiteboard::TEvVDiskStateResponse;
516+
};
517+
518+
template<>
519+
struct WhiteboardResponse<TEvWhiteboard::TEvSystemStateRequest> {
520+
using Type = TEvWhiteboard::TEvSystemStateResponse;
521+
};
522+
523+
template<>
524+
struct WhiteboardResponse<TEvWhiteboard::TEvBSGroupStateRequest> {
525+
using Type = TEvWhiteboard::TEvBSGroupStateResponse;
526+
};
527+
528+
template<>
529+
struct WhiteboardResponse<TEvWhiteboard::TEvNodeStateRequest> {
530+
using Type = TEvWhiteboard::TEvNodeStateResponse;
531+
};
532+
533+
} // NNodeWhiteboard
507534
} // NKikimr

ydb/core/testlib/test_client.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,12 @@ namespace Tests {
606606

607607
NKikimrBlobStorage::TDefineHostConfig hostConfig;
608608
hostConfig.SetHostConfigId(nodeId);
609-
TString path = TStringBuilder() << Runtime->GetTempDir() << "pdisk_1.dat";
609+
TString path;
610+
if (Settings->UseSectorMap) {
611+
path ="SectorMap:test-client[:2000]";
612+
} else {
613+
path = TStringBuilder() << Runtime->GetTempDir() << "pdisk_1.dat";
614+
}
610615
hostConfig.AddDrive()->SetPath(path);
611616
if (Settings->Verbose) {
612617
Cerr << "test_client.cpp: SetPath # " << path << Endl;

ydb/core/testlib/test_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ namespace Tests {
159159
bool InitializeFederatedQuerySetupFactory = false;
160160
TString ServerCertFilePath;
161161
bool Verbose = true;
162+
bool UseSectorMap = false;
162163

163164
std::function<IActor*(const TTicketParserSettings&)> CreateTicketParser = NKikimr::CreateTicketParser;
164165
std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory;
@@ -211,6 +212,7 @@ namespace Tests {
211212
TServerSettings& SetYtGateway(NYql::IYtGateway::TPtr ytGateway) { YtGateway = std::move(ytGateway); return *this; }
212213
TServerSettings& SetInitializeFederatedQuerySetupFactory(bool value) { InitializeFederatedQuerySetupFactory = value; return *this; }
213214
TServerSettings& SetVerbose(bool value) { Verbose = value; return *this; }
215+
TServerSettings& SetUseSectorMap(bool value) { UseSectorMap = value; return *this; }
214216
TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory(
215217
std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory
216218
) {

ydb/core/viewer/json_handlers_viewer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void InitViewerHealthCheckJsonHandler(TJsonHandlers& handlers) {
243243
}
244244

245245
void InitViewerNodesJsonHandler(TJsonHandlers& handlers) {
246-
handlers.AddHandler("/viewer/nodes", new TJsonHandler<TJsonNodes>(TJsonNodes::GetSwagger()));
246+
handlers.AddHandler("/viewer/nodes", new TJsonHandler<TJsonNodes>(TJsonNodes::GetSwagger()), 2);
247247
}
248248

249249
void InitViewerACLJsonHandler(TJsonHandlers &jsonHandlers) {

ydb/core/viewer/json_pipe_req.cpp

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ TViewerPipeClient::~TViewerPipeClient() = default;
1515

1616
TViewerPipeClient::TViewerPipeClient() = default;
1717

18+
TViewerPipeClient::TViewerPipeClient(NWilson::TTraceId traceId) {
19+
if (traceId) {
20+
Span = {TComponentTracingLevels::THttp::TopLevel, std::move(traceId), "viewer", NWilson::EFlags::AUTO_END};
21+
}
22+
}
23+
1824
TViewerPipeClient::TViewerPipeClient(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
1925
: Viewer(viewer)
2026
, Event(ev)
@@ -133,6 +139,49 @@ void TViewerPipeClient::RequestHiveStorageStats(NNodeWhiteboard::TTabletId hiveI
133139
SendRequestToPipe(pipeClient, request.Release(), hiveId);
134140
}
135141

142+
TViewerPipeClient::TRequestResponse<TEvViewer::TEvViewerResponse> TViewerPipeClient::MakeViewerRequest(TNodeId nodeId, TEvViewer::TEvViewerRequest* ev, ui32 flags) {
143+
TActorId viewerServiceId = MakeViewerID(nodeId);
144+
TRequestResponse<TEvViewer::TEvViewerResponse> response(Span.CreateChild(TComponentTracingLevels::THttp::Detailed, TypeName(*ev)));
145+
if (response.Span) {
146+
response.Span.Attribute("target_node_id", nodeId);
147+
TStringBuilder askFor;
148+
askFor << ev->Record.GetLocation().NodeIdSize() << " nodes (";
149+
for (size_t i = 0; i < std::min<size_t>(ev->Record.GetLocation().NodeIdSize(), 16); ++i) {
150+
if (i) {
151+
askFor << ", ";
152+
}
153+
askFor << ev->Record.GetLocation().GetNodeId(i);
154+
}
155+
if (ev->Record.GetLocation().NodeIdSize() > 16) {
156+
askFor << ", ...";
157+
}
158+
askFor << ")";
159+
response.Span.Attribute("ask_for", askFor);
160+
switch (ev->Record.Request_case()) {
161+
case NKikimrViewer::TEvViewerRequest::kTabletRequest:
162+
response.Span.Attribute("request_type", "TabletRequest");
163+
break;
164+
case NKikimrViewer::TEvViewerRequest::kSystemRequest:
165+
response.Span.Attribute("request_type", "SystemRequest");
166+
break;
167+
case NKikimrViewer::TEvViewerRequest::kQueryRequest:
168+
response.Span.Attribute("request_type", "QueryRequest");
169+
break;
170+
case NKikimrViewer::TEvViewerRequest::kRenderRequest:
171+
response.Span.Attribute("request_type", "RenderRequest");
172+
break;
173+
case NKikimrViewer::TEvViewerRequest::kAutocompleteRequest:
174+
response.Span.Attribute("request_type", "AutocompleteRequest");
175+
break;
176+
default:
177+
response.Span.Attribute("request_type", ::ToString(static_cast<int>(ev->Record.Request_case())));
178+
break;
179+
}
180+
}
181+
SendRequest(viewerServiceId, ev, flags, nodeId, response.Span.GetTraceId());
182+
return response;
183+
}
184+
136185
TViewerPipeClient::TRequestResponse<TEvHive::TEvResponseHiveDomainStats> TViewerPipeClient::MakeRequestHiveDomainStats(NNodeWhiteboard::TTabletId hiveId) {
137186
TActorId pipeClient = ConnectTabletPipe(hiveId);
138187
THolder<TEvHive::TEvRequestHiveDomainStats> request = MakeHolder<TEvHive::TEvRequestHiveDomainStats>();
@@ -157,6 +206,16 @@ TViewerPipeClient::TRequestResponse<TEvHive::TEvResponseHiveStorageStats> TViewe
157206
return response;
158207
}
159208

209+
TViewerPipeClient::TRequestResponse<TEvHive::TEvResponseHiveNodeStats> TViewerPipeClient::MakeRequestHiveNodeStats(TTabletId hiveId, TEvHive::TEvRequestHiveNodeStats* request) {
210+
TActorId pipeClient = ConnectTabletPipe(hiveId);
211+
auto response = MakeRequestToPipe<TEvHive::TEvResponseHiveNodeStats>(pipeClient, request, hiveId);
212+
if (response.Span) {
213+
auto hive_id = "#" + ::ToString(hiveId);
214+
response.Span.Attribute("hive_id", hive_id);
215+
}
216+
return response;
217+
}
218+
160219
TViewerPipeClient::TRequestResponse<TEvViewer::TEvViewerResponse> TViewerPipeClient::MakeRequestViewer(TNodeId nodeId, TEvViewer::TEvViewerRequest* request, ui32 flags) {
161220
auto requestType = request->Record.GetRequestCase();
162221
auto response = MakeRequest<TEvViewer::TEvViewerResponse>(MakeViewerID(nodeId), request, flags, nodeId);
@@ -415,6 +474,18 @@ void TViewerPipeClient::RequestStateStorageEndpointsLookup(const TString& path)
415474
++Requests;
416475
}
417476

477+
TViewerPipeClient::TRequestResponse<TEvStateStorage::TEvBoardInfo> TViewerPipeClient::MakeRequestStateStorageEndpointsLookup(const TString& path, ui64 cookie) {
478+
TRequestResponse<TEvStateStorage::TEvBoardInfo> response(Span.CreateChild(TComponentTracingLevels::THttp::Detailed, "BoardLookupActor"));
479+
RegisterWithSameMailbox(CreateBoardLookupActor(MakeEndpointsBoardPath(path),
480+
SelfId(),
481+
EBoardLookupMode::Second, {}, cookie));
482+
if (response.Span) {
483+
response.Span.Attribute("path", path);
484+
}
485+
++Requests;
486+
return response;
487+
}
488+
418489
void TViewerPipeClient::RequestStateStorageMetadataCacheEndpointsLookup(const TString& path) {
419490
if (!AppData()->DomainsInfo->Domain) {
420491
return;
@@ -519,6 +590,9 @@ TString TViewerPipeClient::MakeForward(const std::vector<ui32>& nodes) {
519590
}
520591

521592
void TViewerPipeClient::RequestDone(ui32 requests) {
593+
if (requests == 0) {
594+
return;
595+
}
522596
Requests -= requests;
523597
if (!DelayedRequests.empty()) {
524598
SendDelayedRequests();
@@ -535,13 +609,56 @@ void TViewerPipeClient::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
535609
}
536610
}
537611

612+
void TViewerPipeClient::HandleResolveDatabase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
613+
if (ev->Get()->Request->ResultSet.size() == 1 && ev->Get()->Request->ResultSet.begin()->Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
614+
TSchemeCacheNavigate::TEntry& entry(ev->Get()->Request->ResultSet.front());
615+
if (entry.DomainInfo) {
616+
if (entry.DomainInfo->ResourcesDomainKey && entry.DomainInfo->DomainKey != entry.DomainInfo->ResourcesDomainKey) {
617+
RequestSchemeCacheNavigate(TPathId(entry.DomainInfo->ResourcesDomainKey));
618+
} else {
619+
RequestStateStorageEndpointsLookup(CanonizePath(entry.Path));
620+
}
621+
}
622+
} else {
623+
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database"));
624+
}
625+
}
626+
627+
void TViewerPipeClient::HandleResolveDatabase(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
628+
ReplyAndPassAway(MakeForward(GetNodesFromBoardReply(ev)));
629+
}
630+
538631
void TViewerPipeClient::HandleTimeout() {
539632
ReplyAndPassAway(GetHTTPGATEWAYTIMEOUT());
540633
}
541634

635+
STATEFN(TViewerPipeClient::StateResolveDatabase) {
636+
switch (ev->GetTypeRewrite()) {
637+
hFunc(TEvStateStorage::TEvBoardInfo, HandleResolveDatabase);
638+
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveDatabase);
639+
cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);
640+
}
641+
}
642+
643+
void TViewerPipeClient::RedirectToDatabase(const TString& database) {
644+
RequestSchemeCacheNavigate(database);
645+
Become(&TViewerPipeClient::StateResolveDatabase, TDuration::MilliSeconds(1000), new TEvents::TEvWakeup());
646+
}
647+
542648
void TViewerPipeClient::PassAway() {
649+
std::sort(SubscriptionNodeIds.begin(), SubscriptionNodeIds.end());
650+
SubscriptionNodeIds.erase(std::unique(SubscriptionNodeIds.begin(), SubscriptionNodeIds.end()), SubscriptionNodeIds.end());
651+
for (TNodeId nodeId : SubscriptionNodeIds) {
652+
Send(TActivationContext::InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe());
653+
}
543654
ClosePipes();
544655
TBase::PassAway();
545656
}
546657

658+
void TViewerPipeClient::AddEvent(const TString& name) {
659+
if (Span) {
660+
Span.Event(name);
661+
}
662+
}
663+
547664
}

0 commit comments

Comments
 (0)