Skip to content

Commit 0d6580b

Browse files
authored
25-1: view import: retry failed creation if dependencies are missing (#15757) (#15841)
1 parent 72f8af9 commit 0d6580b

File tree

3 files changed

+164
-49
lines changed

3 files changed

+164
-49
lines changed

ydb/core/tx/schemeshard/schemeshard_import__create.cpp

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,24 +35,27 @@ bool IsWaiting(const TItem& item) {
3535
return item.State == EState::Waiting;
3636
}
3737

38-
THashSet<EState> CollectItemStates(const TVector<TItem>& items) {
39-
THashSet<EState> itemStates;
38+
THashMap<EState, int> CountItemsByState(const TVector<TItem>& items) {
39+
THashMap<EState, int> counter;
4040
for (const auto& item : items) {
41-
itemStates.emplace(item.State);
41+
counter[item.State]++;
4242
}
43-
return itemStates;
43+
return counter;
4444
}
4545

46-
bool AllDone(const THashSet<EState>& itemStates) {
47-
return AllOf(itemStates, [](EState state) { return state == EState::Done; });
46+
bool AllDone(const THashMap<EState, int>& stateCounts) {
47+
return AllOf(stateCounts, [](const auto& stateCount) { return stateCount.first == EState::Done; });
4848
}
4949

50-
bool AllWaiting(const THashSet<EState>& itemStates) {
51-
return AllOf(itemStates, [](EState state) { return state == EState::Waiting; });
50+
bool AllWaiting(const THashMap<EState, int>& stateCounts) {
51+
return AllOf(stateCounts, [](const auto& stateCount) { return stateCount.first == EState::Waiting; });
5252
}
5353

54-
bool AllDoneOrWaiting(const THashSet<EState>& itemStates) {
55-
return AllOf(itemStates, [](EState state) { return state == EState::Done || state == EState::Waiting; });
54+
bool AllDoneOrWaiting(const THashMap<EState, int>& stateCounts) {
55+
return AllOf(stateCounts, [](const auto& stateCount) {
56+
return stateCount.first == EState::Done
57+
|| stateCount.first == EState::Waiting;
58+
});
5659
}
5760

5861
// the item is to be created by query, i.e. it is not a table
@@ -431,20 +434,20 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
431434
TVector<ui32> retriedItems;
432435
for (ui32 itemIdx : xrange(importInfo->Items.size())) {
433436
auto& item = importInfo->Items[itemIdx];
434-
if (IsWaiting(item) && IsCreatedByQuery(item) && item.ViewCreationRetries == 0) {
437+
if (IsWaiting(item) && IsCreatedByQuery(item)) {
435438
item.SchemeQueryExecutor = ctx.Register(CreateSchemeQueryExecutor(
436439
Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database
437440
));
438441
Self->RunningImportSchemeQueryExecutors.emplace(item.SchemeQueryExecutor);
439442

440443
item.State = EState::CreateSchemeObject;
441-
item.ViewCreationRetries++;
442444
Self->PersistImportItemState(db, importInfo, itemIdx);
443445

444446
retriedItems.emplace_back(itemIdx);
445447
}
446448
}
447449
if (!retriedItems.empty()) {
450+
importInfo->WaitingViews = std::ssize(retriedItems);
448451
LOG_D("TImport::TTxProgress: retry view creation"
449452
<< ": id# " << importInfo->Id
450453
<< ", retried items# " << JoinSeq(", ", retriedItems)
@@ -995,17 +998,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
995998
auto& item = importInfo->Items[message.ItemIdx];
996999
Self->RunningImportSchemeQueryExecutors.erase(std::exchange(item.SchemeQueryExecutor, {}));
9971000

998-
if (message.Status == Ydb::StatusIds::SCHEME_ERROR && item.ViewCreationRetries == 0) {
1001+
if (message.Status == Ydb::StatusIds::SCHEME_ERROR) {
9991002
// Scheme error happens when the view depends on a table (or a view) that is not yet imported.
10001003
// Instead of tracking view dependencies, we simply retry the creation of the view later.
10011004
item.State = EState::Waiting;
10021005
Self->PersistImportItemState(db, importInfo, message.ItemIdx);
10031006

1004-
const auto itemStates = CollectItemStates(importInfo->Items);
1005-
if (AllWaiting(itemStates)) {
1007+
const auto stateCounts = CountItemsByState(importInfo->Items);
1008+
if (AllWaiting(stateCounts)) {
10061009
// Cancel the import, or we will end up waiting indefinitely.
10071010
return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed");
1008-
} else if (AllDoneOrWaiting(itemStates)) {
1011+
} else if (AllDoneOrWaiting(stateCounts)) {
1012+
if (stateCounts.at(EState::Waiting) == importInfo->WaitingViews) {
1013+
// No progress has been made since the last view creation retry.
1014+
return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed");
1015+
}
10091016
RetryViewsCreation(importInfo, db, ctx);
10101017
}
10111018
return;
@@ -1080,7 +1087,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10801087
BuildIndex(importInfo, i, txId);
10811088
itemIdx = i;
10821089
break;
1083-
1090+
10841091
case EState::CreateChangefeed:
10851092
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
10861093
TString error;
@@ -1159,7 +1166,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11591166
} else {
11601167
txId = GetActiveCreateConsumerTxId(importInfo, itemIdx);
11611168
}
1162-
1169+
11631170
}
11641171
}
11651172

@@ -1352,7 +1359,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
13521359
}
13531360
}
13541361
break;
1355-
1362+
13561363
case EState::CreateChangefeed:
13571364
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
13581365
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
@@ -1369,11 +1376,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
13691376
return SendNotificationsIfFinished(importInfo);
13701377
}
13711378

1372-
const auto itemStates = CollectItemStates(importInfo->Items);
1373-
if (AllDone(itemStates)) {
1379+
const auto stateCounts = CountItemsByState(importInfo->Items);
1380+
if (AllDone(stateCounts)) {
13741381
importInfo->State = EState::Done;
13751382
importInfo->EndTime = TAppData::TimeProvider->Now();
1376-
} else if (AllDoneOrWaiting(itemStates)) {
1383+
} else if (AllDoneOrWaiting(stateCounts)) {
13771384
RetryViewsCreation(importInfo, db, ctx);
13781385
}
13791386

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2869,7 +2869,6 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28692869
int NextIndexIdx = 0;
28702870
int NextChangefeedIdx = 0;
28712871
TString Issue;
2872-
int ViewCreationRetries = 0;
28732872
TPathId StreamImplPathId;
28742873

28752874
TItem() = default;
@@ -2901,6 +2900,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
29012900
EState State = EState::Invalid;
29022901
TString Issue;
29032902
TVector<TItem> Items;
2903+
int WaitingViews = 0;
29042904

29052905
TSet<TActorId> Subscribers;
29062906

0 commit comments

Comments
 (0)