Skip to content

Commit ddb9664

Browse files
authored
Implement ImportService.ListObjectsInS3Export on server side (#18454)
1 parent 2ab1e19 commit ddb9664

23 files changed

+1021
-107
lines changed

ydb/core/backup/common/metadata.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "metadata.h"
22

3+
#include <ydb/core/base/path.h>
4+
35
#include <library/cpp/json/json_writer.h>
46
#include <library/cpp/json/json_reader.h>
57

@@ -53,6 +55,31 @@ TMetadata TMetadata::Deserialize(const TString& metadata) {
5355
return result;
5456
}
5557

58+
// Serializes the mapping into a JSON document of the form
//   {"exportedObjects": {"<ObjectPath>": {"exportPrefix": "<ExportPrefix>", "iv": "<hex>"}, ...}}
// The "iv" key is written only for items that have IV set.
TString TSchemaMapping::Serialize() const {
    TString serialized;
    TStringOutput out(serialized);
    NJson::TJsonWriter json(&out, false);

    // Emits one "<ObjectPath>": {...} entry into the currently open map.
    const auto writeItem = [&json](const auto& entry) {
        json.WriteKey(entry.ObjectPath);
        json.OpenMap();
        json.Write("exportPrefix", entry.ExportPrefix);
        if (entry.IV) {
            json.Write("iv", entry.IV->GetHexString());
        }
        json.CloseMap();
    };

    json.OpenMap(); // root object
    json.WriteKey("exportedObjects");
    json.OpenMap(); // exportedObjects
    for (const auto& entry : Items) {
        writeItem(entry);
    }
    json.CloseMap(); // exportedObjects
    json.CloseMap(); // root object

    // Flush the writer first, then the underlying string stream, before handing out the buffer.
    json.Flush();
    out.Flush();
    return serialized;
}
82+
5683
bool TSchemaMapping::Deserialize(const TString& jsonContent, TString& error) {
5784
NJson::TJsonValue json;
5885
if (!NJson::ReadJsonTree(jsonContent, &json)) {
@@ -97,4 +124,35 @@ bool TSchemaMapping::Deserialize(const TString& jsonContent, TString& error) {
97124
return true;
98125
}
99126

127+
// Returns the canonized form of `path` (as produced by CanonizePath) without its leading slash.
// The slash is removed only when the canonized path is longer than one character,
// e.g. "/a/b/c/" becomes "a/b/c".
TString NormalizeItemPath(const TString& path) {
    TString canonical = CanonizePath(path);

    const bool hasLeadingSlash = canonical.size() > 1 && canonical.front() == '/';
    if (!hasLeadingSlash) {
        return canonical;
    }

    canonical.erase(0, 1);
    return canonical;
}
134+
135+
TString NormalizeItemPrefix(TString prefix) {
136+
// Cut slshes from the beginning and from the end
137+
size_t toCut = 0;
138+
while (toCut < prefix.size() && prefix[toCut] == '/') {
139+
++toCut;
140+
}
141+
if (toCut) {
142+
prefix.erase(0, toCut);
143+
}
144+
145+
while (prefix && prefix.back() == '/') {
146+
prefix.pop_back();
147+
}
148+
return prefix;
149+
}
150+
151+
TString NormalizeExportPrefix(TString prefix) {
152+
while (prefix && prefix.back() == '/') {
153+
prefix.pop_back();
154+
}
155+
return prefix;
156+
}
157+
100158
}

ydb/core/backup/common/metadata.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ class TMetadata {
5757
TMaybeFail<ui64> Version;
5858
};
5959

60+
// Canonizes the path and removes its leading slash: "/a/b/c/" -> "a/b/c".
TString NormalizeItemPath(const TString& path);
61+
// Removes slashes from both ends of the prefix, keeping inner ones: "///a///b///" -> "a///b".
TString NormalizeItemPrefix(TString prefix);
62+
// Removes only trailing slashes from the prefix: "///a///" -> "///a".
TString NormalizeExportPrefix(TString prefix);
63+
6064
class TSchemaMapping {
6165
public:
6266
struct TItem {
@@ -67,6 +71,7 @@ class TSchemaMapping {
6771

6872
TSchemaMapping() = default;
6973

74+
TString Serialize() const;
7075
bool Deserialize(const TString& jsonContent, TString& error);
7176

7277
public:
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#include "metadata.h"
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
5+
namespace NKikimr::NBackup {
6+
7+
Y_UNIT_TEST_SUITE(PathsNormalizationTest) {
    // A single check: the raw input and the value the normalization function must return for it.
    struct TCase {
        const char* Input;
        const char* Expected;
    };

    // Item paths are canonized and lose their leading slash.
    Y_UNIT_TEST(NormalizeItemPath) {
        const TCase cases[] = {
            {"/a/b/c/", "a/b/c"},
            {"/", ""},
            {"", ""},
            {"//", ""},
            {"///a///b///", "a/b"},
            {"a/b/c", "a/b/c"},
        };
        for (const TCase& testCase : cases) {
            UNIT_ASSERT_STRINGS_EQUAL(NormalizeItemPath(testCase.Input), testCase.Expected);
        }
    }

    // Item prefixes lose slashes on both ends, inner slashes are kept as is.
    Y_UNIT_TEST(NormalizeItemPrefix) {
        const TCase cases[] = {
            {"///a///b///c///", "a///b///c"},
            {"a///b///c", "a///b///c"},
            {"//", ""},
            {"/", ""},
            {"", ""},
        };
        for (const TCase& testCase : cases) {
            UNIT_ASSERT_STRINGS_EQUAL(NormalizeItemPrefix(testCase.Input), testCase.Expected);
        }
    }

    // Export prefixes lose only trailing slashes.
    Y_UNIT_TEST(NormalizeExportPrefix) {
        const TCase cases[] = {
            {"///a///", "///a"},
            {"a/", "a"},
            {"a", "a"},
            {"/prefix//", "/prefix"},
            {"", ""},
            {"/prefix", "/prefix"},
        };
        for (const TCase& testCase : cases) {
            UNIT_ASSERT_STRINGS_EQUAL(NormalizeExportPrefix(testCase.Input), testCase.Expected);
        }
    }
}
34+
35+
} // namespace NKikimr::NBackup

ydb/core/backup/common/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ UNITTEST_FOR(ydb/core/backup/common)
22

33
SRCS(
44
encryption_ut.cpp
5+
metadata_ut.cpp
56
)
67

78
END()
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
#include "service_import.h"
2+
#include "rpc_deferrable.h"
3+
4+
#include <ydb/core/base/tablet_pipe.h>
5+
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
7+
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
8+
#include <ydb/public/api/protos/ydb_import.pb.h>
9+
10+
// Logging shortcuts, one per severity (trace, debug, info, notice, warn, error).
// Every message goes to the TX_PROXY component and is prefixed with the
// "[ListObjectsInS3Export]" tag and the value of SelfId(), so the macros can only be
// expanded where an unqualified SelfId() call is valid.
#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_PROXY, "[ListObjectsInS3Export] " << SelfId() << " " << stream)
11+
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_PROXY, "[ListObjectsInS3Export] " << SelfId() << " " << stream)
12+
#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_PROXY, "[ListObjectsInS3Export] " << SelfId() << " " << stream)
13+
#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::TX_PROXY, "[ListObjectsInS3Export] " << SelfId() << " " << stream)
14+
#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_PROXY, "[ListObjectsInS3Export] " << SelfId() << " " << stream)
15+
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_PROXY, "[ListObjectsInS3Export] " << SelfId() << " " << stream)
16+
17+
namespace NKikimr::NGRpcService {
18+
19+
// gRPC operation call type pairing the public ListObjectsInS3Export request and response protos.
using TEvListObjectsInS3ExportRequest = TGrpcRequestOperationCall<Ydb::Import::ListObjectsInS3ExportRequest,
20+
Ydb::Import::ListObjectsInS3ExportResponse>;
21+
22+
// RPC actor that serves the ListObjectsInS3Export call.
//
// Processing steps:
//   1. Bootstrap() validates that the request carries a database name and asks the scheme cache
//      to resolve that database.
//   2. On the scheme cache answer the actor maps resolve errors to reply statuses, checks that
//      the caller has DescribeSchema access to the database and takes the SchemeShard id from
//      the domain info of the resolved entry.
//   3. The request is forwarded to that SchemeShard through a tablet pipe.
//   4. The SchemeShard response (or a pipe delivery problem) is relayed to the client.
class TListObjectsInS3ExportRPC: public TRpcOperationRequestActor<TListObjectsInS3ExportRPC, TEvListObjectsInS3ExportRequest> {
public:
    using TBase = TRpcOperationRequestActor<TListObjectsInS3ExportRPC, TEvListObjectsInS3ExportRequest>;
    using TRpcOperationRequestActor<TListObjectsInS3ExportRPC, TEvListObjectsInS3ExportRequest>::TRpcOperationRequestActor;

    // Keeps a parsed copy of the caller's token (if any) for the later access check.
    explicit TListObjectsInS3ExportRPC(IRequestOpCtx* request)
        : TBase(request)
        , UserToken(CreateUserToken(request))
    {
    }

    // Event dispatch; everything not listed here is delegated to the base class state function.
    STATEFN(StateFunc) {
        switch (ev->GetTypeRewrite()) {
            hFunc(NKikimr::NSchemeShard::TEvImport::TEvListObjectsInS3ExportResponse, Handle);
            hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);

            hFunc(TEvTabletPipe::TEvClientConnected, Handle);
            hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
        default:
            return StateFuncBase(ev);
        }
    }

    // Entry point: rejects requests without a database name, otherwise starts database resolution.
    void Bootstrap() {
        if (!Request_ || !Request_->GetDatabaseName()) {
            return Reply(Ydb::StatusIds::BAD_REQUEST, "Database name is not specified", NKikimrIssues::TIssuesIds::YDB_API_VALIDATION_ERROR, NActors::TActivationContext::AsActorContext());
        }

        ResolveDatabase();

        Become(&TListObjectsInS3ExportRPC::StateFunc);
    }

    // Sends a navigate request for the database path to the scheme cache.
    void ResolveDatabase() {
        LOG_D("Resolve database"
            << ": name# " << Request_->GetDatabaseName());

        auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
        request->DatabaseName = *Request_->GetDatabaseName();

        auto& entry = request->ResultSet.emplace_back();
        entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
        entry.Path = NKikimr::SplitPath(*Request_->GetDatabaseName());

        Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
    }

    // Handles the scheme cache answer: converts resolve errors into replies, checks access
    // and, on success, forwards the request to the SchemeShard of the resolved database.
    void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
        const auto& request = ev->Get()->Request;

        LOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult"
            << ": request# " << (request ? request->ToString(*AppData()->TypeRegistry) : "nullptr"));

        // The log line above tolerates a null request, so it must not be dereferenced blindly:
        // a missing request is reported the same way as an empty result set.
        if (!request || request->ResultSet.empty()) {
            return Reply(Ydb::StatusIds::SCHEME_ERROR, "Scheme error", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, NActors::TActivationContext::AsActorContext());
        }

        const auto& entry = request->ResultSet.front();

        if (request->ErrorCount > 0) {
            switch (entry.Status) {
            case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
                // The only requested entry is fine; continue with the normal flow.
                break;
            case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied:
                return Reply(Ydb::StatusIds::UNAUTHORIZED, "Access denied", NKikimrIssues::TIssuesIds::ACCESS_DENIED, NActors::TActivationContext::AsActorContext());
            case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
            case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
                return Reply(Ydb::StatusIds::SCHEME_ERROR, "Unknown database", NKikimrIssues::TIssuesIds::PATH_NOT_EXIST, NActors::TActivationContext::AsActorContext());
            case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError:
            case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError:
                return Reply(Ydb::StatusIds::UNAVAILABLE, "Database lookup error", NKikimrIssues::TIssuesIds::RESOLVE_LOOKUP_ERROR, NActors::TActivationContext::AsActorContext());
            default:
                return Reply(Ydb::StatusIds::SCHEME_ERROR, "Scheme error", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, NActors::TActivationContext::AsActorContext());
            }
        }

        // CheckDatabaseAccess replies to the client by itself when access is denied.
        if (!this->CheckDatabaseAccess(CanonizePath(entry.Path), entry.SecurityObject)) {
            return;
        }

        auto domainInfo = entry.DomainInfo;
        if (!domainInfo) {
            LOG_E("Got empty domain info");
            return Reply(Ydb::StatusIds::INTERNAL_ERROR, "Internal error", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, NActors::TActivationContext::AsActorContext());
        }

        SchemeShardId = domainInfo->ExtractSchemeShard();
        SendRequestToSchemeShard();
    }

    // Checks that the caller has DescribeSchema access to the database.
    // Returns true when access is granted or when there is nothing to check against
    // (no user token or no security object). Otherwise replies UNAUTHORIZED and returns false.
    bool CheckDatabaseAccess(const TString& path, TIntrusivePtr<TSecurityObject> securityObject) {
        const ui32 access = NACLib::DescribeSchema;

        if (!UserToken || !securityObject) {
            return true;
        }

        if (securityObject->CheckAccess(access, *UserToken)) {
            return true;
        }

        Reply(Ydb::StatusIds::UNAUTHORIZED,
            TStringBuilder() << "Access denied"
                << ": for# " << UserToken->GetUserSID()
                << ", path# " << path
                << ", access# " << NACLib::AccessRightsToString(access),
            NKikimrIssues::TIssuesIds::ACCESS_DENIED,
            NActors::TActivationContext::AsActorContext());
        return false;
    }

    // Creates the pipe to the SchemeShard on first use and sends it a copy of the client request
    // (operation params, settings, page size and page token).
    void SendRequestToSchemeShard() {
        LOG_D("Send request: schemeShardId# " << SchemeShardId);

        if (!PipeClient) {
            NTabletPipe::TClientConfig config;
            config.RetryPolicy = {.RetryLimitCount = 3};
            PipeClient = this->RegisterWithSameMailbox(NTabletPipe::CreateClient(this->SelfId(), SchemeShardId, config));
        }

        auto request = MakeHolder<NSchemeShard::TEvImport::TEvListObjectsInS3ExportRequest>();

        *request->Record.MutableOperationParams() = GetProtoRequest()->operation_params();
        *request->Record.MutableSettings() = GetProtoRequest()->settings();
        request->Record.SetPageSize(GetProtoRequest()->page_size());
        request->Record.SetPageToken(GetProtoRequest()->page_token());

        NTabletPipe::SendData(this->SelfId(), PipeClient, std::move(request), 0, Span_.GetTraceId());
    }

    // Relays the SchemeShard response: status and issues always, the result only on SUCCESS.
    void Handle(NKikimr::NSchemeShard::TEvImport::TEvListObjectsInS3ExportResponse::TPtr& ev) {
        const auto& record = ev->Get()->Record;

        LOG_D("Handle TListObjectsInS3ExportRPC::TEvListObjectsInS3ExportResponse"
            << ": record# " << record.ShortDebugString());

        if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
            return Reply(record.GetStatus(), record.GetIssues(), NActors::TActivationContext::AsActorContext());
        } else {
            return ReplyWithResult(record.GetStatus(), record.GetIssues(), record.GetResult(), NActors::TActivationContext::AsActorContext());
        }
    }

    // A pipe connection that did not end with OK status is treated as a delivery problem.
    void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
        if (ev->Get()->Status != NKikimrProto::OK) {
            DeliveryProblem();
        }
    }

    // A destroyed pipe is treated as a delivery problem.
    void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
        DeliveryProblem();
    }

    // Reports to the client that the SchemeShard could not be reached.
    void DeliveryProblem() {
        LOG_W("Delivery problem");
        Reply(Ydb::StatusIds::UNAVAILABLE, "Delivery problem", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, NActors::TActivationContext::AsActorContext());
    }

    // Closes the pipe client (when one was created) before the base class teardown.
    void PassAway() override {
        // PipeClient stays default-constructed when the actor finishes before
        // SendRequestToSchemeShard() ran (validation, resolve or access errors);
        // there is nothing to close in that case.
        if (PipeClient) {
            NTabletPipe::CloseClient(this->SelfId(), PipeClient);
        }
        TBase::PassAway();
    }

    // Builds a user token from the serialized token of the request.
    // Returns an empty holder when the request carries no serialized token.
    static THolder<const NACLib::TUserToken> CreateUserToken(IRequestOpCtx* request) {
        if (const auto& userToken = request->GetSerializedToken()) {
            return MakeHolder<NACLib::TUserToken>(userToken);
        } else {
            return {};
        }
    }

private:
    ui64 SchemeShardId = 0;                              // set once the database is resolved
    TActorId PipeClient;                                 // pipe to the SchemeShard, created lazily
    const THolder<const NACLib::TUserToken> UserToken;   // empty when the request has no token
};
198+
199+
// Entry point of the ListObjectsInS3Export call: creates the RPC actor for the request
// and registers it through the facility provider. The raw request pointer is released
// from `p` and handed to the actor's constructor.
void DoListObjectsInS3ExportRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
    auto* rpcActor = new TListObjectsInS3ExportRPC(p.release());
    f.RegisterActor(rpcActor);
}
202+
203+
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/service_import.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class IRequestOpCtx;
99
class IFacilityProvider;
1010

1111
void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
12+
void DoListObjectsInS3ExportRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1213
void DoImportDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1314

1415
}

ydb/core/grpc_services/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ SRCS(
6060
rpc_kh_describe.cpp
6161
rpc_kh_snapshots.cpp
6262
rpc_kqp_base.cpp
63+
rpc_list_objects_in_s3_export.cpp
6364
rpc_list_operations.cpp
6465
rpc_load_rows.cpp
6566
rpc_log_store.cpp

ydb/core/protos/import.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,16 @@ message TListImportsResponse {
129129
message TEvListImportsResponse {
130130
optional TListImportsResponse Response = 1;
131131
}
132+
133+
// Request sent to SchemeShard to list objects in an S3 export.
// The RPC actor fills every field from the corresponding field of the public
// Ydb.Import.ListObjectsInS3ExportRequest (operation_params, settings, page_size, page_token).
message TEvListObjectsInS3ExportRequest {
134+
optional Ydb.Operations.OperationParams OperationParams = 1;
135+
optional Ydb.Import.ListObjectsInS3ExportSettings Settings = 2;
136+
optional int64 PageSize = 3;
137+
optional string PageToken = 4;
138+
}
139+
140+
// SchemeShard answer to TEvListObjectsInS3ExportRequest.
// The RPC actor forwards Status and Issues to the client and attaches Result
// only when Status is SUCCESS.
message TEvListObjectsInS3ExportResponse {
141+
optional Ydb.StatusIds.StatusCode Status = 1;
142+
repeated Ydb.Issue.IssueMessage Issues = 2;
143+
optional Ydb.Import.ListObjectsInS3ExportResult Result = 3;
144+
}

0 commit comments

Comments
 (0)