Skip to content

Commit 69903d0

Browse files
25-1: Fixed bugs with exporting/importing changegeeds and added tests (#15805)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
1 parent 464a31d commit 69903d0

File tree

14 files changed

+246
-25
lines changed

14 files changed

+246
-25
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2136,3 +2136,7 @@ message TImportTableChangefeeds {
21362136
}
21372137
repeated TImportChangefeedTopic Changefeeds = 1;
21382138
}
2139+
2140+
message TChangefeedUnderlyingTopics {
2141+
repeated TPathDescription ChangefeedUnderlyingTopics = 1;
2142+
}

ydb/core/tx/datashard/export_s3_uploader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
241241
}
242242

243243
void UploadChangefeed() {
244+
Y_ABORT_UNLESS(!ChangefeedsUploaded);
244245
if (IndexExportedChangefeed == Changefeeds.size()) {
245246
ChangefeedsUploaded = true;
246247
if (Scanner) {

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
804804
return true;
805805
}
806806

807-
typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32, bool, bool> TBackupSettingsRec;
807+
typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32, bool, bool, TString> TBackupSettingsRec;
808808
typedef TDeque<TBackupSettingsRec> TBackupSettingsRows;
809809

810810
template <typename SchemaTable, typename TRowSet>
@@ -818,7 +818,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
818818
rowSet.template GetValueOrDefault<typename SchemaTable::TableDescription>(""),
819819
rowSet.template GetValueOrDefault<typename SchemaTable::NumberOfRetries>(0),
820820
rowSet.template GetValueOrDefault<typename SchemaTable::EnableChecksums>(false),
821-
rowSet.template GetValueOrDefault<typename SchemaTable::EnablePermissions>(false)
821+
rowSet.template GetValueOrDefault<typename SchemaTable::EnablePermissions>(false),
822+
rowSet.template GetValueOrDefault<typename SchemaTable::ChangefeedUnderlyingTopics>("")
822823
);
823824
}
824825

@@ -3803,6 +3804,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
38033804
ui32 nRetries = std::get<7>(rec);
38043805
bool enableChecksums = std::get<8>(rec);
38053806
bool enablePermissions = std::get<9>(rec);
3807+
TString changefeedUnderlyingTopics = std::get<10>(rec);
38063808

38073809
Y_ABORT_UNLESS(tableName.size() > 0);
38083810

@@ -3834,6 +3836,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
38343836
auto desc = tableInfo->BackupSettings.MutableTable();
38353837
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(*desc, tableDesc));
38363838
}
3839+
3840+
if (changefeedUnderlyingTopics) {
3841+
NKikimrSchemeOp::TChangefeedUnderlyingTopics wrapperOverTopics;
3842+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(wrapperOverTopics, changefeedUnderlyingTopics));
3843+
for (const auto& topic : wrapperOverTopics.GetChangefeedUnderlyingTopics()) {
3844+
*tableInfo->BackupSettings.AddChangefeedUnderlyingTopics() = topic;
3845+
}
3846+
}
38373847

38383848
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Loaded backup settings"
38393849
<< ", pathId: " << pathId

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3361,6 +3361,16 @@ void TSchemeShard::PersistTxShardStatus(NIceDb::TNiceDb& db, TOperationId opId,
33613361
);
33623362
}
33633363

3364+
NKikimrSchemeOp::TChangefeedUnderlyingTopics ConvertChangefeedUnderlyingTopics(
3365+
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TPathDescription>& changefeedUnderlyingTopics
3366+
) {
3367+
NKikimrSchemeOp::TChangefeedUnderlyingTopics result;
3368+
for (const auto& x : changefeedUnderlyingTopics) {
3369+
*result.AddChangefeedUnderlyingTopics() = x;
3370+
}
3371+
return result;
3372+
}
3373+
33643374
void TSchemeShard::PersistBackupSettings(
33653375
NIceDb::TNiceDb& db,
33663376
TPathId pathId,
@@ -3375,6 +3385,7 @@ void TSchemeShard::PersistBackupSettings(
33753385
NIceDb::TUpdate<Schema::BackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
33763386
NIceDb::TUpdate<Schema::BackupSettings::NeedToBill>(settings.GetNeedToBill()), \
33773387
NIceDb::TUpdate<Schema::BackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
3388+
NIceDb::TUpdate<Schema::BackupSettings::ChangefeedUnderlyingTopics>(ConvertChangefeedUnderlyingTopics(settings.GetChangefeedUnderlyingTopics()).SerializeAsString()), \
33783389
NIceDb::TUpdate<Schema::BackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
33793390
NIceDb::TUpdate<Schema::BackupSettings::EnableChecksums>(settings.GetEnableChecksums()), \
33803391
NIceDb::TUpdate<Schema::BackupSettings::EnablePermissions>(settings.GetEnablePermissions())); \
@@ -3385,6 +3396,7 @@ void TSchemeShard::PersistBackupSettings(
33853396
NIceDb::TUpdate<Schema::MigratedBackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
33863397
NIceDb::TUpdate<Schema::MigratedBackupSettings::NeedToBill>(settings.GetNeedToBill()), \
33873398
NIceDb::TUpdate<Schema::MigratedBackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
3399+
NIceDb::TUpdate<Schema::MigratedBackupSettings::ChangefeedUnderlyingTopics>(ConvertChangefeedUnderlyingTopics(settings.GetChangefeedUnderlyingTopics()).SerializeAsString()), \
33883400
NIceDb::TUpdate<Schema::MigratedBackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
33893401
NIceDb::TUpdate<Schema::MigratedBackupSettings::EnableChecksums>(settings.GetEnableChecksums()), \
33903402
NIceDb::TUpdate<Schema::MigratedBackupSettings::EnablePermissions>(settings.GetEnablePermissions())); \

ydb/core/tx/schemeshard/schemeshard_import__create.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
526526
return true;
527527
}
528528

529-
void CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
529+
bool CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId, TString& error) {
530530
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
531531
auto& item = importInfo->Items.at(itemIdx);
532532
item.SubState = ESubState::Proposed;
@@ -538,7 +538,13 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
538538

539539
Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
540540

541-
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
541+
auto propose = CreateChangefeedPropose(Self, txId, item, error);
542+
if (!propose) {
543+
return false;
544+
}
545+
546+
Send(Self->SelfId(), std::move(propose));
547+
return true;
542548
}
543549

544550
void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
@@ -1077,7 +1083,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10771083

10781084
case EState::CreateChangefeed:
10791085
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1080-
CreateChangefeed(importInfo, i, txId);
1086+
TString error;
1087+
if (!CreateChangefeed(importInfo, i, txId, error)) {
1088+
NIceDb::TNiceDb db(txc.DB);
1089+
CancelAndPersist(db, importInfo, i, error, "creation changefeed failed");
1090+
}
10811091
} else {
10821092
CreateConsumers(importInfo, i, txId);
10831093
}

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
234234
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
235235
TSchemeShard* ss,
236236
TTxId txId,
237-
const TImportInfo::TItem& item
237+
const TImportInfo::TItem& item,
238+
TString& error
238239
) {
239240
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());
240241

@@ -252,10 +253,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
252253
modifyScheme.SetWorkingDir(dstPath.Parent().PathString());
253254
cdcStream.SetTableName(dstPath.LeafName());
254255

255-
TString error;
256-
Ydb::StatusIds::StatusCode status;
257-
258256
auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
257+
Ydb::StatusIds::StatusCode status;
259258
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
260259
return nullptr;
261260
}
@@ -268,6 +267,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
268267
i64 minActivePartitions =
269268
topic.partitioning_settings().min_active_partitions();
270269
if (minActivePartitions < 0) {
270+
error = "minActivePartitions must be >= 0";
271271
return nullptr;
272272
} else if (minActivePartitions == 0) {
273273
minActivePartitions = 1;
@@ -281,6 +281,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
281281
i64 maxActivePartitions =
282282
topic.partitioning_settings().max_active_partitions();
283283
if (maxActivePartitions < 0) {
284+
error = "maxActivePartitions must be >= 0";
284285
return nullptr;
285286
} else if (maxActivePartitions == 0) {
286287
maxActivePartitions = 50;

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
4949
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
5050
TSchemeShard* ss,
5151
TTxId txId,
52-
const TImportInfo::TItem& item
52+
const TImportInfo::TItem& item,
53+
TString& error
5354
);
5455

5556
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,7 @@ struct Schema : NIceDb::Schema {
583583
struct YTSettings : Column<3, NScheme::NTypeIds::String> {};
584584
struct S3Settings : Column<6, NScheme::NTypeIds::String> {};
585585
struct TableDescription : Column<7, NScheme::NTypeIds::String> {};
586+
struct ChangefeedUnderlyingTopics : Column<13, NScheme::NTypeIds::String> {};
586587
struct NumberOfRetries : Column<8, NScheme::NTypeIds::Uint32> {};
587588
struct ScanSettings : Column<9, NScheme::NTypeIds::String> {};
588589
struct NeedToBill : Column<10, NScheme::NTypeIds::Bool> {};
@@ -605,7 +606,8 @@ struct Schema : NIceDb::Schema {
605606
ScanSettings,
606607
NeedToBill,
607608
EnableChecksums,
608-
EnablePermissions
609+
EnablePermissions,
610+
ChangefeedUnderlyingTopics
609611
>;
610612
};
611613

@@ -617,6 +619,7 @@ struct Schema : NIceDb::Schema {
617619
struct YTSettings : Column<4, NScheme::NTypeIds::String> {};
618620
struct S3Settings : Column<7, NScheme::NTypeIds::String> {};
619621
struct TableDescription : Column<8, NScheme::NTypeIds::String> {};
622+
struct ChangefeedUnderlyingTopics : Column<14, NScheme::NTypeIds::String> {};
620623
struct NumberOfRetries : Column<9, NScheme::NTypeIds::Uint32> {};
621624
struct ScanSettings : Column<10, NScheme::NTypeIds::String> {};
622625
struct NeedToBill : Column<11, NScheme::NTypeIds::Bool> {};
@@ -640,7 +643,8 @@ struct Schema : NIceDb::Schema {
640643
ScanSettings,
641644
NeedToBill,
642645
EnableChecksums,
643-
EnablePermissions
646+
EnablePermissions,
647+
ChangefeedUnderlyingTopics
644648
>;
645649
};
646650

ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,4 +509,82 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) {
509509
}
510510
)");
511511
}
512+
513+
class TestData {
514+
public:
515+
static const TTypedScheme& Table() {
516+
return TableScheme;
517+
}
518+
519+
static const TTypedScheme& Changefeed() {
520+
return ChangefeedScheme;
521+
}
522+
523+
static const TString& Request() {
524+
return RequestString;
525+
}
526+
527+
private:
528+
static const char* TableName;
529+
static const TTypedScheme TableScheme;
530+
static const TTypedScheme ChangefeedScheme;
531+
static const TString RequestString;
532+
};
533+
534+
const char* TestData::TableName = "Table";
535+
536+
const TTypedScheme TestData::TableScheme = TTypedScheme {
537+
EPathTypeTable,
538+
Sprintf(R"(
539+
Name: "%s"
540+
Columns { Name: "key" Type: "Utf8" }
541+
Columns { Name: "value" Type: "Utf8" }
542+
KeyColumnNames: ["key"]
543+
)", TableName)
544+
};
545+
546+
const TTypedScheme TestData::ChangefeedScheme = TTypedScheme {
547+
EPathTypeCdcStream,
548+
Sprintf(R"(
549+
TableName: "%s"
550+
StreamDescription {
551+
Name: "update_feed"
552+
Mode: ECdcStreamModeUpdate
553+
Format: ECdcStreamFormatJson
554+
State: ECdcStreamStateReady
555+
}
556+
)", TableName)
557+
};
558+
559+
const TString TestData::RequestString = R"(
560+
ExportToS3Settings {
561+
endpoint: "localhost:%d"
562+
scheme: HTTP
563+
items {
564+
source_path: "/MyRoot/Table"
565+
destination_prefix: ""
566+
}
567+
}
568+
)";
569+
570+
Y_UNIT_TEST(ShouldSucceedOnSingleShardTableWithChangefeed) {
571+
RunS3({
572+
TestData::Table(),
573+
TestData::Changefeed()
574+
}, TestData::Request());
575+
}
576+
577+
Y_UNIT_TEST(CancelOnSingleShardTableWithChangefeed) {
578+
CancelS3({
579+
TestData::Table(),
580+
TestData::Changefeed()
581+
}, TestData::Request());
582+
}
583+
584+
Y_UNIT_TEST(ForgetShouldSucceedOnSingleShardTableWithChangefeed) {
585+
ForgetS3({
586+
TestData::Table(),
587+
TestData::Changefeed()
588+
}, TestData::Request());
589+
}
512590
}

ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,28 @@ using namespace NKikimrSchemeOp;
1111
namespace NSchemeShardUT_Private {
1212
namespace NExportReboots {
1313

14+
void TestCreate(TTestActorRuntime& runtime, ui64 txId, const TString& scheme, NKikimrSchemeOp::EPathType pathType) {
15+
using TTestCreateFunc = ui64(*)(TTestActorRuntime&, ui64, const TString&, const TString&,
16+
const TVector<TExpectedResult>&, const TApplyIf&);
17+
18+
static const THashMap<NKikimrSchemeOp::EPathType, TTestCreateFunc> functions = {
19+
{EPathTypeTable, &TestSimpleCreateTable},
20+
{EPathTypeView, &TestCreateView},
21+
{EPathTypeCdcStream, &TestCreateCdcStream},
22+
};
23+
24+
auto it = functions.find(pathType);
25+
if (it != functions.end()) {
26+
it->second(runtime, txId, "/MyRoot", scheme, {NKikimrScheme::StatusAccepted}, {});
27+
} else {
28+
UNIT_FAIL("export is not implemented for the scheme object type: " << pathType);
29+
}
30+
}
31+
1432
void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector<TTypedScheme>& schemeObjects) {
1533
TSet<ui64> toWait;
1634
for (const auto& [type, scheme, _] : schemeObjects) {
17-
switch (type) {
18-
case EPathTypeTable:
19-
TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme);
20-
break;
21-
case EPathTypeView:
22-
TestCreateView(runtime, ++t.TxId, "/MyRoot", scheme);
23-
break;
24-
default:
25-
UNIT_FAIL("export is not implemented for the scheme object type: " << type);
26-
return;
27-
}
35+
TestCreate(runtime, ++t.TxId, scheme, type);
2836
toWait.insert(t.TxId);
2937
}
3038
t.TestEnv->TestWaitNotification(runtime, toWait);

0 commit comments

Comments
 (0)