Skip to content

Commit 0036ea2

Browse files
committed
TImportClient::ListObjectsInS3Export in C++ SDK (#18366)
1 parent 4042262 commit 0036ea2

File tree

5 files changed

+253
-27
lines changed

5 files changed

+253
-27
lines changed

include/ydb-cpp-sdk/client/import/import.h

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
#include <ydb-cpp-sdk/client/types/s3_settings.h>
77

8+
// Forward declaration of the result message type. In the visible part of this header it is
// only used by const reference (TListObjectsInS3ExportResult constructor), so a declaration
// is enough here.
namespace Ydb::Import {
9+
class ListObjectsInS3ExportResult;
10+
}
11+
812
namespace NYdb::inline V3 {
913
namespace NImport {
1014

@@ -77,6 +81,49 @@ class TImportFromS3Response : public TOperation {
7781
TMetadata Metadata_;
7882
};
7983

84+
using TAsyncImportFromS3Response = NThreading::TFuture<TImportFromS3Response>;
85+
86+
/// Settings for TImportClient::ListObjectsInS3Export.
/// Inherits the operation request settings and the S3 settings, and adds the
/// list-specific options declared below.
struct TListObjectsInS3ExportSettings : public TOperationRequestSettings<TListObjectsInS3ExportSettings>,
87+
public TS3Settings<TListObjectsInS3ExportSettings> {
88+
using TSelf = TListObjectsInS3ExportSettings;
89+
90+
// One object to list.
struct TItem {
91+
// Database object path.
// Must be non-empty: TImportClient::ListObjectsInS3Export throws TContractViolation otherwise.
// According to ydb_import.proto the path is recursive for directories.
92+
std::string Path = {};
93+
};
94+
95+
// Objects to list; every item is sent as one entry of the request's `items` field.
FLUENT_SETTING_VECTOR(TItem, Item);
96+
// Sent as `number_of_retries` only when set.
FLUENT_SETTING_OPTIONAL(uint32_t, NumberOfRetries);
97+
// Sent as `prefix` only when set. Per ydb_import.proto: a default path prefix for all items;
// must be provided for encrypted exports.
FLUENT_SETTING_OPTIONAL(std::string, Prefix);
98+
// When set, sent as `encryption_settings.symmetric_key.key`.
FLUENT_SETTING_OPTIONAL(std::string, SymmetricKey);
99+
};
100+
101+
/// Result of TImportClient::ListObjectsInS3Export: the call status (TStatus base)
/// plus one page of listed objects and the token for the next page.
class TListObjectsInS3ExportResult : public TStatus {
102+
public:
103+
// One listed object.
struct TItem {
104+
// S3 object prefix
105+
std::string Prefix;
106+
107+
// Database object path
108+
std::string Path;
109+
110+
// Writes the item as `{ prefix: "...", path: "..." }`.
void Out(IOutputStream& out) const;
111+
};
112+
113+
// Takes over `status` and copies the items and next_page_token from `proto`.
TListObjectsInS3ExportResult(TStatus&& status, const ::Ydb::Import::ListObjectsInS3ExportResult& proto);
114+
115+
// Items of this page, in the order they appear in the proto message.
const std::vector<TItem>& GetItems() const;
// Token copied from the proto `next_page_token` field; per ydb_import.proto it is passed
// as the page token of the next ListObjectsInS3Export request to continue paging.
116+
const std::string& NextPageToken() const { return NextPageToken_; }
117+
118+
// On success writes the items and the next page token; otherwise delegates to TStatus::Out.
void Out(IOutputStream& out) const;
119+
120+
private:
121+
std::vector<TItem> Items_;
122+
std::string NextPageToken_;
123+
};
124+
125+
using TAsyncListObjectsInS3ExportResult = NThreading::TFuture<TListObjectsInS3ExportResult>;
126+
80127
/// Data
81128
struct TImportYdbDumpDataSettings : public TOperationRequestSettings<TImportYdbDumpDataSettings> {
82129
using TSelf = TImportYdbDumpDataSettings;
@@ -99,7 +146,9 @@ class TImportClient {
99146
public:
100147
TImportClient(const TDriver& driver, const TCommonClientSettings& settings = TCommonClientSettings());
101148

102-
NThreading::TFuture<TImportFromS3Response> ImportFromS3(const TImportFromS3Settings& settings);
149+
TAsyncImportFromS3Response ImportFromS3(const TImportFromS3Settings& settings);
150+
151+
/// Lists objects of an existing export stored in an S3 bucket.
/// @param settings   S3 settings, optional prefix/encryption key and the items to list.
/// @param pageSize   Maximum number of results per page; sent as `page_size`. Per ydb_import.proto,
///                   0 means the server returns all objects and the value is constrained to <= 10000.
/// @param pageToken  Sent as `page_token`; pass NextPageToken() of the previous result to get the next page.
/// @return Future with the status, the listed items and the next page token.
/// @throws TContractViolation if any item in `settings` has an empty Path.
TAsyncListObjectsInS3ExportResult ListObjectsInS3Export(const TListObjectsInS3ExportSettings& settings, std::int64_t pageSize = 0, const std::string& pageToken = {});
103152

104153
// ydb dump format
105154
TAsyncImportDataResult ImportData(const std::string& table, std::string&& data, const TImportYdbDumpDataSettings& settings);
@@ -111,8 +160,3 @@ class TImportClient {
111160

112161
} // namespace NImport
113162
} // namespace NYdb
114-
115-
template<>
116-
inline void Out<NYdb::NImport::TImportFromS3Response>(IOutputStream& o, const NYdb::NImport::TImportFromS3Response& x) {
117-
return x.Out(o);
118-
}

src/api/grpc/ydb_import_v1.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ service ImportService {
1111
// Method starts an asynchronous operation that can be cancelled while it is in progress.
1212
rpc ImportFromS3(Import.ImportFromS3Request) returns (Import.ImportFromS3Response);
1313

14+
// List objects from existing export stored in S3 bucket
15+
rpc ListObjectsInS3Export(Import.ListObjectsInS3ExportRequest) returns (Import.ListObjectsInS3ExportResponse);
16+
1417
// Writes data to a table.
1518
// Method accepts serialized data in the selected format and writes it non-transactionally.
1619
rpc ImportData(Import.ImportDataRequest) returns (Import.ImportDataResponse);

src/api/protos/ydb_import.proto

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,87 @@ message ImportFromS3Response {
121121
Ydb.Operations.Operation operation = 1;
122122
}
123123

124+
// Settings of a ListObjectsInS3Export request: S3 connection parameters,
// the objects to list and optional prefix / encryption settings.
message ListObjectsInS3ExportSettings {
125+
message Item {
126+
// Database object path
127+
// Recursive for directories
128+
string path = 1;
129+
}
130+
131+
string endpoint = 1 [(required) = true];
132+
ImportFromS3Settings.Scheme scheme = 2; // HTTPS if not specified
133+
string bucket = 3 [(required) = true];
134+
string access_key = 4 [(required) = true];
135+
string secret_key = 5 [(required) = true];
136+
repeated Item items = 6;
137+
uint32 number_of_retries = 7;
138+
139+
// Region to use in requests
140+
string region = 8;
141+
142+
// Disables the AWS S3 virtual-hosted-style bucket addressing feature.
143+
// It changes the way the bucket is appended to the URL, e.g. https://bucket_name.example.com/ vs https://example.com/bucket_name
144+
// Details: https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html
145+
// It is especially useful for custom S3 implementations.
146+
bool disable_virtual_addressing = 9;
147+
148+
// A default path prefix for all items,
149+
// determines that the import works with the list of objects in the SchemaMapping file.
150+
// Must be provided for encrypted exports.
151+
string prefix = 10;
152+
153+
// Settings how data is encrypted.
154+
// If encryption_settings field is not specified,
155+
// the resulting data is considered not encrypted.
156+
Ydb.Export.EncryptionSettings encryption_settings = 11;
157+
}
158+
159+
// Result of a ListObjectsInS3Export operation: one page of listed objects
// and the token used to request the next page.
message ListObjectsInS3ExportResult {
160+
message Item {
161+
/* YDB database objects in S3 are stored in one or more S3 objects (see ydb_export.proto).
162+
The S3 object name begins with a prefix, followed by:
163+
* '/data_PartNumber', where 'PartNumber' represents the index of the part, starting at zero;
164+
* '/scheme.pb' - object with information about scheme, indexes, etc;
165+
* '/permissions.pb' - object with information about ACL and owner.
166+
*/
167+
168+
// S3 object prefix
169+
string prefix = 1;
170+
171+
// Database object path
172+
string path = 2;
173+
}
174+
175+
// Listed objects of the current page.
repeated Item items = 1;
176+
177+
// This token allows you to get the next page of results for ListObjectsInS3Export requests,
178+
// if the number of results is larger than `page_size` specified in the request.
179+
// To get the next page, specify the value of `next_page_token` as a value for
180+
// the `page_token` parameter in the next ListObjectsInS3Export request. Subsequent ListObjectsInS3Export
181+
// requests will have their own `next_page_token` to continue paging through the results.
182+
string next_page_token = 2;
183+
}
184+
185+
// Request of the ImportService.ListObjectsInS3Export method:
// operation parameters, listing settings and paging controls.
message ListObjectsInS3ExportRequest {
185+
Ydb.Operations.OperationParams operation_params = 1;
186+
ListObjectsInS3ExportSettings settings = 2 [(required) = true];
187+
188+
// The maximum number of results per page that should be returned. If the number of available
189+
// results is larger than `page_size`, the service returns a `next_page_token` that can be used
190+
// to get the next page of results in subsequent ListObjectsInS3Export requests.
191+
// 0 means that server returns all objects.
192+
int64 page_size = 3 [(value) = "<= 10000"];
193+
194+
// Page token. Set `page_token` to the `next_page_token` returned by a previous ListObjectsInS3Export
195+
// request to get the next page of results.
196+
string page_token = 4;
197+
}
199+
200+
// Response of the ImportService.ListObjectsInS3Export method.
message ListObjectsInS3ExportResponse {
200+
// operation.result = ListObjectsInS3ExportResult
201+
Ydb.Operations.Operation operation = 1;
202+
}
204+
124205
/// Data
125206
message YdbDumpFormat {
126207
repeated string columns = 1;

src/client/import/import.cpp

Lines changed: 107 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <src/client/common_client/impl/client.h>
1111
#include <ydb-cpp-sdk/client/proto/accessor.h>
1212

13+
#include <util/string/join.h>
14+
1315
namespace NYdb::inline V3 {
1416
namespace NImport {
1517

@@ -34,6 +36,25 @@ std::vector<TImportItemProgress> ItemsProgressFromProto(const google::protobuf::
3436
return result;
3537
}
3638

39+
template <class TS3SettingsProto, class TSettings>
40+
void FillS3Settings(TS3SettingsProto& proto, const TSettings& settings) {
41+
proto.set_endpoint(TStringType{settings.Endpoint_});
42+
proto.set_scheme(TProtoAccessor::GetProto<ImportFromS3Settings>(settings.Scheme_));
43+
proto.set_bucket(TStringType{settings.Bucket_});
44+
proto.set_access_key(TStringType{settings.AccessKey_});
45+
proto.set_secret_key(TStringType{settings.SecretKey_});
46+
47+
if (settings.NumberOfRetries_) {
48+
proto.set_number_of_retries(settings.NumberOfRetries_.value());
49+
}
50+
51+
if (settings.SymmetricKey_) {
52+
proto.mutable_encryption_settings()->mutable_symmetric_key()->set_key(*settings.SymmetricKey_);
53+
}
54+
55+
proto.set_disable_virtual_addressing(!settings.UseVirtualAddressing_);
56+
}
57+
3758
} // anonymous
3859

3960
/// S3
@@ -66,6 +87,36 @@ const TImportFromS3Response::TMetadata& TImportFromS3Response::Metadata() const
6687
return Metadata_;
6788
}
6889

90+
// Builds the SDK result from the call status and the protobuf result message:
// the status is moved into the TStatus base, every proto item is converted to a
// TItem (prefix + path) and the next page token is copied.
TListObjectsInS3ExportResult::TListObjectsInS3ExportResult(TStatus&& status, const ::Ydb::Import::ListObjectsInS3ExportResult& proto)
    : TStatus(std::move(status))
{
    const auto& protoItems = proto.items();

    // Allocate once for the whole page.
    Items_.reserve(protoItems.size());
    for (const auto& protoItem : protoItems) {
        TItem converted;
        converted.Prefix = protoItem.prefix();
        converted.Path = protoItem.path();
        Items_.push_back(std::move(converted));
    }

    NextPageToken_ = proto.next_page_token();
}
102+
103+
// Returns the items stored by the constructor (one page of listed objects).
const std::vector<TListObjectsInS3ExportResult::TItem>& TListObjectsInS3ExportResult::GetItems() const {
104+
return Items_;
105+
}
106+
107+
// Writes a textual representation of the result to `out`.
// A non-successful result is printed by TStatus::Out; a successful one is printed
// as the comma-separated item list followed by the next page token.
void TListObjectsInS3ExportResult::Out(IOutputStream& out) const {
    if (!IsSuccess()) {
        TStatus::Out(out);
        return;
    }

    out << "{ items: [" << JoinSeq(", ", Items_);
    out << "], next_page_token: \"" << NextPageToken_ << "\" }";
}
114+
115+
// Writes the item as `{ prefix: "<Prefix>", path: "<Path>" }`; the values are
// written as-is, without escaping.
void TListObjectsInS3ExportResult::TItem::Out(IOutputStream& out) const {
    out << "{ prefix: \"" << Prefix << "\"";
    out << ", path: \"" << Path << "\" }";
}
119+
69120
/// Data
70121
TImportDataResult::TImportDataResult(TStatus&& status)
71122
: TStatus(std::move(status))
@@ -80,13 +131,37 @@ class TImportClient::TImpl : public TClientImplCommon<TImportClient::TImpl> {
80131
{
81132
}
82133

83-
TFuture<TImportFromS3Response> ImportFromS3(ImportFromS3Request&& request, const TImportFromS3Settings& settings) {
134+
// Sends the prepared ImportFromS3 request through RunOperation, using the
// ImportService AsyncImportFromS3 stub method and RPC settings derived from
// `settings`, and returns the future produced by RunOperation.
TAsyncImportFromS3Response ImportFromS3(ImportFromS3Request&& request, const TImportFromS3Settings& settings) {
84135
return RunOperation<V1::ImportService, ImportFromS3Request, ImportFromS3Response, TImportFromS3Response>(
85136
std::move(request),
86137
&V1::ImportService::Stub::AsyncImportFromS3,
87138
TRpcRequestSettings::Make(settings));
88139
}
89140

141+
// Sends the prepared ListObjectsInS3Export request as a deferred call and
// returns a future that is fulfilled with the status and the unpacked result.
TAsyncListObjectsInS3ExportResult ListObjectsInS3Export(ListObjectsInS3ExportRequest&& request, const TListObjectsInS3ExportSettings& settings) {
    auto resultPromise = NThreading::NewPromise<TListObjectsInS3ExportResult>();

    // Response callback: unpacks the operation result (when one is present) and
    // fulfils the promise. Without a payload the result message stays default-constructed,
    // so the SDK result carries only the status. The return value of UnpackTo is not checked.
    auto onResponse = [resultPromise](google::protobuf::Any* packed, TPlainStatus plainStatus) mutable {
        ListObjectsInS3ExportResult unpacked;
        if (packed != nullptr) {
            packed->UnpackTo(&unpacked);
        }

        TStatus finalStatus(std::move(plainStatus));
        resultPromise.SetValue(TListObjectsInS3ExportResult(std::move(finalStatus), unpacked));
    };

    Connections_->RunDeferred<V1::ImportService, ListObjectsInS3ExportRequest, ListObjectsInS3ExportResponse>(
        std::move(request),
        onResponse,
        &V1::ImportService::Stub::AsyncListObjectsInS3Export,
        DbDriverState_,
        INITIAL_DEFERRED_CALL_DELAY,
        TRpcRequestSettings::Make(settings));

    return resultPromise.GetFuture();
}
164+
90165
template <typename TSettings>
91166
TAsyncImportDataResult ImportData(ImportDataRequest&& request, const TSettings& settings) {
92167
auto promise = NThreading::NewPromise<TImportDataResult>();
@@ -131,22 +206,18 @@ TImportClient::TImportClient(const TDriver& driver, const TCommonClientSettings&
131206
{
132207
}
133208

134-
TFuture<TImportFromS3Response> TImportClient::ImportFromS3(const TImportFromS3Settings& settings) {
209+
TAsyncImportFromS3Response TImportClient::ImportFromS3(const TImportFromS3Settings& settings) {
135210
auto request = MakeOperationRequest<ImportFromS3Request>(settings);
136-
137-
request.mutable_settings()->set_endpoint(TStringType{settings.Endpoint_});
138-
request.mutable_settings()->set_scheme(TProtoAccessor::GetProto<ImportFromS3Settings>(settings.Scheme_));
139-
request.mutable_settings()->set_bucket(TStringType{settings.Bucket_});
140-
request.mutable_settings()->set_access_key(TStringType{settings.AccessKey_});
141-
request.mutable_settings()->set_secret_key(TStringType{settings.SecretKey_});
211+
Ydb::Import::ImportFromS3Settings& settingsProto = *request.mutable_settings();
212+
FillS3Settings(settingsProto, settings);
142213

143214
for (const auto& item : settings.Item_) {
144215
if (!item.Src.empty() && !item.SrcPath.empty()) {
145216
throw TContractViolation(
146217
TStringBuilder() << "Invalid item: both source prefix and source path are set: \"" << item.Src << "\" and \"" << item.SrcPath << "\"");
147218
}
148219

149-
auto& protoItem = *request.mutable_settings()->mutable_items()->Add();
220+
auto& protoItem = *settingsProto.mutable_items()->Add();
150221
if (!item.Src.empty()) {
151222
protoItem.set_source_prefix(item.Src);
152223
}
@@ -157,32 +228,47 @@ TFuture<TImportFromS3Response> TImportClient::ImportFromS3(const TImportFromS3Se
157228
}
158229

159230
if (settings.Description_) {
160-
request.mutable_settings()->set_description(TStringType{settings.Description_.value()});
161-
}
162-
163-
if (settings.NumberOfRetries_) {
164-
request.mutable_settings()->set_number_of_retries(settings.NumberOfRetries_.value());
231+
settingsProto.set_description(TStringType{settings.Description_.value()});
165232
}
166233

167234
if (settings.NoACL_) {
168-
request.mutable_settings()->set_no_acl(settings.NoACL_.value());
235+
settingsProto.set_no_acl(settings.NoACL_.value());
169236
}
170237

171238
if (settings.SourcePrefix_) {
172-
request.mutable_settings()->set_source_prefix(settings.SourcePrefix_.value());
239+
settingsProto.set_source_prefix(settings.SourcePrefix_.value());
173240
}
174241

175242
if (settings.DestinationPath_) {
176-
request.mutable_settings()->set_destination_path(settings.DestinationPath_.value());
243+
settingsProto.set_destination_path(settings.DestinationPath_.value());
177244
}
178245

179-
if (settings.SymmetricKey_) {
180-
request.mutable_settings()->mutable_encryption_settings()->mutable_symmetric_key()->set_key(*settings.SymmetricKey_);
246+
return Impl_->ImportFromS3(std::move(request), settings);
247+
}
248+
249+
// Builds a ListObjectsInS3ExportRequest from `settings`, `pageSize` and `pageToken`
// and forwards it to the implementation.
// Throws TContractViolation if any item in `settings` has an empty Path.
// NOTE(review): this span comes from a diff view; the lines that follow the `183-` and `185-`
// markers are removed-side lines of the previous ImportFromS3 body, not part of this function.
TAsyncListObjectsInS3ExportResult TImportClient::ListObjectsInS3Export(const TListObjectsInS3ExportSettings& settings, std::int64_t pageSize, const std::string& pageToken) {
250+
auto request = MakeOperationRequest<ListObjectsInS3ExportRequest>(settings);
251+
Ydb::Import::ListObjectsInS3ExportSettings& settingsProto = *request.mutable_settings();
252+
// Common S3 settings (endpoint, credentials, retries, encryption key, addressing style).
FillS3Settings(settingsProto, settings);
253+
254+
// The prefix is optional and is only written when it was provided.
if (settings.Prefix_) {
255+
settingsProto.set_prefix(settings.Prefix_.value());
181256
}
182257

183-
request.mutable_settings()->set_disable_virtual_addressing(!settings.UseVirtualAddressing_);
258+
// Every item must name a database object path; an empty path is rejected before sending.
for (const auto& item : settings.Item_) {
259+
if (item.Path.empty()) {
260+
throw TContractViolation(
261+
TStringBuilder() << "Invalid item: path is not set");
262+
}
184263

185-
return Impl_->ImportFromS3(std::move(request), settings);
264+
settingsProto.add_items()->set_path(item.Path);
265+
}
266+
267+
// Paging
// Both values are always written to the request, including the defaults (0 and empty token).
268+
request.set_page_size(pageSize);
269+
request.set_page_token(pageToken);
270+
271+
return Impl_->ListObjectsInS3Export(std::move(request), settings);
186272
}
187273

188274
TAsyncImportDataResult TImportClient::ImportData(const std::string& table, std::string&& data, const TImportYdbDumpDataSettings& settings) {

src/client/import/out.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,15 @@
33
// Stream-output specializations for the import client types.
// Each one simply delegates to the type's own Out(IOutputStream&) method.
Y_DECLARE_OUT_SPEC(, NYdb::NImport::TImportDataResult, o, x) {
44
return x.Out(o);
55
}
6+
7+
Y_DECLARE_OUT_SPEC(, NYdb::NImport::TImportFromS3Response, o, x) {
8+
return x.Out(o);
9+
}
10+
11+
Y_DECLARE_OUT_SPEC(, NYdb::NImport::TListObjectsInS3ExportResult, o, x) {
12+
return x.Out(o);
13+
}
14+
15+
// Needed so that the result's item list can be written element by element (see JoinSeq in import.cpp).
Y_DECLARE_OUT_SPEC(, NYdb::NImport::TListObjectsInS3ExportResult::TItem, o, x) {
16+
return x.Out(o);
17+
}

0 commit comments

Comments
 (0)