Skip to content

Commit 1d5f8c9

Browse files
authored
[stable-25-1] EvWrite fixes (#18808)
2 parents 2b837d2 + 8ef7024 commit 1d5f8c9

File tree

10 files changed

+306
-26
lines changed

10 files changed

+306
-26
lines changed

ydb/core/kqp/common/kqp_tx.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414

1515
namespace NKikimr::NKqp {
1616

17+
namespace {
18+
// Avoid too many compute actors starting at the same time.
19+
constexpr size_t kMaxDeferredEffects = 100;
20+
}
21+
1722
class TKqpTxLock {
1823
public:
1924
using TKey = std::tuple<ui64, ui64, ui64, ui64>;
@@ -317,7 +322,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
317322
}
318323

319324
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
320-
NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
325+
NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects)
326+
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
321327
if (NeedUncommittedChangesFlush) {
322328
ModifiedTablesSinceLastFlush.clear();
323329
}

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
5151
if (action & EAction::WRITE) {
5252
ReadOnly = false;
5353
}
54+
++ActionsCount;
5455
}
5556

5657
void AddTopic(ui64 topicId, const TString& path) override {
@@ -310,7 +311,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
310311
}
311312

312313
bool NeedCommit() const override {
313-
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && (IsSingleShard() || HasSnapshot());
314+
AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard()
315+
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot());
314316
return !dontNeedCommit;
315317
}
316318

@@ -515,6 +517,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
515517
THashSet<ui64> ShardsIds;
516518
THashMap<ui64, TShardInfo> ShardsInfo;
517519
std::unordered_set<TString> TablePathes;
520+
ui64 ActionsCount = 0;
518521

519522
THashSet<ui32> ParticipantNodes;
520523

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,8 @@ class TxPlanSerializer {
245245
planNode.TypeName = "Effect";
246246
Visit(TExprBase(stage), planNode);
247247
} else if (stageBase.Outputs()) { // Sink
248+
AFL_ENSURE(stageBase.Outputs().Cast().Size() == 1);
248249
auto& planNode = AddPlanNode(phaseNode);
249-
planNode.TypeName = "Sink";
250250
Visit(TExprBase(stage), planNode);
251251
}
252252
}
@@ -961,7 +961,8 @@ class TxPlanSerializer {
961961
if (auto outputs = expr.Cast<TDqStageBase>().Outputs()) {
962962
for (auto output : outputs.Cast()) {
963963
if (auto sink = output.Maybe<TDqSink>()) {
964-
Visit(sink.Cast(), expr.Cast<TDqStageBase>(), stagePlanNode);
964+
AFL_ENSURE(outputs.Cast().Size() == 1);
965+
Visit(sink.Cast(), expr.Cast<TDqStageBase>(), planNode);
965966
}
966967
}
967968
}

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
291291
return ShardedWriteController->IsEmpty();
292292
}
293293

294+
const TTableId& GetTableId() const {
295+
return TableId;
296+
}
297+
294298
TVector<NKikimrDataEvents::TLock> GetLocks() const {
295299
return TxManager->GetLocks();
296300
}
@@ -1267,6 +1271,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
12671271
Stats.AffectedPartitions.clear();
12681272
}
12691273

1274+
private:
12701275
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
12711276

12721277
TString LogPrefix;
@@ -1504,7 +1509,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
15041509
RuntimeError(
15051510
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
15061511
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
1507-
TStringBuilder() << "Stream write can't be used for this query.",
1512+
TStringBuilder() << "Out of buffer memory. Used " << GetMemory()
1513+
<< " bytes of " << MessageSettings.InFlightMemoryLimitPerActorBytes << " bytes.",
15081514
{});
15091515
return;
15101516
}
@@ -1633,11 +1639,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
16331639
namespace {
16341640

16351641
struct TWriteToken {
1636-
TTableId TableId;
1642+
TPathId PathId;
16371643
ui64 Cookie;
16381644

16391645
bool IsEmpty() const {
1640-
return !TableId;
1646+
return !PathId;
16411647
}
16421648
};
16431649

@@ -1774,7 +1780,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17741780
InconsistentTx = settings.TransactionSettings.InconsistentTx;
17751781
}
17761782

1777-
auto& writeInfo = WriteInfos[settings.TableId];
1783+
auto& writeInfo = WriteInfos[settings.TableId.PathId];
17781784
if (!writeInfo.WriteTableActor) {
17791785
TVector<NScheme::TTypeInfo> keyColumnTypes;
17801786
keyColumnTypes.reserve(settings.KeyColumns.size());
@@ -1803,6 +1809,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18031809
CA_LOG_D("Create new TableWriteActor for table `" << settings.TablePath << "` (" << settings.TableId << "). lockId=" << LockTxId << " " << writeInfo.WriteTableActorId);
18041810
}
18051811

1812+
if (writeInfo.WriteTableActor->GetTableId().SchemaVersion != settings.TableId.SchemaVersion) {
1813+
CA_LOG_E("Scheme changed for table `"
1814+
<< settings.TablePath << "`.");
1815+
ReplyErrorAndDie(
1816+
NYql::NDqProto::StatusIds::SCHEME_ERROR,
1817+
NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH,
1818+
TStringBuilder() << "Scheme changed. Table: `"
1819+
<< settings.TablePath << "`.",
1820+
{});
1821+
return;
1822+
}
1823+
AFL_ENSURE(writeInfo.WriteTableActor->GetTableId() == settings.TableId);
1824+
18061825
EnableStreamWrite &= settings.EnableStreamWrite;
18071826

18081827
auto cookie = writeInfo.WriteTableActor->Open(
@@ -1811,12 +1830,12 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18111830
std::move(settings.Columns),
18121831
std::move(settings.WriteIndex),
18131832
settings.Priority);
1814-
token = TWriteToken{settings.TableId, cookie};
1833+
token = TWriteToken{settings.TableId.PathId, cookie};
18151834
} else {
18161835
token = *ev->Get()->Token;
18171836
}
18181837

1819-
auto& queue = DataQueues[token.TableId];
1838+
auto& queue = DataQueues[token.PathId];
18201839
queue.emplace();
18211840
auto& message = queue.back();
18221841

@@ -1848,11 +1867,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18481867
}
18491868

18501869
void ProcessRequestQueue() {
1851-
for (auto& [tableId, queue] : DataQueues) {
1852-
auto& writeInfo = WriteInfos.at(tableId);
1870+
for (auto& [pathId, queue] : DataQueues) {
1871+
auto& writeInfo = WriteInfos.at(pathId);
18531872

18541873
if (!writeInfo.WriteTableActor->IsReady()) {
1855-
CA_LOG_D("ProcessRequestQueue " << tableId << " NOT READY queue=" << queue.size());
1874+
CA_LOG_D("ProcessRequestQueue " << pathId << " NOT READY queue=" << queue.size());
18561875
return;
18571876
}
18581877

@@ -1904,7 +1923,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19041923
ReplyErrorAndDie(
19051924
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
19061925
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
1907-
TStringBuilder() << "Stream write queries aren't allowed.",
1926+
TStringBuilder() << "Out of buffer memory. Used " << GetTotalMemory()
1927+
<< " bytes of " << MessageSettings.InFlightMemoryLimitPerActorBytes << " bytes.",
19081928
{});
19091929
return false;
19101930
}
@@ -2161,7 +2181,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
21612181
}
21622182

21632183
i64 GetFreeSpace(TWriteToken token) const {
2164-
auto& info = WriteInfos.at(token.TableId);
2184+
auto& info = WriteInfos.at(token.PathId);
21652185
return info.WriteTableActor->IsReady()
21662186
? MessageSettings.InFlightMemoryLimitPerActorBytes - info.WriteTableActor->GetMemory()
21672187
: std::numeric_limits<i64>::min(); // Can't use zero here because compute can use overcommit!
@@ -2899,11 +2919,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28992919
TActorId WriteTableActorId;
29002920
};
29012921

2902-
THashMap<TTableId, TWriteInfo> WriteInfos;
2922+
THashMap<TPathId, TWriteInfo> WriteInfos;
29032923

29042924
EState State;
29052925
bool HasError = false;
2906-
THashMap<TTableId, std::queue<TBufferWriteMessage>> DataQueues;
2926+
THashMap<TPathId, std::queue<TBufferWriteMessage>> DataQueues;
29072927

29082928
struct TAckMessage {
29092929
TActorId ForwardActorId;

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,6 @@ class TRowBuilder {
438438

439439
TVector<TCellInfo> CellsInfo;
440440
TVector<TCell> Cells;
441-
442-
TOwnedCellVecBatch Batch;
443441
};
444442

445443
class TColumnDataBatcher : public IDataBatcher {

ydb/core/kqp/ut/effects/kqp_effects_ut.cpp

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,168 @@ Y_UNIT_TEST_SUITE(KqpEffects) {
524524
UNIT_ASSERT_VALUES_EQUAL(reads[0]["type"], "Scan");
525525
UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3);
526526
}
527+
528+
Y_UNIT_TEST_TWIN(AlterDuringUpsertTransaction, UseSink) {
529+
NKikimrConfig::TAppConfig appConfig;
530+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
531+
auto kikimr = DefaultKikimrRunner({}, appConfig);
532+
auto db = kikimr.GetTableClient();
533+
auto session1 = db.CreateSession().GetValueSync().GetSession();
534+
auto session2 = db.CreateSession().GetValueSync().GetSession();
535+
536+
auto ret = session1.ExecuteSchemeQuery(R"(
537+
CREATE TABLE `TestTable` (
538+
Key Uint32,
539+
Value1 String,
540+
PRIMARY KEY (Key)
541+
)
542+
)").ExtractValueSync();
543+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
544+
545+
auto txControl = TTxControl::BeginTx();
546+
auto upsertResult = session1.ExecuteDataQuery(R"(
547+
UPSERT INTO `TestTable` (Key, Value1) VALUES
548+
(1u, "First"),
549+
(2u, "Second")
550+
)", txControl).ExtractValueSync();
551+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
552+
auto tx1 = upsertResult.GetTransaction();
553+
UNIT_ASSERT(tx1);
554+
555+
auto alterResult = session2.ExecuteSchemeQuery(R"(
556+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
557+
)").ExtractValueSync();
558+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
559+
560+
auto commitResult = tx1->Commit().GetValueSync();
561+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
562+
UNIT_ASSERT_C(commitResult.GetIssues().ToString().contains("Scheme changed. Table: `/Root/TestTable`.")
563+
|| commitResult.GetIssues().ToString().contains("Table '/Root/TestTable' scheme changed."),
564+
commitResult.GetIssues().ToString());
565+
}
566+
567+
Y_UNIT_TEST_TWIN(AlterAfterUpsertTransaction, UseSink) {
568+
NKikimrConfig::TAppConfig appConfig;
569+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
570+
auto kikimr = DefaultKikimrRunner({}, appConfig);
571+
auto db = kikimr.GetTableClient();
572+
auto session1 = db.CreateSession().GetValueSync().GetSession();
573+
auto session2 = db.CreateSession().GetValueSync().GetSession();
574+
575+
auto ret = session1.ExecuteSchemeQuery(R"(
576+
CREATE TABLE `TestTable` (
577+
Key Uint32,
578+
Value1 String,
579+
PRIMARY KEY (Key)
580+
)
581+
)").ExtractValueSync();
582+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
583+
584+
auto txControl = TTxControl::BeginTx();
585+
auto upsertResult = session1.ExecuteDataQuery(R"(
586+
UPSERT INTO `TestTable` (Key, Value1) VALUES
587+
(1u, "First"),
588+
(2u, "Second");
589+
SELECT * FROM `TestTable`;
590+
)", txControl).ExtractValueSync();
591+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
592+
auto tx1 = upsertResult.GetTransaction();
593+
UNIT_ASSERT(tx1);
594+
595+
auto alterResult = session2.ExecuteSchemeQuery(R"(
596+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
597+
)").ExtractValueSync();
598+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
599+
600+
auto commitResult = tx1->Commit().GetValueSync();
601+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
602+
}
603+
604+
Y_UNIT_TEST_TWIN(AlterAfterUpsertBeforeUpsertTransaction, UseSink) {
605+
NKikimrConfig::TAppConfig appConfig;
606+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
607+
auto kikimr = DefaultKikimrRunner({}, appConfig);
608+
auto db = kikimr.GetTableClient();
609+
auto session1 = db.CreateSession().GetValueSync().GetSession();
610+
auto session2 = db.CreateSession().GetValueSync().GetSession();
611+
612+
auto ret = session1.ExecuteSchemeQuery(R"(
613+
CREATE TABLE `TestTable` (
614+
Key Uint32,
615+
Value1 String,
616+
PRIMARY KEY (Key)
617+
)
618+
)").ExtractValueSync();
619+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
620+
621+
auto txControl = TTxControl::BeginTx();
622+
auto upsertResult = session1.ExecuteDataQuery(R"(
623+
UPSERT INTO `TestTable` (Key, Value1) VALUES
624+
(1u, "First"),
625+
(2u, "Second");
626+
SELECT * FROM `TestTable` WHERE Key = 1u;
627+
)", txControl).ExtractValueSync();
628+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
629+
auto tx1 = upsertResult.GetTransaction();
630+
UNIT_ASSERT(tx1);
631+
632+
auto alterResult = session2.ExecuteSchemeQuery(R"(
633+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
634+
)").ExtractValueSync();
635+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
636+
637+
auto upsertResult2 = session1.ExecuteDataQuery(R"(
638+
UPSERT INTO `TestTable` (Key, Value1) VALUES
639+
(1u, "First"),
640+
(2u, "Second");
641+
)", TTxControl::Tx(*tx1)).ExtractValueSync();
642+
UNIT_ASSERT_C(upsertResult2.IsSuccess(), upsertResult2.GetIssues().ToString());
643+
644+
auto commitResult = tx1->Commit().GetValueSync();
645+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
646+
}
647+
648+
Y_UNIT_TEST_TWIN(AlterAfterUpsertBeforeUpsertSelectTransaction, UseSink) {
649+
NKikimrConfig::TAppConfig appConfig;
650+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
651+
auto kikimr = DefaultKikimrRunner({}, appConfig);
652+
auto db = kikimr.GetTableClient();
653+
auto session1 = db.CreateSession().GetValueSync().GetSession();
654+
auto session2 = db.CreateSession().GetValueSync().GetSession();
655+
656+
auto ret = session1.ExecuteSchemeQuery(R"(
657+
CREATE TABLE `TestTable` (
658+
Key Uint32,
659+
Value1 String,
660+
PRIMARY KEY (Key)
661+
)
662+
)").ExtractValueSync();
663+
UNIT_ASSERT_C(ret.IsSuccess(), ret.GetIssues().ToString());
664+
665+
auto txControl = TTxControl::BeginTx();
666+
auto upsertResult = session1.ExecuteDataQuery(R"(
667+
UPSERT INTO `TestTable` (Key, Value1) VALUES
668+
(1u, "First"),
669+
(2u, "Second");
670+
SELECT * FROM `TestTable` WHERE Key = 1u;
671+
)", txControl).ExtractValueSync();
672+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
673+
auto tx1 = upsertResult.GetTransaction();
674+
UNIT_ASSERT(tx1);
675+
676+
auto alterResult = session2.ExecuteSchemeQuery(R"(
677+
ALTER TABLE `TestTable` ADD COLUMN Value2 Int32
678+
)").ExtractValueSync();
679+
UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
680+
681+
auto upsertResult2 = session1.ExecuteDataQuery(R"(
682+
UPSERT INTO `TestTable` (Key, Value1) VALUES
683+
(1u, "First"),
684+
(2u, "Second");
685+
SELECT * FROM `TestTable` WHERE Key = 1u;
686+
)", TTxControl::Tx(*tx1)).ExtractValueSync();
687+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult2.GetStatus(), EStatus::ABORTED, upsertResult2.GetIssues().ToString());
688+
}
527689
}
528690

529691
} // namespace NKqp

0 commit comments

Comments
 (0)