Skip to content

Commit 65e10f5

Browse files
authored
remove blob columns not present in current scheme
1 parent 766ee48 commit 65e10f5

File tree

15 files changed

+161
-14
lines changed

15 files changed

+161
-14
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ ydb/core/kqp/ut/query KqpLimits.QueryReplySize
1717
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
1818
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
1919
ydb/core/kqp/ut/scheme [*/*]*
20-
ydb/core/kqp/ut/scheme KqpOlapScheme.DropThenAddColumn
2120
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
2221
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
2322
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter

ydb/core/formats/arrow/common/container.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "container.h"
22

3+
#include <ydb/core/formats/arrow/common/vector_operations.h>
34
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
45
#include <ydb/core/formats/arrow/arrow_helpers.h>
56
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
@@ -59,6 +60,11 @@ TConclusionStatus TGeneralContainer::AddField(const std::shared_ptr<arrow::Field
5960
return AddField(f, std::make_shared<NAccessor::TTrivialArray>(data));
6061
}
6162

63+
void TGeneralContainer::DeleteFieldsByIndex(const std::vector<ui32>& idxs) {
64+
Schema->DeleteFieldsByIndex(idxs);
65+
NUtil::EraseItems(Columns, idxs);
66+
}
67+
6268
void TGeneralContainer::Initialize() {
6369
std::optional<ui64> recordsCount;
6470
AFL_VERIFY(Schema->num_fields() == (i32)Columns.size())("schema", Schema->num_fields())("columns", Columns.size());

ydb/core/formats/arrow/common/container.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ class TGeneralContainer {
7474

7575
[[nodiscard]] TConclusionStatus AddField(const std::shared_ptr<arrow::Field>& f, const std::shared_ptr<arrow::ChunkedArray>& data);
7676

77+
void DeleteFieldsByIndex(const std::vector<ui32>& idxs);
78+
7779
TGeneralContainer(const std::shared_ptr<arrow::Table>& table);
7880
TGeneralContainer(const std::shared_ptr<arrow::RecordBatch>& table);
7981
TGeneralContainer(const std::shared_ptr<arrow::Schema>& schema, std::vector<std::shared_ptr<NAccessor::IChunkedArray>>&& columns);
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#pragma once
2+
3+
#include <ydb/library/actors/core/log.h>
4+
5+
#include <util/system/types.h>
6+
7+
#include <vector>
8+
9+
namespace NKikimr::NArrow::NUtil {
10+
11+
template <typename T>
12+
class TDefaultErasePolicy {
13+
public:
14+
void OnEraseItem(const T& /*item*/) const {
15+
}
16+
void OnMoveItem(const T& /*item*/, const ui64 /*new_index*/) const {
17+
}
18+
};
19+
20+
template <typename T, typename ErasePolicy = TDefaultErasePolicy<T>>
21+
void EraseItems(std::vector<T>& container, const std::vector<ui32>& idxsToErase, const ErasePolicy& policy = TDefaultErasePolicy<T>()) {
22+
if (idxsToErase.empty()) {
23+
return;
24+
}
25+
AFL_VERIFY(idxsToErase.front() < container.size());
26+
27+
auto itNextEraseIdx = idxsToErase.begin();
28+
ui64 writeIdx = idxsToErase.front();
29+
ui64 readIdx = idxsToErase.front();
30+
while (readIdx != container.size()) {
31+
AFL_VERIFY(itNextEraseIdx != idxsToErase.end() && readIdx == *itNextEraseIdx);
32+
33+
policy.OnEraseItem(container[readIdx]);
34+
++readIdx;
35+
++itNextEraseIdx;
36+
if (itNextEraseIdx != idxsToErase.end()) {
37+
AFL_VERIFY(*itNextEraseIdx > *std::prev(itNextEraseIdx));
38+
AFL_VERIFY(*itNextEraseIdx < container.size());
39+
}
40+
41+
const ui64 nextReadIdx = itNextEraseIdx == idxsToErase.end() ? container.size() : *itNextEraseIdx;
42+
while (readIdx != nextReadIdx) {
43+
std::swap(container[writeIdx], container[readIdx]);
44+
policy.OnMoveItem(container[writeIdx], writeIdx);
45+
++writeIdx;
46+
++readIdx;
47+
}
48+
}
49+
50+
container.resize(writeIdx);
51+
AFL_VERIFY(itNextEraseIdx == idxsToErase.end());
52+
}
53+
54+
} // namespace NKikimr::NArrow::NUtil

ydb/core/formats/arrow/modifier/schema.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "schema.h"
22
#include <util/string/builder.h>
3+
#include <ydb/core/formats/arrow/common/vector_operations.h>
34
#include <ydb/library/actors/core/log.h>
45

56
namespace NKikimr::NArrow::NModifier {
@@ -29,6 +30,12 @@ TConclusionStatus TSchema::AddField(const std::shared_ptr<arrow::Field>& f) {
2930
return TConclusionStatus::Success();
3031
}
3132

33+
void TSchema::DeleteFieldsByIndex(const std::vector<ui32>& idxs) {
34+
AFL_VERIFY(Initialized);
35+
AFL_VERIFY(!Finished);
36+
NUtil::EraseItems(Fields, idxs, TFieldsErasePolicy(this));
37+
}
38+
3239
TString TSchema::ToString() const {
3340
TStringBuilder result;
3441
for (auto&& i : Fields) {

ydb/core/formats/arrow/modifier/schema.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#pragma once
2+
#include <ydb/library/actors/core/log.h>
23
#include <ydb/library/conclusion/status.h>
34
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
45
#include <util/generic/hash.h>
@@ -39,6 +40,7 @@ class TSchema {
3940
std::shared_ptr<arrow::Schema> Finish();
4041
[[nodiscard]] TConclusionStatus AddField(const std::shared_ptr<arrow::Field>& f);
4142
const std::shared_ptr<arrow::Field>& GetFieldByName(const std::string& name) const;
43+
void DeleteFieldsByIndex(const std::vector<ui32>& idxs);
4244

4345
bool HasField(const std::string& name) const {
4446
return IndexByName.contains(name);
@@ -51,5 +53,26 @@ class TSchema {
5153
const std::shared_ptr<arrow::Field>& GetFieldVerified(const ui32 index) const;
5254

5355
const std::shared_ptr<arrow::Field>& field(const ui32 index) const;
56+
57+
private:
58+
class TFieldsErasePolicy {
59+
private:
60+
TSchema* const Owner;
61+
62+
public:
63+
TFieldsErasePolicy(TSchema* const owner)
64+
: Owner(owner) {
65+
}
66+
67+
void OnEraseItem(const std::shared_ptr<arrow::Field>& item) const {
68+
Owner->IndexByName.erase(item->name());
69+
}
70+
71+
void OnMoveItem(const std::shared_ptr<arrow::Field>& item, const ui64 new_index) const {
72+
auto* findField = Owner->IndexByName.FindPtr(item->name());
73+
AFL_VERIFY(findField);
74+
*findField = new_index;
75+
}
76+
};
5477
};
5578
}

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7849,7 +7849,11 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
78497849
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
78507850
}
78517851

7852-
Y_UNIT_TEST(DropThenAddColumn) {
7852+
void TestDropThenAddColumn(bool enableIndexation, bool enableCompaction) {
7853+
if (enableCompaction) {
7854+
Y_ABORT_UNLESS(enableIndexation);
7855+
}
7856+
78537857
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
78547858
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
78557859
csController->DisableBackground(NYDBTest::ICSController::EBackground::Compaction);
@@ -7874,12 +7878,14 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
78747878
testHelper.BulkUpsert(testTable, tableInserter);
78757879
}
78767880

7877-
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
7878-
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
7879-
csController->WaitIndexation(TDuration::Seconds(5));
7880-
csController->WaitCompactions(TDuration::Seconds(5));
7881-
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
7882-
csController->DisableBackground(NYDBTest::ICSController::EBackground::Compaction);
7881+
if (enableCompaction) {
7882+
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
7883+
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
7884+
csController->WaitIndexation(TDuration::Seconds(5));
7885+
csController->WaitCompactions(TDuration::Seconds(5));
7886+
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
7887+
csController->DisableBackground(NYDBTest::ICSController::EBackground::Compaction);
7888+
}
78837889

78847890
{
78857891
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN value;";
@@ -7900,12 +7906,28 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
79007906
testHelper.BulkUpsert(testTable, tableInserter);
79017907
}
79027908

7903-
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
7904-
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
7905-
csController->WaitIndexation(TDuration::Seconds(5));
7906-
csController->WaitCompactions(TDuration::Seconds(5));
7909+
if (enableIndexation) {
7910+
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
7911+
csController->WaitIndexation(TDuration::Seconds(5));
7912+
}
7913+
if (enableCompaction) {
7914+
csController->EnableBackground(NYDBTest::ICSController::EBackground::Compaction);
7915+
csController->WaitCompactions(TDuration::Seconds(5));
7916+
}
7917+
7918+
testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest`", "[[#];[#];[[42u]];[[43u]]]");
7919+
}
7920+
7921+
Y_UNIT_TEST(DropThenAddColumn) {
7922+
TestDropThenAddColumn(false, false);
7923+
}
7924+
7925+
Y_UNIT_TEST(DropThenAddColumnIndexation) {
7926+
TestDropThenAddColumn(true, true);
7927+
}
79077928

7908-
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest`", "[[4;#;[\"test_res_1\"]]]");
7929+
Y_UNIT_TEST(DropThenAddColumnCompaction) {
7930+
TestDropThenAddColumn(true, true);
79097931
}
79107932

79117933
Y_UNIT_TEST(DropTtlColumn) {

ydb/core/tx/columnshard/engines/changes/indexation.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
235235
auto batchSchema =
236236
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields()));
237237
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
238+
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
238239
}
239240
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());
240241

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,18 +215,21 @@ bool TCommittedDataSource::DoStartFetchingColumns(
215215

216216
void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns) {
217217
TMemoryProfileGuard mGuard("SCAN_PROFILE::ASSEMBLER::COMMITTED", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
218+
const ISnapshotSchema::TPtr batchSchema = GetContext()->GetReadMetadata()->GetIndexVersions().GetSchemaVerified(GetCommitted().GetSchemaVersion());
219+
const ISnapshotSchema::TPtr resultSchema = GetContext()->GetReadMetadata()->GetResultSchema();
218220
if (!GetStageData().GetTable()) {
219221
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
220222
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
221223
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
222224
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema->fields())));
223225
AFL_VERIFY(rBatch)("schema", schema->ToString());
224226
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
227+
batchSchema->AdaptBatchToSchema(*batch, resultSchema);
225228
GetContext()->GetReadMetadata()->GetIndexInfo().AddSnapshotColumns(*batch, CommittedBlob.GetSnapshotDef(TSnapshot::Zero()));
226229
GetContext()->GetReadMetadata()->GetIndexInfo().AddDeleteFlagsColumn(*batch, CommittedBlob.GetIsDelete());
227230
MutableStageData().AddBatch(batch);
228231
}
229-
MutableStageData().SyncTableColumns(columns->GetSchema()->fields(), *GetContext()->GetReadMetadata()->GetResultSchema());
232+
MutableStageData().SyncTableColumns(columns->GetSchema()->fields(), *resultSchema);
230233
}
231234

232235
} // namespace NKikimr::NOlap::NReader::NPlain

ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,22 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
140140
}
141141
}
142142

143+
void ISnapshotSchema::AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const {
144+
if (targetSchema->GetVersion() != GetVersion()) {
145+
std::vector<ui32> columnIdxToDelete;
146+
for (size_t columnIdx = 0; columnIdx < batch.GetSchema()->GetFields().size(); ++columnIdx) {
147+
const std::optional<ui32> targetColumnId = targetSchema->GetColumnIdOptional(batch.GetSchema()->field(columnIdx)->name());
148+
const ui32 batchColumnId = GetColumnIdVerified(GetFieldByIndex(columnIdx)->name());
149+
if (!targetColumnId || *targetColumnId != batchColumnId) {
150+
columnIdxToDelete.emplace_back(columnIdx);
151+
}
152+
}
153+
if (!columnIdxToDelete.empty()) {
154+
batch.DeleteFieldsByIndex(columnIdxToDelete);
155+
}
156+
}
157+
}
158+
143159
ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const {
144160
auto id = GetColumnIdOptional(columnName);
145161
AFL_VERIFY(id)("column_name", columnName)("schema", JoinSeq(",", GetSchema()->field_names()));

ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class ISnapshotSchema {
4949
std::vector<std::string> GetPKColumnNames() const;
5050

5151
virtual std::optional<ui32> GetColumnIdOptional(const std::string& columnName) const = 0;
52+
virtual ui32 GetColumnIdVerified(const std::string& columnName) const = 0;
5253
virtual int GetFieldIndex(const ui32 columnId) const = 0;
5354
bool HasColumnId(const ui32 columnId) const {
5455
return GetFieldIndex(columnId) >= 0;
@@ -76,6 +77,7 @@ class ISnapshotSchema {
7677
const ISnapshotSchema& dataSchema, const std::shared_ptr<NArrow::TGeneralContainer>& batch, const std::set<ui32>& restoreColumnIds) const;
7778
[[nodiscard]] TConclusion<std::shared_ptr<arrow::RecordBatch>> PrepareForModification(
7879
const std::shared_ptr<arrow::RecordBatch>& incomingBatch, const NEvWrite::EModificationType mType) const;
80+
void AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const;
7981
};
8082

8183
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ std::optional<ui32> TFilteredSnapshotSchema::GetColumnIdOptional(const std::stri
5858
return result;
5959
}
6060

61+
ui32 TFilteredSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
62+
auto result = OriginalSnapshot->GetColumnIdVerified(columnName);
63+
AFL_VERIFY(ColumnIds.contains(result));
64+
return result;
65+
}
66+
6167
int TFilteredSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
6268
if (!ColumnIds.contains(columnId)) {
6369
return -1;

ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class TFilteredSnapshotSchema: public ISnapshotSchema {
2323
TColumnSaver GetColumnSaver(const ui32 columnId) const override;
2424
std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const override;
2525
std::optional<ui32> GetColumnIdOptional(const std::string& columnName) const override;
26+
ui32 GetColumnIdVerified(const std::string& columnName) const override;
2627
int GetFieldIndex(const ui32 columnId) const override;
2728

2829
const std::shared_ptr<arrow::Schema>& GetSchema() const override;

ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ std::optional<ui32> TSnapshotSchema::GetColumnIdOptional(const std::string& colu
2121
return IndexInfo.GetColumnIdOptional(columnName);
2222
}
2323

24+
ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
25+
return IndexInfo.GetColumnIdVerified(columnName);
26+
}
27+
2428
int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
2529
const TString& columnName = IndexInfo.GetColumnName(columnId, false);
2630
if (!columnName) {

ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class TSnapshotSchema: public ISnapshotSchema {
3030
TColumnSaver GetColumnSaver(const ui32 columnId) const override;
3131
std::shared_ptr<TColumnLoader> GetColumnLoaderOptional(const ui32 columnId) const override;
3232
std::optional<ui32> GetColumnIdOptional(const std::string& columnName) const override;
33+
ui32 GetColumnIdVerified(const std::string& columnName) const override;
3334
int GetFieldIndex(const ui32 columnId) const override;
3435

3536
const std::shared_ptr<arrow::Schema>& GetSchema() const override;

0 commit comments

Comments
 (0)