Skip to content

Commit 6862d3c

Browse files
Fix MVCC tests. Use write id as a row feature for conflict resolution (#9598)
1 parent b47a8b8 commit 6862d3c

File tree

17 files changed

+156
-122
lines changed

17 files changed

+156
-122
lines changed

ydb/core/formats/arrow/arrow_filter.cpp

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -575,8 +575,10 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
575575
}
576576

577577
TColumnFilter::TIterator TColumnFilter::GetIterator(const bool reverse, const ui32 expectedSize) const {
578-
if ((IsTotalAllowFilter() || IsTotalDenyFilter()) && !Filter.size()) {
579-
return TIterator(reverse, expectedSize, LastValue);
578+
if (IsTotalAllowFilter()) {
579+
return TIterator(reverse, expectedSize, true);
580+
} else if (IsTotalDenyFilter()) {
581+
return TIterator(reverse, expectedSize, false);
580582
} else {
581583
AFL_VERIFY(expectedSize == Size())("expected", expectedSize)("size", Size())("reverse", reverse);
582584
return TIterator(reverse, Filter, GetStartValue(reverse));

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7903,10 +7903,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
79037903
testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;\"-321\";\"-3.14\";[\"test_res_3\"]]]");
79047904
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[\"-3.14\"]]");
79057905
testHelper.ReadData("SELECT resource_id FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
7906-
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
7906+
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");
79077907

79087908
testHelper.RebootTablets(testTable.GetName());
7909-
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest`", "[[#];[#];[\"-3.14\"]]");
7909+
testHelper.ReadData("SELECT new_column FROM `/Root/TableStoreTest/ColumnTableTest` ORDER BY new_column", "[[#];[#];[\"-3.14\"]]");
79107910
}
79117911

79127912
Y_UNIT_TEST(AddColumnErrors) {

ydb/core/protos/feature_flags.proto

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -162,5 +162,6 @@ message TFeatureFlags {
162162
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
163163
optional bool EnableSparsedColumns = 144 [default = false];
164164
optional bool EnableParameterizedDecimal = 145 [default = false];
165-
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
165+
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
166+
optional bool EnableInsertWriteIdSpecialColumnCompatibility = 147 [default = false];
166167
}

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 34 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -306,9 +306,10 @@ struct Schema : NIceDb::Schema {
306306

307307
struct BlobRangeOffset: Column<11, NScheme::NTypeIds::Uint64> {};
308308
struct BlobRangeSize: Column<12, NScheme::NTypeIds::Uint64> {};
309+
struct InsertWriteId: Column<13, NScheme::NTypeIds::Uint64> {};
309310

310311
using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>;
311-
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize>;
312+
using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion, BlobRangeOffset, BlobRangeSize, InsertWriteId>;
312313
};
313314

314315
struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
@@ -808,6 +809,7 @@ struct Schema : NIceDb::Schema {
808809
.Key((ui8)recType, 0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "")
809810
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
810811
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
812+
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
811813
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
812814
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
813815
NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion()));
@@ -818,6 +820,7 @@ struct Schema : NIceDb::Schema {
818820
.Key((ui8)EInsertTableIds::Committed, data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(),
819821
data.GetDedupId())
820822
.Update(NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
823+
NIceDb::TUpdate<InsertTable::InsertWriteId>((ui64)data.GetInsertWriteId()),
821824
NIceDb::TUpdate<InsertTable::BlobRangeOffset>(data.GetBlobRange().Offset),
822825
NIceDb::TUpdate<InsertTable::BlobRangeSize>(data.GetBlobRange().Size),
823826
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
@@ -982,15 +985,16 @@ class TInsertTableRecordLoadContext {
982985
NColumnShard::Schema::EInsertTableIds RecType;
983986
ui64 PlanStep;
984987
ui64 WriteTxId;
988+
TInsertWriteId InsertWriteId;
985989
ui64 PathId;
986990
YDB_ACCESSOR_DEF(TString, DedupId);
987991
ui64 SchemaVersion;
988992
TString BlobIdString;
989993
std::optional<NOlap::TUnifiedBlobId> BlobId;
990994
TString MetadataString;
991995
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
992-
std::optional<ui64> RangeOffset;
993-
std::optional<ui64> RangeSize;
996+
ui64 RangeOffset;
997+
ui64 RangeSize;
994998

995999
void Prepare(const IBlobGroupSelector* dsGroupSelector) {
9961000
AFL_VERIFY(!PreparedFlag);
@@ -1004,7 +1008,6 @@ class TInsertTableRecordLoadContext {
10041008
AFL_VERIFY(MetadataString);
10051009
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
10061010
Metadata = std::move(meta);
1007-
AFL_VERIFY(!!RangeOffset == !!RangeSize);
10081011
}
10091012

10101013
bool PreparedFlag = false;
@@ -1013,8 +1016,13 @@ class TInsertTableRecordLoadContext {
10131016
public:
10141017
TInsertWriteId GetInsertWriteId() const {
10151018
AFL_VERIFY(ParsedFlag);
1016-
AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
1017-
return (TInsertWriteId)WriteTxId;
1019+
return InsertWriteId;
1020+
}
1021+
1022+
ui64 GetTxId() const {
1023+
AFL_VERIFY(ParsedFlag);
1024+
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
1025+
return WriteTxId;
10181026
}
10191027

10201028
NColumnShard::Schema::EInsertTableIds GetRecType() const {
@@ -1024,6 +1032,7 @@ class TInsertTableRecordLoadContext {
10241032

10251033
ui64 GetPlanStep() const {
10261034
AFL_VERIFY(ParsedFlag);
1035+
AFL_VERIFY(RecType == NColumnShard::Schema::EInsertTableIds::Committed);
10271036
return PlanStep;
10281037
}
10291038

@@ -1035,19 +1044,12 @@ class TInsertTableRecordLoadContext {
10351044
void Upsert(NIceDb::TNiceDb& db) const {
10361045
AFL_VERIFY(ParsedFlag);
10371046
using namespace NColumnShard;
1038-
if (RangeOffset) {
1039-
db.Table<Schema::InsertTable>()
1040-
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1041-
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
1042-
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
1043-
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1044-
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1045-
} else {
1046-
db.Table<Schema::InsertTable>()
1047-
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1048-
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1049-
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1050-
}
1047+
db.Table<Schema::InsertTable>()
1048+
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1049+
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
1050+
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(RangeOffset),
1051+
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1052+
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
10511053
}
10521054

10531055
template <class TRowset>
@@ -1059,41 +1061,40 @@ class TInsertTableRecordLoadContext {
10591061
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
10601062
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
10611063
AFL_VERIFY(WriteTxId);
1064+
InsertWriteId = (TInsertWriteId)rowset.template GetValueOrDefault<Schema::InsertTable::InsertWriteId>(WriteTxId);
10621065

10631066
PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
10641067
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
1065-
SchemaVersion =
1066-
rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
1068+
SchemaVersion = rowset.template GetValueOrDefault<Schema::InsertTable::SchemaVersion>(0);
10671069
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
10681070
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
1069-
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
1070-
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
1071-
}
1072-
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
1073-
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
1074-
}
1071+
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>());
1072+
AFL_VERIFY(rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>());
1073+
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
1074+
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
10751075
}
10761076

10771077
NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
10781078
Prepare(dsGroupSelector);
10791079
using namespace NColumnShard;
10801080
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
1081-
auto userData = std::make_shared<NOlap::TUserData>(PathId,
1082-
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
1081+
auto userData = std::make_shared<NOlap::TUserData>(
1082+
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
10831083
AFL_VERIFY(!!DedupId);
10841084
AFL_VERIFY(PlanStep);
1085-
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
1085+
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, InsertWriteId, DedupId);
10861086
}
10871087

10881088
NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
10891089
Prepare(dsGroupSelector);
10901090
using namespace NColumnShard;
1091+
AFL_VERIFY(InsertWriteId == (TInsertWriteId)WriteTxId)("insert", InsertWriteId)("write", WriteTxId);
10911092
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
1092-
auto userData = std::make_shared<NOlap::TUserData>(PathId,
1093-
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
1093+
auto userData = std::make_shared<NOlap::TUserData>(
1094+
PathId, NOlap::TBlobRange(*BlobId, RangeOffset, RangeSize), *Metadata, SchemaVersion, std::nullopt);
10941095
AFL_VERIFY(!DedupId);
10951096
AFL_VERIFY(!PlanStep);
1096-
return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
1097+
return NOlap::TInsertedData(InsertWriteId, userData);
10971098
}
10981099
};
10991100

ydb/core/tx/columnshard/common/portion.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,12 @@ class TSpecialColumns {
1717
public:
1818
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
1919
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
20+
static constexpr const char* SPEC_COL_WRITE_ID = "_yql_write_id";
2021
static constexpr const char* SPEC_COL_DELETE_FLAG = "_yql_delete_flag";
2122
static const ui32 SPEC_COL_PLAN_STEP_INDEX = 0xffffff00;
2223
static const ui32 SPEC_COL_TX_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 1;
23-
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
24+
static const ui32 SPEC_COL_WRITE_ID_INDEX = SPEC_COL_PLAN_STEP_INDEX + 2;
25+
static const ui32 SPEC_COL_DELETE_FLAG_INDEX = SPEC_COL_PLAN_STEP_INDEX + 3;
2426
};
2527

2628
}

ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,10 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
7676
TMergingContext mergingContext(batchResults, Batches);
7777

7878
for (auto&& [columnId, columnData] : columnsData) {
79+
if (columnId == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID &&
80+
(!HasAppData() || !AppDataVerified().FeatureFlags.GetEnableInsertWriteIdSpecialColumnCompatibility())) {
81+
continue;
82+
}
7983
const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId);
8084
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName));
8185
auto columnInfo = stats->GetColumnInfo(columnId);
@@ -125,13 +129,6 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
125129
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
126130
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
127131
}
128-
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
129-
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
130-
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx);
131-
Y_ABORT_UNLESS(columnSnapshotTxIdx);
132-
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
133-
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
134-
135132
std::vector<TGeneralSerializedSlice> batchSlices;
136133
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));
137134

ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -114,6 +114,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
114114
if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
115115
pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
116116
}
117+
dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID);
117118
}
118119
resultFiltered = std::make_shared<TFilteredSnapshotSchema>(resultSchema, dataColumnIds);
119120
{

ydb/core/tx/columnshard/engines/changes/indexation.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -244,7 +244,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
244244
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
245245
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
246246
}
247-
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());
247+
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId());
248248

249249
auto& pathInfo = pathBatches.GetPathInfo(inserted.GetPathId());
250250

0 commit comments

Comments
 (0)