Skip to content

Commit f4914ce

Browse files
authored
Fix partitioning for empty tables (#9372)
1 parent bf1a83a commit f4914ce

File tree

4 files changed

+31
-9
lines changed

4 files changed

+31
-9
lines changed

ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
411411

412412
public:
413413
TAsyncReshardingTest() {
414-
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 24, 4);
414+
TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4);
415415
}
416416

417417
void AddBatch(int numRows) {
@@ -475,6 +475,18 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
475475
tester.CheckCount();
476476
}
477477

478+
Y_UNIT_TEST(SplitEmpty) {
479+
TAsyncReshardingTest tester;
480+
481+
tester.CheckCount();
482+
483+
tester.StartResharding("SPLIT");
484+
485+
tester.CheckCount();
486+
tester.WaitResharding();
487+
tester.CheckCount();
488+
}
489+
478490
Y_UNIT_TEST(ChangeSchemaAndSplit) {
479491
TAsyncReshardingTest tester;
480492
tester.DisableCompaction();

ydb/core/tx/columnshard/data_sharing/common/session/common.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@ bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) {
2222
THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>> portionsByPath;
2323
THashSet<TString> StoragesIds;
2424
for (auto&& i : GetPathIdsForStart()) {
25-
auto& portionsVector = portionsByPath[i];
2625
const auto& g = index.GetGranuleVerified(i);
2726
for (auto&& p : g.GetPortionsOlderThenSnapshot(GetSnapshotBarrier())) {
2827
if (shard.GetDataLocksManager()->IsLocked(*p.second, { "sharing_session:" + GetSessionId() })) {
2928
return false;
3029
}
31-
portionsVector.emplace_back(p.second);
30+
portionsByPath[i].emplace_back(p.second);
3231
}
3332
}
3433

@@ -52,7 +51,7 @@ void TCommonSession::PrepareToStart(const NColumnShard::TColumnShard& shard) {
5251
}
5352

5453
void TCommonSession::Finish(const NColumnShard::TColumnShard& shard, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) {
55-
AFL_VERIFY(State == EState::InProgress);
54+
AFL_VERIFY(State == EState::InProgress || State == EState::Prepared);
5655
State = EState::Finished;
5756
shard.GetSharingSessionsManager()->FinishSharingSession();
5857
AFL_VERIFY(LockGuard);

ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL
7979
AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
8080
}
8181

82+
if (protoSessionCursorDynamic && !protoSessionCursorStatic) {
83+
protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{};
84+
}
85+
8286
AFL_VERIFY(index);
8387
session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate();
8488
AFL_VERIFY(SourceSessions.emplace(session->GetSessionId(), session).second);

ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,11 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu
132132
for (auto&& i : protoStatic.GetPathHashes()) {
133133
PathPortionHashes.emplace(i.GetPathId(), i.GetHash());
134134
}
135-
AFL_VERIFY(PathPortionHashes.size());
136-
IsStaticSaved = true;
135+
if (PathPortionHashes.empty()) {
136+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "empty static cursor");
137+
} else {
138+
IsStaticSaved = true;
139+
}
137140
return TConclusionStatus::Success();
138141
}
139142

@@ -178,10 +181,14 @@ bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManag
178181
local.emplace(i.first, std::move(portionsMap));
179182
}
180183
std::swap(PortionsForSend, local);
181-
if (!StartPathId) {
182-
AFL_VERIFY(PortionsForSend.size());
183-
AFL_VERIFY(PortionsForSend.begin()->second.size());
184184

185+
if (PortionsForSend.empty()) {
186+
AFL_VERIFY(!StartPortionId);
187+
NextPathId = std::nullopt;
188+
NextPortionId = std::nullopt;
189+
return true;
190+
} else if (!StartPathId) {
191+
AFL_VERIFY(PortionsForSend.begin()->second.size());
185192
NextPathId = PortionsForSend.begin()->first;
186193
NextPortionId = PortionsForSend.begin()->second.begin()->first;
187194
AFL_VERIFY(Next(storagesManager, index));

0 commit comments

Comments
 (0)