Skip to content

Commit d139453

Browse files
committed
Fix sink flags (#19985)
1 parent 8f01128 commit d139453

File tree

6 files changed

+72
-12
lines changed

6 files changed

+72
-12
lines changed

ydb/core/kqp/common/kqp_tx.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,10 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
360360
bool HasTableWrite = false;
361361
bool HasTableRead = false;
362362

363+
std::optional<bool> EnableOltpSink;
364+
std::optional<bool> EnableOlapSink;
365+
std::optional<bool> EnableHtapTx;
366+
363367
bool NeedUncommittedChangesFlush = false;
364368
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;
365369

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
115115
{
116116
Target = creator;
117117

118+
<<<<<<< HEAD
118119
YQL_ENSURE(!TxManager || tableServiceConfig.GetEnableOltpSink());
120+
=======
121+
>>>>>>> 3a99069cf36 (Fix sink flags (#19985))
119122
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
120123

121124
if (Request.AcquireLocksTxId || Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback) {

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
542542
YQL_ENSURE(querySettings.Type);
543543
queryProto.SetType(GetPhyQueryType(*querySettings.Type));
544544

545+
queryProto.SetEnableOltpSink(Config->EnableOltpSink);
546+
queryProto.SetEnableOlapSink(Config->EnableOlapSink);
547+
queryProto.SetEnableHtapTx(Config->EnableHtapTx);
548+
545549
for (const auto& queryBlock : dataQueryBlocks) {
546550
auto queryBlockSettings = TKiDataQueryBlockSettings::Parse(queryBlock);
547551
if (queryBlockSettings.HasUncommittedChangesRead) {

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,55 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
928928
}
929929

930930
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
931+
932+
auto checkSchemeTx = [&]() {
933+
for (const auto &tx : phyQuery.GetTransactions()) {
934+
if (tx.GetType() != NKqpProto::TKqpPhyTx::TYPE_SCHEME) {
935+
return false;
936+
}
937+
}
938+
return true;
939+
};
940+
941+
if (phyQuery.HasEnableOltpSink()) {
942+
if (!QueryState->TxCtx->EnableOltpSink) {
943+
QueryState->TxCtx->EnableOltpSink = phyQuery.GetEnableOltpSink();
944+
}
945+
if (QueryState->TxCtx->EnableOltpSink != phyQuery.GetEnableOltpSink()) {
946+
ReplyQueryError(Ydb::StatusIds::ABORTED,
947+
"Transaction execution settings have been changed (EnableOltpSink).");
948+
return false;
949+
}
950+
} else {
951+
AFL_ENSURE(checkSchemeTx());
952+
}
953+
954+
if (phyQuery.HasEnableOlapSink()) {
955+
if (!QueryState->TxCtx->EnableOlapSink) {
956+
QueryState->TxCtx->EnableOlapSink = phyQuery.GetEnableOlapSink();
957+
}
958+
if (QueryState->TxCtx->EnableOlapSink != phyQuery.GetEnableOlapSink()) {
959+
ReplyQueryError(Ydb::StatusIds::ABORTED,
960+
"Transaction execution settings have been changed (EnableOlapSink).");
961+
return false;
962+
}
963+
} else {
964+
AFL_ENSURE(checkSchemeTx());
965+
}
966+
967+
if (phyQuery.HasEnableHtapTx()) {
968+
if (!QueryState->TxCtx->EnableHtapTx) {
969+
QueryState->TxCtx->EnableHtapTx = phyQuery.GetEnableHtapTx();
970+
}
971+
if (QueryState->TxCtx->EnableHtapTx != phyQuery.GetEnableHtapTx()) {
972+
ReplyQueryError(Ydb::StatusIds::ABORTED,
973+
"Transaction execution settings have been changed (EnableHtapTx).");
974+
return false;
975+
}
976+
} else {
977+
AFL_ENSURE(checkSchemeTx());
978+
}
979+
931980
const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
932981
const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
933982
const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery);
@@ -937,7 +986,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
937986
QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite;
938987
QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead;
939988
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
940-
&& !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) {
989+
&& !QueryState->TxCtx->EnableHtapTx.value_or(false) && !QueryState->IsSplitted()) {
941990
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
942991
"Write transactions between column and row tables are disabled at current time.");
943992
return false;
@@ -1178,7 +1227,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
11781227
return;
11791228
}
11801229

1181-
if (Settings.TableService.GetEnableOltpSink() && isBatchQuery) {
1230+
if (QueryState->TxCtx->EnableOltpSink.value_or(false) && isBatchQuery) {
11821231
if (!Settings.TableService.GetEnableBatchUpdates()) {
11831232
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
11841233
"BATCH operations are disabled by EnableBatchUpdates flag.");
@@ -1353,7 +1402,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
13531402
request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef();
13541403
}
13551404

1356-
if (Settings.TableService.GetEnableOltpSink()) {
1405+
if (txCtx.EnableOltpSink.value_or(false)) {
13571406
if (txCtx.TxHasEffects() || hasLocks || txCtx.TopicOperations.HasOperations()) {
13581407
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
13591408
}
@@ -1390,7 +1439,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
13901439
}
13911440
}
13921441
request.TopicOperations = std::move(txCtx.TopicOperations);
1393-
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
1442+
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || txCtx.EnableOlapSink.value_or(false))) {
13941443
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
13951444

13961445
if (!txCtx.CanDeferEffects()) {
@@ -1453,12 +1502,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
14531502
request.ResourceManager_ = ResourceManager_;
14541503
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
14551504

1456-
if (Settings.TableService.GetEnableOltpSink() && !txCtx->TxManager) {
1505+
if (txCtx->EnableOltpSink.value_or(false) && !txCtx->TxManager) {
14571506
txCtx->TxManager = CreateKqpTransactionManager();
14581507
txCtx->TxManager->SetAllowVolatile(AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions());
14591508
}
14601509

1461-
if (Settings.TableService.GetEnableOltpSink()
1510+
if (txCtx->EnableOltpSink.value_or(false)
14621511
&& !txCtx->BufferActorId
14631512
&& (txCtx->HasTableWrite || request.TopicOperations.GetSize() != 0)) {
14641513
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
@@ -1493,7 +1542,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
14931542
};
14941543
auto* actor = CreateKqpBufferWriterActor(std::move(settings));
14951544
txCtx->BufferActorId = RegisterWithSameMailbox(actor);
1496-
} else if (Settings.TableService.GetEnableOltpSink() && txCtx->BufferActorId) {
1545+
} else if (txCtx->EnableOltpSink.value_or(false) && txCtx->BufferActorId) {
14971546
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
14981547
txCtx->TxManager->AddTopicsToShards();
14991548
}

ydb/core/protos/kqp_physical.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,4 +576,8 @@ message TKqpPhyQuery {
576576
string QueryDiagnostics = 10;
577577

578578
repeated TKqpTableInfo ViewInfos = 11;
579+
580+
optional bool EnableOltpSink = 12;
581+
optional bool EnableOlapSink = 13;
582+
optional bool EnableHtapTx = 14;
579583
}

ydb/services/persqueue_v1/ut/topic_service_ut.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,7 @@ Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransacti
412412
auto result = tx->Commit().ExtractValueSync();
413413
Cerr << ">>> CommitTx >>>" << Endl;
414414
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
415-
if (server->ServerSettings.AppConfig->GetTableServiceConfig().GetEnableOltpSink()) {
416-
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
417-
} else {
418-
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
419-
}
415+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
420416
}
421417

422418
}

0 commit comments

Comments
 (0)