@@ -928,6 +928,55 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
928
928
}
929
929
930
930
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery ->GetPhysicalQuery ();
931
+
932
+ auto checkSchemeTx = [&]() {
933
+ for (const auto &tx : phyQuery.GetTransactions ()) {
934
+ if (tx.GetType () != NKqpProto::TKqpPhyTx::TYPE_SCHEME) {
935
+ return false ;
936
+ }
937
+ }
938
+ return true ;
939
+ };
940
+
941
+ if (phyQuery.HasEnableOltpSink ()) {
942
+ if (!QueryState->TxCtx ->EnableOltpSink ) {
943
+ QueryState->TxCtx ->EnableOltpSink = phyQuery.GetEnableOltpSink ();
944
+ }
945
+ if (QueryState->TxCtx ->EnableOltpSink != phyQuery.GetEnableOltpSink ()) {
946
+ ReplyQueryError (Ydb::StatusIds::ABORTED,
947
+ " Transaction execution settings have been changed (EnableOltpSink)." );
948
+ return false ;
949
+ }
950
+ } else {
951
+ AFL_ENSURE (checkSchemeTx ());
952
+ }
953
+
954
+ if (phyQuery.HasEnableOlapSink ()) {
955
+ if (!QueryState->TxCtx ->EnableOlapSink ) {
956
+ QueryState->TxCtx ->EnableOlapSink = phyQuery.GetEnableOlapSink ();
957
+ }
958
+ if (QueryState->TxCtx ->EnableOlapSink != phyQuery.GetEnableOlapSink ()) {
959
+ ReplyQueryError (Ydb::StatusIds::ABORTED,
960
+ " Transaction execution settings have been changed (EnableOlapSink)." );
961
+ return false ;
962
+ }
963
+ } else {
964
+ AFL_ENSURE (checkSchemeTx ());
965
+ }
966
+
967
+ if (phyQuery.HasEnableHtapTx ()) {
968
+ if (!QueryState->TxCtx ->EnableHtapTx ) {
969
+ QueryState->TxCtx ->EnableHtapTx = phyQuery.GetEnableHtapTx ();
970
+ }
971
+ if (QueryState->TxCtx ->EnableHtapTx != phyQuery.GetEnableHtapTx ()) {
972
+ ReplyQueryError (Ydb::StatusIds::ABORTED,
973
+ " Transaction execution settings have been changed (EnableHtapTx)." );
974
+ return false ;
975
+ }
976
+ } else {
977
+ AFL_ENSURE (checkSchemeTx ());
978
+ }
979
+
931
980
const bool hasOlapWrite = ::NKikimr::NKqp::HasOlapTableWriteInTx (phyQuery);
932
981
const bool hasOltpWrite = ::NKikimr::NKqp::HasOltpTableWriteInTx (phyQuery);
933
982
const bool hasOlapRead = ::NKikimr::NKqp::HasOlapTableReadInTx (phyQuery);
@@ -937,7 +986,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
937
986
QueryState->TxCtx ->HasTableWrite |= hasOlapWrite || hasOltpWrite;
938
987
QueryState->TxCtx ->HasTableRead |= hasOlapRead || hasOltpRead;
939
988
if (QueryState->TxCtx ->HasOlapTable && QueryState->TxCtx ->HasOltpTable && QueryState->TxCtx ->HasTableWrite
940
- && !Settings. TableService . GetEnableHtapTx ( ) && !QueryState->IsSplitted ()) {
989
+ && !QueryState-> TxCtx -> EnableHtapTx . value_or ( false ) && !QueryState->IsSplitted ()) {
941
990
ReplyQueryError (Ydb::StatusIds::PRECONDITION_FAILED,
942
991
" Write transactions between column and row tables are disabled at current time." );
943
992
return false ;
@@ -1178,7 +1227,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
1178
1227
return ;
1179
1228
}
1180
1229
1181
- if (Settings. TableService . GetEnableOltpSink ( ) && isBatchQuery) {
1230
+ if (QueryState-> TxCtx -> EnableOltpSink . value_or ( false ) && isBatchQuery) {
1182
1231
if (!Settings.TableService .GetEnableBatchUpdates ()) {
1183
1232
ReplyQueryError (Ydb::StatusIds::PRECONDITION_FAILED,
1184
1233
" BATCH operations are disabled by EnableBatchUpdates flag." );
@@ -1353,7 +1402,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
1353
1402
request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes .Get ().GetRef ();
1354
1403
}
1355
1404
1356
- if (Settings. TableService . GetEnableOltpSink ( )) {
1405
+ if (txCtx. EnableOltpSink . value_or ( false )) {
1357
1406
if (txCtx.TxHasEffects () || hasLocks || txCtx.TopicOperations .HasOperations ()) {
1358
1407
request.AcquireLocksTxId = txCtx.Locks .GetLockTxId ();
1359
1408
}
@@ -1390,7 +1439,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
1390
1439
}
1391
1440
}
1392
1441
request.TopicOperations = std::move (txCtx.TopicOperations );
1393
- } else if (QueryState->ShouldAcquireLocks (tx) && (!txCtx.HasOlapTable || Settings. TableService . GetEnableOlapSink ( ))) {
1442
+ } else if (QueryState->ShouldAcquireLocks (tx) && (!txCtx.HasOlapTable || txCtx. EnableOlapSink . value_or ( false ))) {
1394
1443
request.AcquireLocksTxId = txCtx.Locks .GetLockTxId ();
1395
1444
1396
1445
if (!txCtx.CanDeferEffects ()) {
@@ -1453,12 +1502,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
1453
1502
request.ResourceManager_ = ResourceManager_;
1454
1503
LOG_D (" Sending to Executer TraceId: " << request.TraceId .GetTraceId () << " " << request.TraceId .GetSpanIdSize ());
1455
1504
1456
- if (Settings. TableService . GetEnableOltpSink ( ) && !txCtx->TxManager ) {
1505
+ if (txCtx-> EnableOltpSink . value_or ( false ) && !txCtx->TxManager ) {
1457
1506
txCtx->TxManager = CreateKqpTransactionManager ();
1458
1507
txCtx->TxManager ->SetAllowVolatile (AppData ()->FeatureFlags .GetEnableDataShardVolatileTransactions ());
1459
1508
}
1460
1509
1461
- if (Settings. TableService . GetEnableOltpSink ( )
1510
+ if (txCtx-> EnableOltpSink . value_or ( false )
1462
1511
&& !txCtx->BufferActorId
1463
1512
&& (txCtx->HasTableWrite || request.TopicOperations .GetSize () != 0 )) {
1464
1513
txCtx->TxManager ->SetTopicOperations (std::move (request.TopicOperations ));
@@ -1493,7 +1542,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
1493
1542
};
1494
1543
auto * actor = CreateKqpBufferWriterActor (std::move (settings));
1495
1544
txCtx->BufferActorId = RegisterWithSameMailbox (actor);
1496
- } else if (Settings. TableService . GetEnableOltpSink ( ) && txCtx->BufferActorId ) {
1545
+ } else if (txCtx-> EnableOltpSink . value_or ( false ) && txCtx->BufferActorId ) {
1497
1546
txCtx->TxManager ->SetTopicOperations (std::move (request.TopicOperations ));
1498
1547
txCtx->TxManager ->AddTopicsToShards ();
1499
1548
}
0 commit comments