Skip to content

Commit 8ef7024

Browse files
committed
Schema Versions for sinks (#18862)
1 parent ec934fb commit 8ef7024

File tree

2 files changed

+191
-11
lines changed

2 files changed

+191
-11
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
291291
return ShardedWriteController->IsEmpty();
292292
}
293293

294+
const TTableId& GetTableId() const {
295+
return TableId;
296+
}
297+
294298
TVector<NKikimrDataEvents::TLock> GetLocks() const {
295299
return TxManager->GetLocks();
296300
}
@@ -1267,6 +1271,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
12671271
Stats.AffectedPartitions.clear();
12681272
}
12691273

1274+
private:
12701275
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
12711276

12721277
TString LogPrefix;
@@ -1634,11 +1639,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
16341639
namespace {
16351640

16361641
struct TWriteToken {
1637-
TTableId TableId;
1642+
TPathId PathId;
16381643
ui64 Cookie;
16391644

16401645
bool IsEmpty() const {
1641-
return !TableId;
1646+
return !PathId;
16421647
}
16431648
};
16441649

@@ -1775,7 +1780,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17751780
InconsistentTx = settings.TransactionSettings.InconsistentTx;
17761781
}
17771782

1778-
auto& writeInfo = WriteInfos[settings.TableId];
1783+
auto& writeInfo = WriteInfos[settings.TableId.PathId];
17791784
if (!writeInfo.WriteTableActor) {
17801785
TVector<NScheme::TTypeInfo> keyColumnTypes;
17811786
keyColumnTypes.reserve(settings.KeyColumns.size());
@@ -1804,6 +1809,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18041809
CA_LOG_D("Create new TableWriteActor for table `" << settings.TablePath << "` (" << settings.TableId << "). lockId=" << LockTxId << " " << writeInfo.WriteTableActorId);
18051810
}
18061811

1812+
if (writeInfo.WriteTableActor->GetTableId().SchemaVersion != settings.TableId.SchemaVersion) {
1813+
CA_LOG_E("Scheme changed for table `"
1814+
<< settings.TablePath << "`.");
1815+
ReplyErrorAndDie(
1816+
NYql::NDqProto::StatusIds::SCHEME_ERROR,
1817+
NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH,
1818+
TStringBuilder() << "Scheme changed. Table: `"
1819+
<< settings.TablePath << "`.",
1820+
{});
1821+
return;
1822+
}
1823+
AFL_ENSURE(writeInfo.WriteTableActor->GetTableId() == settings.TableId);
1824+
18071825
EnableStreamWrite &= settings.EnableStreamWrite;
18081826

18091827
auto cookie = writeInfo.WriteTableActor->Open(
@@ -1812,12 +1830,12 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18121830
std::move(settings.Columns),
18131831
std::move(settings.WriteIndex),
18141832
settings.Priority);
1815-
token = TWriteToken{settings.TableId, cookie};
1833+
token = TWriteToken{settings.TableId.PathId, cookie};
18161834
} else {
18171835
token = *ev->Get()->Token;
18181836
}
18191837

1820-
auto& queue = DataQueues[token.TableId];
1838+
auto& queue = DataQueues[token.PathId];
18211839
queue.emplace();
18221840
auto& message = queue.back();
18231841

@@ -1849,11 +1867,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18491867
}
18501868

18511869
void ProcessRequestQueue() {
1852-
for (auto& [tableId, queue] : DataQueues) {
1853-
auto& writeInfo = WriteInfos.at(tableId);
1870+
for (auto& [pathId, queue] : DataQueues) {
1871+
auto& writeInfo = WriteInfos.at(pathId);
18541872

18551873
if (!writeInfo.WriteTableActor->IsReady()) {
1856-
CA_LOG_D("ProcessRequestQueue " << tableId << " NOT READY queue=" << queue.size());
1874+
CA_LOG_D("ProcessRequestQueue " << pathId << " NOT READY queue=" << queue.size());
18571875
return;
18581876
}
18591877

@@ -2163,7 +2181,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
21632181
}
21642182

21652183
i64 GetFreeSpace(TWriteToken token) const {
2166-
auto& info = WriteInfos.at(token.TableId);
2184+
auto& info = WriteInfos.at(token.PathId);
21672185
return info.WriteTableActor->IsReady()
21682186
? MessageSettings.InFlightMemoryLimitPerActorBytes - info.WriteTableActor->GetMemory()
21692187
: std::numeric_limits<i64>::min(); // Can't use zero here because compute can use overcommit!
@@ -2901,11 +2919,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29012919
TActorId WriteTableActorId;
29022920
};
29032921

2904-
THashMap<TTableId, TWriteInfo> WriteInfos;
2922+
THashMap<TPathId, TWriteInfo> WriteInfos;
29052923

29062924
EState State;
29072925
bool HasError = false;
2908-
THashMap<TTableId, std::queue<TBufferWriteMessage>> DataQueues;
2926+
THashMap<TPathId, std::queue<TBufferWriteMessage>> DataQueues;
29092927

29102928
struct TAckMessage {
29112929
TActorId ForwardActorId;

ydb/core/kqp/ut/effects/kqp_effects_ut.cpp

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,168 @@ Y_UNIT_TEST_SUITE(KqpEffects) {
524524
UNIT_ASSERT_VALUES_EQUAL(reads[0]["type"], "Scan");
525525
UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3);
526526
}
527+
528+
Y_UNIT_TEST_TWIN(AlterDuringUpsertTransaction, UseSink) {
529+
NKikimrConfig::TAppConfig appConfig;
530+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
531+
auto kikimr = DefaultKikimrRunner({}, appConfig);
532+
auto db = kikimr.GetTableClient();
533+
auto session1 = db.CreateSession().GetValueSync().GetSession();
534+
auto session2 = db.CreateSession().GetValueSync().GetSession();
535+
536+
auto ret = session1.ExecuteSchemeQuery(R"(
537+
CREATE TABLE `TestTable` (
538+
Key Uint32,
539+
Value1 String,
540+
PRIMARY KEY (Key)
541+
)
542+
)").ExtractValueSync();
543+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
544+
545+
auto txControl = TTxControl::BeginTx();
546+
auto upsertResult = session1.ExecuteDataQuery(R"(
547+
UPSERT INTO `TestTable` (Key, Value1) VALUES
548+
(1u, "First"),
549+
(2u, "Second")
550+
)", txControl).ExtractValueSync();
551+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
552+
auto tx1 = upsertResult.GetTransaction();
553+
UNIT_ASSERT(tx1);
554+
555+
auto alterResult = session2.ExecuteSchemeQuery(R"(
556+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
557+
)").ExtractValueSync();
558+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
559+
560+
auto commitResult = tx1->Commit().GetValueSync();
561+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
562+
UNIT_ASSERT_C(commitResult.GetIssues().ToString().contains("Scheme changed. Table: `/Root/TestTable`.")
563+
|| commitResult.GetIssues().ToString().contains("Table '/Root/TestTable' scheme changed."),
564+
commitResult.GetIssues().ToString());
565+
}
566+
567+
Y_UNIT_TEST_TWIN(AlterAfterUpsertTransaction, UseSink) {
568+
NKikimrConfig::TAppConfig appConfig;
569+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
570+
auto kikimr = DefaultKikimrRunner({}, appConfig);
571+
auto db = kikimr.GetTableClient();
572+
auto session1 = db.CreateSession().GetValueSync().GetSession();
573+
auto session2 = db.CreateSession().GetValueSync().GetSession();
574+
575+
auto ret = session1.ExecuteSchemeQuery(R"(
576+
CREATE TABLE `TestTable` (
577+
Key Uint32,
578+
Value1 String,
579+
PRIMARY KEY (Key)
580+
)
581+
)").ExtractValueSync();
582+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
583+
584+
auto txControl = TTxControl::BeginTx();
585+
auto upsertResult = session1.ExecuteDataQuery(R"(
586+
UPSERT INTO `TestTable` (Key, Value1) VALUES
587+
(1u, "First"),
588+
(2u, "Second");
589+
SELECT * FROM `TestTable`;
590+
)", txControl).ExtractValueSync();
591+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
592+
auto tx1 = upsertResult.GetTransaction();
593+
UNIT_ASSERT(tx1);
594+
595+
auto alterResult = session2.ExecuteSchemeQuery(R"(
596+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
597+
)").ExtractValueSync();
598+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
599+
600+
auto commitResult = tx1->Commit().GetValueSync();
601+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
602+
}
603+
604+
Y_UNIT_TEST_TWIN(AlterAfterUpsertBeforeUpsertTransaction, UseSink) {
605+
NKikimrConfig::TAppConfig appConfig;
606+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
607+
auto kikimr = DefaultKikimrRunner({}, appConfig);
608+
auto db = kikimr.GetTableClient();
609+
auto session1 = db.CreateSession().GetValueSync().GetSession();
610+
auto session2 = db.CreateSession().GetValueSync().GetSession();
611+
612+
auto ret = session1.ExecuteSchemeQuery(R"(
613+
CREATE TABLE `TestTable` (
614+
Key Uint32,
615+
Value1 String,
616+
PRIMARY KEY (Key)
617+
)
618+
)").ExtractValueSync();
619+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
620+
621+
auto txControl = TTxControl::BeginTx();
622+
auto upsertResult = session1.ExecuteDataQuery(R"(
623+
UPSERT INTO `TestTable` (Key, Value1) VALUES
624+
(1u, "First"),
625+
(2u, "Second");
626+
SELECT * FROM `TestTable` WHERE Key = 1u;
627+
)", txControl).ExtractValueSync();
628+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
629+
auto tx1 = upsertResult.GetTransaction();
630+
UNIT_ASSERT(tx1);
631+
632+
auto alterResult = session2.ExecuteSchemeQuery(R"(
633+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
634+
)").ExtractValueSync();
635+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
636+
637+
auto upsertResult2 = session1.ExecuteDataQuery(R"(
638+
UPSERT INTO `TestTable` (Key, Value1) VALUES
639+
(1u, "First"),
640+
(2u, "Second");
641+
)", TTxControl::Tx(*tx1)).ExtractValueSync();
642+
UNIT_ASSERT_C(upsertResult2.IsSuccess(), upsertResult2.GetIssues().ToString());
643+
644+
auto commitResult = tx1->Commit().GetValueSync();
645+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
646+
}
647+
648+
Y_UNIT_TEST_TWIN(AlterAfterUpsertBeforeUpsertSelectTransaction, UseSink) {
649+
NKikimrConfig::TAppConfig appConfig;
650+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
651+
auto kikimr = DefaultKikimrRunner({}, appConfig);
652+
auto db = kikimr.GetTableClient();
653+
auto session1 = db.CreateSession().GetValueSync().GetSession();
654+
auto session2 = db.CreateSession().GetValueSync().GetSession();
655+
656+
auto ret = session1.ExecuteSchemeQuery(R"(
657+
CREATE TABLE `TestTable` (
658+
Key Uint32,
659+
Value1 String,
660+
PRIMARY KEY (Key)
661+
)
662+
)").ExtractValueSync();
663+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
664+
665+
auto txControl = TTxControl::BeginTx();
666+
auto upsertResult = session1.ExecuteDataQuery(R"(
667+
UPSERT INTO `TestTable` (Key, Value1) VALUES
668+
(1u, "First"),
669+
(2u, "Second");
670+
SELECT * FROM `TestTable` WHERE Key = 1u;
671+
)", txControl).ExtractValueSync();
672+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
673+
auto tx1 = upsertResult.GetTransaction();
674+
UNIT_ASSERT(tx1);
675+
676+
auto alterResult = session2.ExecuteSchemeQuery(R"(
677+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
678+
)").ExtractValueSync();
679+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
680+
681+
auto upsertResult2 = session1.ExecuteDataQuery(R"(
682+
UPSERT INTO `TestTable` (Key, Value1) VALUES
683+
(1u, "First"),
684+
(2u, "Second");
685+
SELECT * FROM `TestTable` WHERE Key = 1u;
686+
)", TTxControl::Tx(*tx1)).ExtractValueSync();
687+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult2.GetStatus(), EStatus::ABORTED, upsertResult2.GetIssues().ToString());
688+
}
527689
}
528690

529691
} // namespace NKqp

0 commit comments

Comments
 (0)