Skip to content

Commit 3bca1eb

Browse files
authored
Add fetch and replace public api (#11313)
1 parent 2ff8722 commit 3bca1eb

29 files changed

+1094
-83
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898

9999
#include <ydb/services/auth/grpc_service.h>
100100
#include <ydb/services/cms/grpc_service.h>
101+
#include <ydb/services/bsconfig/grpc_service.h>
101102
#include <ydb/services/dynamic_config/grpc_service.h>
102103
#include <ydb/services/datastreams/grpc_service.h>
103104
#include <ydb/services/discovery/grpc_service.h>
@@ -613,6 +614,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
613614
names["tablet_service"] = &hasTabletService;
614615
TServiceCfg hasView = services.empty();
615616
names["view"] = &hasView;
617+
TServiceCfg hasBSConfig = services.empty();
618+
names["bsconfig"] = &hasBSConfig;
616619

617620
std::unordered_set<TString> enabled;
618621
for (const auto& name : services) {
@@ -903,6 +906,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
903906
grpcRequestProxies[0], hasView.IsRlAllowed()));
904907
}
905908

909+
if (hasBSConfig) {
910+
server.AddService(new NGRpcService::TBSConfigGRpcService(ActorSystem.Get(), Counters, grpcRequestProxies[0]));
911+
}
912+
906913
if (ModuleFactories) {
907914
for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
908915
server.AddService(service);

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ PEERDIR(
155155
ydb/public/lib/deprecated/client
156156
ydb/services/auth
157157
ydb/services/backup
158+
ydb/services/bsconfig
158159
ydb/services/cms
159160
ydb/services/dynamic_config
160161
ydb/services/datastreams
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#include "service_keyvalue.h"
2+
#include <ydb/library/yaml_config/yaml_config_parser.h>
3+
#include <ydb/library/yaml_config/tools/util/defaults.h>
4+
#include "rpc_bsconfig_base.h"
5+
6+
#include <ydb/core/base/path.h>
7+
#include <ydb/core/grpc_services/rpc_common/rpc_common.h>
8+
#include <ydb/core/mind/local.h>
9+
#include <ydb/core/protos/local.pb.h>
10+
11+
namespace NKikimr::NGRpcService {
12+
13+
using TEvReplaceStorageConfigRequest =
14+
TGrpcRequestOperationCall<Ydb::BSConfig::ReplaceStorageConfigRequest,
15+
Ydb::BSConfig::ReplaceStorageConfigResponse>;
16+
using TEvFetchStorageConfigRequest =
17+
TGrpcRequestOperationCall<Ydb::BSConfig::FetchStorageConfigRequest,
18+
Ydb::BSConfig::FetchStorageConfigResponse>;
19+
20+
using namespace NActors;
21+
using namespace Ydb;
22+
23+
bool CopyToConfigRequest(const Ydb::BSConfig::ReplaceStorageConfigRequest &from, NKikimrBlobStorage::TConfigRequest *to) {
24+
to->CopyFrom(NKikimr::NYaml::BuildInitDistributedStorageCommand(from.yaml_config()));
25+
return true;
26+
}
27+
28+
void CopyFromConfigResponse(const NKikimrBlobStorage::TConfigResponse &/*from*/, Ydb::BSConfig::ReplaceStorageConfigResult* /*to*/) {
29+
}
30+
31+
bool CopyToConfigRequest(const Ydb::BSConfig::FetchStorageConfigRequest &/*from*/, NKikimrBlobStorage::TConfigRequest *to) {
32+
to->AddCommand()->MutableReadHostConfig();
33+
to->AddCommand()->MutableReadBox();
34+
return true;
35+
}
36+
37+
void CopyFromConfigResponse(const NKikimrBlobStorage::TConfigResponse &from, Ydb::BSConfig::FetchStorageConfigResult *to) {
38+
auto hostConfigStatus = from.GetStatus()[0];
39+
auto boxStatus = from.GetStatus()[1];
40+
NKikimrConfig::StorageConfig storageConfig;
41+
int itemConfigGeneration = 0;
42+
for (const auto& hostConfig: hostConfigStatus.GetHostConfig()) {
43+
itemConfigGeneration = std::max(itemConfigGeneration, static_cast<int>(hostConfig.GetItemConfigGeneration()));
44+
auto *newHostConfig = storageConfig.add_host_config();
45+
newHostConfig->set_host_config_id(hostConfig.GetHostConfigId());
46+
for (const auto& drive : hostConfig.GetDrive()) {
47+
auto *newDrive = newHostConfig->add_drive();
48+
newDrive->set_path(drive.GetPath());
49+
newDrive->set_type(GetDiskType(drive.GetType()));
50+
newDrive->set_shared_with_os(drive.GetSharedWithOs());
51+
newDrive->set_read_centric(drive.GetReadCentric());
52+
newDrive->set_kind(drive.GetKind());
53+
newDrive->set_expected_slot_count(hostConfig.GetDefaultHostPDiskConfig().GetExpectedSlotCount());
54+
}
55+
}
56+
auto boxes = boxStatus.GetBox();
57+
if (!boxes.empty()) {
58+
auto box = boxes[0];
59+
itemConfigGeneration = std::max(itemConfigGeneration, static_cast<int>(box.GetItemConfigGeneration()));
60+
for (const auto& host : box.GetHost()) {
61+
auto *newHost = storageConfig.add_host();
62+
newHost->set_host_config_id(host.GetHostConfigId());
63+
auto *newHostKey = newHost->mutable_key();
64+
const auto& hostKey = host.GetKey();
65+
if (hostKey.GetNodeId()) {
66+
newHostKey->set_node_id(hostKey.GetNodeId());
67+
}
68+
else {
69+
auto *endpoint = newHostKey->mutable_endpoint();
70+
endpoint->set_fqdn(hostKey.GetFqdn());
71+
endpoint->set_ic_port(hostKey.GetIcPort());
72+
}
73+
}
74+
}
75+
storageConfig.set_item_config_generation(itemConfigGeneration);
76+
to->set_yaml_config(NYaml::ParseProtoToYaml(storageConfig));
77+
}
78+
79+
class TReplaceStorageConfigRequest : public TBSConfigRequestGrpc<TReplaceStorageConfigRequest, TEvReplaceStorageConfigRequest,
80+
Ydb::BSConfig::ReplaceStorageConfigResult> {
81+
public:
82+
using TBase = TBSConfigRequestGrpc<TReplaceStorageConfigRequest, TEvReplaceStorageConfigRequest, Ydb::BSConfig::ReplaceStorageConfigResult>;
83+
using TBase::TBase;
84+
85+
bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
86+
return true;
87+
}
88+
NACLib::EAccessRights GetRequiredAccessRights() const {
89+
return NACLib::GenericManage;
90+
}
91+
};
92+
93+
class TFetchStorageConfigRequest : public TBSConfigRequestGrpc<TFetchStorageConfigRequest, TEvFetchStorageConfigRequest,
94+
Ydb::BSConfig::FetchStorageConfigResult> {
95+
public:
96+
using TBase = TBSConfigRequestGrpc<TFetchStorageConfigRequest, TEvFetchStorageConfigRequest, Ydb::BSConfig::FetchStorageConfigResult>;
97+
using TBase::TBase;
98+
99+
bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) override {
100+
return true;
101+
}
102+
NACLib::EAccessRights GetRequiredAccessRights() const {
103+
return NACLib::GenericManage;
104+
}
105+
};
106+
107+
void DoReplaceBSConfig(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
108+
TActivationContext::AsActorContext().Register(new TReplaceStorageConfigRequest(p.release()));
109+
}
110+
111+
void DoFetchBSConfig(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
112+
TActivationContext::AsActorContext().Register(new TFetchStorageConfigRequest(p.release()));
113+
}
114+
115+
116+
} // namespace NKikimr::NGRpcService
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
#pragma once
2+
#include "defs.h"
3+
4+
#include "rpc_deferrable.h"
5+
6+
#include <ydb/public/api/protos/ydb_bsconfig.pb.h>
7+
#include <ydb/core/blobstorage/base/blobstorage_events.h>
8+
#include <ydb/core/protos/blobstorage_base3.pb.h>
9+
#include <ydb/core/base/tabletid.h>
10+
#include <ydb/library/ydb_issue/issue_helpers.h>
11+
#include <ydb/core/cms/console/configs_dispatcher.h>
12+
#include <ydb/core/ydb_convert/ydb_convert.h>
13+
#include <ydb/public/lib/operation_id/operation_id.h>
14+
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
15+
16+
namespace NKikimr::NGRpcService {
17+
18+
struct TDriveDevice {
19+
public:
20+
21+
const TString GetPath() const {
22+
return Path;
23+
}
24+
25+
constexpr NKikimrBlobStorage::EPDiskType GetType() const {
26+
return Type;
27+
}
28+
29+
TDriveDevice(TString path, NKikimrBlobStorage::EPDiskType type)
30+
: Path(path), Type(type) {}
31+
32+
auto operator<=>(const TDriveDevice &) const = default;
33+
34+
private:
35+
TString Path;
36+
NKikimrBlobStorage::EPDiskType Type;
37+
};
38+
39+
template <typename TResult>
40+
Ydb::StatusIds::StatusCode PullStatus(const TResult& status) {
41+
if (!status.GetResponse().GetSuccess()) {
42+
return Ydb::StatusIds::INTERNAL_ERROR;
43+
}
44+
return Ydb::StatusIds::SUCCESS;
45+
}
46+
47+
TString GetDiskType(NKikimrBlobStorage::EPDiskType type) {
48+
switch (type) {
49+
case NKikimrBlobStorage::EPDiskType::ROT:
50+
return "HDD";
51+
case NKikimrBlobStorage::EPDiskType::SSD:
52+
return "SSD";
53+
case NKikimrBlobStorage::EPDiskType::NVME:
54+
return "NVME";
55+
default:
56+
return "UNKNOWN";
57+
}
58+
}
59+
60+
}
61+
62+
template <>
63+
struct THash<NKikimr::NGRpcService::TDriveDevice> {
64+
std::size_t operator()(const NKikimr::NGRpcService::TDriveDevice &device) const {
65+
return THash<TString>()(device.GetPath()) ^ THash<NKikimrBlobStorage::EPDiskType>()(device.GetType());
66+
}
67+
};
68+
69+
namespace NKikimr::NGRpcService {
70+
71+
class TDriveDeviceSet {
72+
public:
73+
void AddDevice(const TDriveDevice& device) {
74+
if (Devices.insert(device).second) {
75+
Hash ^= THash<TDriveDevice>()(device);
76+
}
77+
}
78+
79+
void RemoveDevice(const TDriveDevice& device) {
80+
if (Devices.erase(device)) {
81+
Hash ^= THash<TDriveDevice>()(device);
82+
}
83+
}
84+
85+
std::size_t GetHash() const {
86+
return Hash;
87+
}
88+
89+
const THashSet<TDriveDevice>& GetDevices() const {
90+
return Devices;
91+
}
92+
93+
bool operator==(const TDriveDeviceSet& other) const {
94+
return Devices == other.Devices;
95+
}
96+
97+
private:
98+
THashSet<TDriveDevice> Devices;
99+
std::size_t Hash = 0;
100+
};
101+
102+
103+
bool CopyToConfigRequest(const Ydb::BSConfig::ReplaceStorageConfigRequest &from, NKikimrBlobStorage::TConfigRequest *to);
104+
bool CopyToConfigRequest(const Ydb::BSConfig::FetchStorageConfigRequest &from, NKikimrBlobStorage::TConfigRequest *to);
105+
void CopyFromConfigResponse(const NKikimrBlobStorage::TConfigResponse &/*from*/, Ydb::BSConfig::ReplaceStorageConfigResult* /*to*/);
106+
void CopyFromConfigResponse(const NKikimrBlobStorage::TConfigResponse &from, Ydb::BSConfig::FetchStorageConfigResult *to);
107+
108+
template <typename TDerived>
109+
class TBaseBSConfigRequest {
110+
protected:
111+
void OnBootstrap() {
112+
auto self = static_cast<TDerived*>(this);
113+
Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
114+
NYql::TIssues issues;
115+
if (!self->ValidateRequest(status, issues)) {
116+
self->Reply(status, issues, self->ActorContext());
117+
return;
118+
}
119+
if (const auto& userToken = self->Request_->GetSerializedToken()) {
120+
UserToken = new NACLib::TUserToken(userToken);
121+
}
122+
}
123+
124+
bool CheckAccess(const TString& path, TIntrusivePtr<TSecurityObject> securityObject, ui32 access) {
125+
auto self = static_cast<TDerived*>(this);
126+
if (!UserToken || !securityObject) {
127+
return true;
128+
}
129+
if (securityObject->CheckAccess(access, *UserToken)) {
130+
return true;
131+
}
132+
self->Reply(Ydb::StatusIds::UNAUTHORIZED,
133+
TStringBuilder() << "Access denied"
134+
<< ": for# " << UserToken->GetUserSID()
135+
<< ", path# " << path
136+
<< ", access# " << NACLib::AccessRightsToString(access),
137+
NKikimrIssues::TIssuesIds::ACCESS_DENIED,
138+
self->ActorContext());
139+
return false;
140+
}
141+
142+
private:
143+
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
144+
};
145+
146+
template <typename TDerived, typename TRequest, typename TResultRecord>
147+
class TBSConfigRequestGrpc : public TRpcOperationRequestActor<TDerived, TRequest>
148+
, public TBaseBSConfigRequest<TBSConfigRequestGrpc<TDerived, TRequest, TResultRecord>> {
149+
using TBase = TRpcOperationRequestActor<TDerived, TRequest>;
150+
151+
friend class TBaseBSConfigRequest<TBSConfigRequestGrpc<TDerived, TRequest, TResultRecord>>;
152+
public:
153+
TBSConfigRequestGrpc(IRequestOpCtx* request)
154+
: TBase(request) {}
155+
156+
void Bootstrap(const TActorContext &ctx) {
157+
TBase::Bootstrap(ctx);
158+
this->OnBootstrap();
159+
this->Become(&TBSConfigRequestGrpc::StateFunc);
160+
BSCTabletId = MakeBSControllerID();
161+
CreatePipe();
162+
SendRequest();
163+
}
164+
165+
protected:
166+
STFUNC(StateFunc) {
167+
switch (ev->GetTypeRewrite()) {
168+
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
169+
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
170+
hFunc(TEvBlobStorage::TEvControllerConfigResponse, Handle);
171+
default:
172+
return TBase::StateFuncBase(ev);
173+
}
174+
}
175+
176+
NTabletPipe::TClientConfig GetPipeConfig() {
177+
NTabletPipe::TClientConfig cfg;
178+
cfg.RetryPolicy = {
179+
.RetryLimitCount = 3u
180+
};
181+
return cfg;
182+
}
183+
184+
void CreatePipe() {
185+
BSCPipeClient = this->Register(NTabletPipe::CreateClient(this->SelfId(), BSCTabletId, GetPipeConfig()));
186+
}
187+
188+
void SendRequest() {
189+
auto self = static_cast<TDerived*>(this);
190+
std::unique_ptr<TEvBlobStorage::TEvControllerConfigRequest> req = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>();
191+
auto &rec = *this->GetProtoRequest();
192+
if (!CopyToConfigRequest(rec, req->Record.MutableRequest())) {
193+
return this->Reply(Ydb::StatusIds::BAD_REQUEST, self->ActorContext());
194+
}
195+
NTabletPipe::SendData(this->SelfId(), BSCPipeClient, req.release(), 0, TBase::Span_.GetTraceId());
196+
}
197+
198+
void Handle(typename TEvBlobStorage::TEvControllerConfigResponse::TPtr &ev) {
199+
auto self = static_cast<TDerived*>(this);
200+
auto status = PullStatus(ev->Get()->Record);
201+
auto ctx = self->ActorContext();
202+
if (status != Ydb::StatusIds::SUCCESS) {
203+
this->Reply(status, ev->Get()->Record.GetResponse().GetErrorDescription(), NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
204+
return;
205+
}
206+
TResultRecord result;
207+
CopyFromConfigResponse(ev->Get()->Record.GetResponse(), &result);
208+
this->ReplyWithResult(status, result, TActivationContext::AsActorContext());
209+
}
210+
211+
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
212+
if (ev->Get()->Status != NKikimrProto::OK) {
213+
this->Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to connect to coordination node.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, this->ActorContext());
214+
}
215+
}
216+
217+
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
218+
this->Reply(Ydb::StatusIds::UNAVAILABLE, "Connection to coordination node was lost.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, this->ActorContext());
219+
}
220+
221+
virtual bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) = 0;
222+
private:
223+
ui64 BSCTabletId = 0;
224+
TActorId BSCPipeClient;
225+
};
226+
227+
} // namespace NKikimr::NGRpcService
228+
229+
template <>
230+
struct THash<NKikimr::NGRpcService::TDriveDeviceSet> {
231+
std::size_t operator()(const NKikimr::NGRpcService::TDriveDeviceSet &deviceSet) const {
232+
return deviceSet.GetHash();
233+
}
234+
};

0 commit comments

Comments
 (0)