Skip to content

Commit 439d011

Browse files
authored
Safe operation convertations (#13409)
1 parent 40f57dc commit 439d011

File tree

4 files changed

+106
-81
lines changed

4 files changed

+106
-81
lines changed

ydb/core/grpc_services/rpc_export_base.h

Lines changed: 12 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#pragma once
22

3+
#include "rpc_operation_conv_base.h"
4+
35
#include <ydb/core/protos/export.pb.h>
4-
#include <ydb/public/api/protos/ydb_operation.pb.h>
56
#include <ydb/public/api/protos/ydb_export.pb.h>
67
#include <ydb/public/lib/operation_id/operation_id.h>
78

@@ -10,7 +11,7 @@
1011
namespace NKikimr {
1112
namespace NGRpcService {
1213

13-
struct TExportConv {
14+
struct TExportConv: public TOperationConv<NKikimrExport::TExport> {
1415
static Ydb::TOperationId MakeOperationId(const ui64 id, NKikimrExport::TExport::SettingsCase kind) {
1516
Ydb::TOperationId operationId;
1617
operationId.SetKind(Ydb::TOperationId::EXPORT);
@@ -31,38 +32,24 @@ struct TExportConv {
3132
return operationId;
3233
}
3334

34-
static Ydb::Operations::Operation ToOperation(const NKikimrExport::TExport& exprt) {
35-
Ydb::Operations::Operation operation;
35+
static Operation ToOperation(const NKikimrExport::TExport& in) {
36+
auto operation = TOperationConv::ToOperation(in);
3637

37-
operation.set_id(NOperationId::ProtoToString(MakeOperationId(exprt.GetId(), exprt.GetSettingsCase())));
38-
operation.set_status(exprt.GetStatus());
3938
if (operation.status() == Ydb::StatusIds::SUCCESS) {
40-
operation.set_ready(exprt.GetProgress() == Ydb::Export::ExportProgress::PROGRESS_DONE);
41-
} else {
42-
operation.set_ready(true);
43-
}
44-
if (exprt.IssuesSize()) {
45-
operation.mutable_issues()->CopyFrom(exprt.GetIssues());
39+
operation.set_ready(in.GetProgress() == Ydb::Export::ExportProgress::PROGRESS_DONE);
40+
} else if (operation.status() != Ydb::StatusIds::CANCELLED) {
41+
return operation;
4642
}
4743

48-
if (exprt.HasStartTime()) {
49-
*operation.mutable_create_time() = exprt.GetStartTime();
50-
}
51-
if (exprt.HasEndTime()) {
52-
*operation.mutable_end_time() = exprt.GetEndTime();
53-
}
54-
55-
if (exprt.HasUserSID()) {
56-
operation.set_created_by(exprt.GetUserSID());
57-
}
44+
operation.set_id(NOperationId::ProtoToString(MakeOperationId(in.GetId(), in.GetSettingsCase())));
5845

5946
using namespace Ydb::Export;
60-
switch (exprt.GetSettingsCase()) {
47+
switch (in.GetSettingsCase()) {
6148
case NKikimrExport::TExport::kExportToYtSettings:
62-
Fill<ExportToYtMetadata, ExportToYtResult>(operation, exprt, exprt.GetExportToYtSettings());
49+
Fill<ExportToYtMetadata, ExportToYtResult>(operation, in, in.GetExportToYtSettings());
6350
break;
6451
case NKikimrExport::TExport::kExportToS3Settings:
65-
Fill<ExportToS3Metadata, ExportToS3Result>(operation, exprt, exprt.GetExportToS3Settings());
52+
Fill<ExportToS3Metadata, ExportToS3Result>(operation, in, in.GetExportToS3Settings());
6653
break;
6754
default:
6855
Y_DEBUG_ABORT("Unknown export kind");
@@ -72,22 +59,6 @@ struct TExportConv {
7259
return operation;
7360
}
7461

75-
private:
76-
template <typename TMetadata, typename TResult, typename TSettings>
77-
static void Fill(
78-
Ydb::Operations::Operation& operation,
79-
const NKikimrExport::TExport& exprt,
80-
const TSettings& settings) {
81-
TMetadata metadata;
82-
metadata.mutable_settings()->CopyFrom(settings);
83-
metadata.set_progress(exprt.GetProgress());
84-
metadata.mutable_items_progress()->CopyFrom(exprt.GetItemsProgress());
85-
operation.mutable_metadata()->PackFrom(metadata);
86-
87-
TResult result;
88-
operation.mutable_result()->PackFrom(result);
89-
}
90-
9162
}; // TExportConv
9263

9364
} // namespace NGRpcService

ydb/core/grpc_services/rpc_import_base.h

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#pragma once
22

3+
#include "rpc_operation_conv_base.h"
4+
35
#include <ydb/core/protos/import.pb.h>
4-
#include <ydb/public/api/protos/ydb_operation.pb.h>
56
#include <ydb/public/api/protos/ydb_import.pb.h>
67
#include <ydb/public/lib/operation_id/operation_id.h>
78

@@ -10,7 +11,7 @@
1011
namespace NKikimr {
1112
namespace NGRpcService {
1213

13-
struct TImportConv {
14+
struct TImportConv: public TOperationConv<NKikimrImport::TImport> {
1415
static Ydb::TOperationId MakeOperationId(const ui64 id, NKikimrImport::TImport::SettingsCase kind) {
1516
Ydb::TOperationId operationId;
1617
operationId.SetKind(Ydb::TOperationId::IMPORT);
@@ -28,35 +29,21 @@ struct TImportConv {
2829
return operationId;
2930
}
3031

31-
static Ydb::Operations::Operation ToOperation(const NKikimrImport::TImport& import) {
32-
Ydb::Operations::Operation operation;
32+
static Operation ToOperation(const NKikimrImport::TImport& in) {
33+
auto operation = TOperationConv::ToOperation(in);
3334

34-
operation.set_id(NOperationId::ProtoToString(MakeOperationId(import.GetId(), import.GetSettingsCase())));
35-
operation.set_status(import.GetStatus());
3635
if (operation.status() == Ydb::StatusIds::SUCCESS) {
37-
operation.set_ready(import.GetProgress() == Ydb::Import::ImportProgress::PROGRESS_DONE);
38-
} else {
39-
operation.set_ready(true);
40-
}
41-
if (import.IssuesSize()) {
42-
operation.mutable_issues()->CopyFrom(import.GetIssues());
36+
operation.set_ready(in.GetProgress() == Ydb::Import::ImportProgress::PROGRESS_DONE);
37+
} else if (operation.status() != Ydb::StatusIds::CANCELLED) {
38+
return operation;
4339
}
4440

45-
if (import.HasStartTime()) {
46-
*operation.mutable_create_time() = import.GetStartTime();
47-
}
48-
if (import.HasEndTime()) {
49-
*operation.mutable_end_time() = import.GetEndTime();
50-
}
51-
52-
if (import.HasUserSID()) {
53-
operation.set_created_by(import.GetUserSID());
54-
}
41+
operation.set_id(NOperationId::ProtoToString(MakeOperationId(in.GetId(), in.GetSettingsCase())));
5542

5643
using namespace Ydb::Import;
57-
switch (import.GetSettingsCase()) {
44+
switch (in.GetSettingsCase()) {
5845
case NKikimrImport::TImport::kImportFromS3Settings:
59-
Fill<ImportFromS3Metadata, ImportFromS3Result>(operation, import, import.GetImportFromS3Settings());
46+
Fill<ImportFromS3Metadata, ImportFromS3Result>(operation, in, in.GetImportFromS3Settings());
6047
break;
6148
default:
6249
Y_DEBUG_ABORT("Unknown import kind");
@@ -66,22 +53,6 @@ struct TImportConv {
6653
return operation;
6754
}
6855

69-
private:
70-
template <typename TMetadata, typename TResult, typename TSettings>
71-
static void Fill(
72-
Ydb::Operations::Operation& operation,
73-
const NKikimrImport::TImport& import,
74-
const TSettings& settings) {
75-
TMetadata metadata;
76-
metadata.mutable_settings()->CopyFrom(settings);
77-
metadata.set_progress(import.GetProgress());
78-
metadata.mutable_items_progress()->CopyFrom(import.GetItemsProgress());
79-
operation.mutable_metadata()->PackFrom(metadata);
80-
81-
TResult result;
82-
operation.mutable_result()->PackFrom(result);
83-
}
84-
8556
}; // TImportConv
8657

8758
} // namespace NGRpcService
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#pragma once
2+
3+
#include <ydb/public/api/protos/ydb_operation.pb.h>
4+
5+
namespace NKikimr::NGRpcService {
6+
7+
template <typename TOperation>
8+
struct TOperationConv {
9+
using Operation = Ydb::Operations::Operation;
10+
11+
static Operation ToOperation(const TOperation& in) {
12+
Operation operation;
13+
14+
operation.set_ready(true);
15+
operation.set_status(in.GetStatus());
16+
17+
if (in.IssuesSize()) {
18+
operation.mutable_issues()->CopyFrom(in.GetIssues());
19+
}
20+
21+
if (in.HasStartTime()) {
22+
*operation.mutable_create_time() = in.GetStartTime();
23+
}
24+
25+
if (in.HasEndTime()) {
26+
*operation.mutable_end_time() = in.GetEndTime();
27+
}
28+
29+
if (in.HasUserSID()) {
30+
operation.set_created_by(in.GetUserSID());
31+
}
32+
33+
return operation;
34+
}
35+
36+
protected:
37+
template <typename TMetadata, typename TResult, typename TSettings>
38+
static void Fill(Operation& out, const TOperation& in, const TSettings& settings) {
39+
TMetadata metadata;
40+
metadata.mutable_settings()->CopyFrom(settings);
41+
metadata.set_progress(in.GetProgress());
42+
metadata.mutable_items_progress()->CopyFrom(in.GetItemsProgress());
43+
out.mutable_metadata()->PackFrom(metadata);
44+
45+
TResult result;
46+
out.mutable_result()->PackFrom(result);
47+
}
48+
49+
}; // TOperationConv
50+
51+
}

ydb/services/ydb/ydb_import_ut.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,36 @@ Y_UNIT_TEST_SUITE(YdbImport) {
127127
}
128128
}
129129

130+
Y_UNIT_TEST(ImportFromS3ToExistingTable) {
131+
TKikimrWithGrpcAndRootSchema server;
132+
auto driver = TDriver(TDriverConfig().SetEndpoint(TStringBuilder()
133+
<< "localhost:" << server.GetPort()));
134+
135+
{
136+
NYdb::NTable::TTableClient client(driver);
137+
auto session = client.GetSession().ExtractValueSync().GetSession();
138+
139+
auto builder = NYdb::NTable::TTableBuilder()
140+
.AddNullableColumn("Key", EPrimitiveType::Uint64)
141+
.AddNullableColumn("Value", EPrimitiveType::String)
142+
.SetPrimaryKeyColumn("Key");
143+
144+
auto result = session.CreateTable("/Root/Table", builder.Build()).ExtractValueSync();
145+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
146+
}
147+
148+
{
149+
NYdb::NImport::TImportClient client(driver);
150+
auto settings = NYdb::NImport::TImportFromS3Settings()
151+
.AppendItem({.Src = "Fake/S3/Prefix", .Dst = "/Root/Table"})
152+
.Endpoint("s3.fake.endpoint.net")
153+
.Bucket("fake_bucket")
154+
.AccessKey("fake_access_key")
155+
.SecretKey("fake_secret_key");
156+
157+
auto result = client.ImportFromS3(settings).ExtractValueSync();
158+
UNIT_ASSERT_VALUES_EQUAL_C(result.Status().GetStatus(), EStatus::BAD_REQUEST, result.Status().GetIssues().ToString());
159+
}
160+
}
161+
130162
}

0 commit comments

Comments
 (0)