Skip to content

Commit e6d795f

Browse files
authored
HTAP: ProposeTx + EvWrite (#9236)
1 parent 8dff0b9 commit e6d795f

File tree

9 files changed

+282
-65
lines changed

9 files changed

+282
-65
lines changed

ydb/core/kqp/common/kqp_tx.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
268268
}
269269

270270
bool ShouldExecuteDeferredEffects() const {
271-
if (HasUncommittedChangesRead) {
271+
if (HasUncommittedChangesRead || HasOlapTable) {
272272
return !DeferredEffects.Empty();
273273
}
274274

@@ -297,7 +297,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
297297
}
298298

299299
bool CanDeferEffects() const {
300-
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
300+
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
301301
return false;
302302
}
303303

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
127127
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
128128
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
129129
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
130-
const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
131-
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
130+
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
131+
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
132132
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
133133
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
134134
, AsyncIoFactory(std::move(asyncIoFactory))
135-
, UseEvWrite(useEvWrite)
135+
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
136+
, HtapTx(tableServiceConfig.GetEnableHtapTx())
136137
, FederatedQuerySetup(federatedQuerySetup)
137138
, GUCSettings(GUCSettings)
138139
, ShardIdToTableInfo(shardIdToTableInfo)
139-
, HtapTx(htapTx)
140140
{
141141
Target = creator;
142142

@@ -1487,7 +1487,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
14871487
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
14881488

14891489
auto getShardTask = [&](ui64 shardId) -> TTask& {
1490-
YQL_ENSURE(!UseEvWrite);
1490+
YQL_ENSURE(!UseEvWriteForOltp);
14911491
auto it = shardTasks.find(shardId);
14921492
if (it != shardTasks.end()) {
14931493
return TasksGraph.GetTask(it->second);
@@ -1627,7 +1627,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16271627

16281628
void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
16291629
{
1630-
YQL_ENSURE(!UseEvWrite);
1630+
YQL_ENSURE(!UseEvWriteForOltp);
16311631
TShardState shardState;
16321632
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
16331633
shardState.DatashardState.ConstructInPlace();
@@ -2030,7 +2030,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20302030
TDatashardTxs datashardTxs;
20312031
TEvWriteTxs evWriteTxs;
20322032
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
2033-
YQL_ENSURE(evWriteTxs.empty() || datashardTxs.empty());
20342033

20352034
// Single-shard datashard transactions are always immediate
20362035
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
@@ -2261,7 +2260,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22612260
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");
22622261
NKikimrDataEvents::TKqpLocks* locks = nullptr;
22632262

2264-
if (UseEvWrite) {
2263+
if (UseEvWriteForOltp || ShardIdToTableInfo->Get(shardId).IsOlap) {
22652264
if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) {
22662265
locks = it->second->MutableLocks();
22672266
} else {
@@ -2333,15 +2332,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23332332
// Note: currently persistent channels are never used
23342333
!HasPersistentChannels &&
23352334
// Can't use volatile transactions for EvWrite at current time
2336-
!UseEvWrite);
2335+
evWriteTxs.empty());
23372336

23382337
const bool useGenericReadSets = (
23392338
// Use generic readsets when feature is explicitly enabled
23402339
AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() ||
23412340
// Volatile transactions must always use generic readsets
23422341
VolatileTx ||
23432342
// Transactions with topics must always use generic readsets
2344-
!topicTxs.empty());
2343+
!topicTxs.empty() ||
2344+
// HTAP transactions always use generic readsets
2345+
!evWriteTxs.empty());
23452346

23462347
if (!locksMap.empty() || VolatileTx ||
23472348
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
@@ -2463,12 +2464,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24632464
std::sort(receivingShards.begin(), receivingShards.end());
24642465

24652466
for (auto& [shardId, shardTx] : datashardTxs) {
2466-
AFL_ENSURE(!columnShardArbiter);
24672467
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
2468-
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
2469-
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
2470-
if (arbiter) {
2471-
shardTx->MutableLocks()->SetArbiterShard(arbiter);
2468+
if (columnShardArbiter) {
2469+
shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter);
2470+
shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
2471+
if (sendingShardsSet.contains(shardId)) {
2472+
shardTx->MutableLocks()->AddSendingShards(shardId);
2473+
}
2474+
if (receivingShardsSet.contains(shardId)) {
2475+
shardTx->MutableLocks()->AddReceivingShards(shardId);
2476+
}
2477+
AFL_ENSURE(!arbiter);
2478+
} else {
2479+
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
2480+
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
2481+
if (arbiter) {
2482+
shardTx->MutableLocks()->SetArbiterShard(arbiter);
2483+
}
24722484
}
24732485
}
24742486

@@ -2844,11 +2856,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28442856

28452857
private:
28462858
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
2847-
bool UseEvWrite = false;
2859+
const bool UseEvWriteForOltp = false;
2860+
const bool HtapTx = false;
28482861
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
28492862
const TGUCSettings::TPtr GUCSettings;
28502863
TShardIdToTableInfoPtr ShardIdToTableInfo;
2851-
const bool HtapTx = false;
28522864

28532865
bool HasExternalSources = false;
28542866
bool SecretSnapshotRequired = false;
@@ -2890,13 +2902,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28902902
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
28912903
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
28922904
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
2893-
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
2905+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
28942906
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
2895-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
2907+
const TShardIdToTableInfoPtr& shardIdToTableInfo)
28962908
{
28972909
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
28982910
std::move(asyncIoFactory), creator, userRequestContext,
2899-
useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx);
2911+
statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
29002912
}
29012913

29022914
} // namespace NKqp

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
9595
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
9696
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
9797
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
98-
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
99-
const bool useEvWrite, ui32 statementResultIndex,
98+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
10099
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
101-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
100+
const TShardIdToTableInfoPtr& shardIdToTableInfo);
102101

103102
IActor* CreateKqpSchemeExecuter(
104103
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,

ydb/core/kqp/executer_actor/kqp_executer_impl.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
8080
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
8181
const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
8282
TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
83-
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
84-
const bool useEvWrite, ui32 statementResultIndex,
83+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
8584
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
86-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
85+
const TShardIdToTableInfoPtr& shardIdToTableInfo)
8786
{
8887
if (request.Transactions.empty()) {
8988
// commit-only or rollback-only data transaction
9089
return CreateKqpDataExecuter(
9190
std::move(request), database, userToken, counters, false, tableServiceConfig,
9291
std::move(asyncIoFactory), creator,
93-
userRequestContext, useEvWrite, statementResultIndex,
94-
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
92+
userRequestContext, statementResultIndex,
93+
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
9594
);
9695
}
9796

@@ -113,8 +112,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
113112
return CreateKqpDataExecuter(
114113
std::move(request), database, userToken, counters, false, tableServiceConfig,
115114
std::move(asyncIoFactory), creator,
116-
userRequestContext, useEvWrite, statementResultIndex,
117-
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
115+
userRequestContext, statementResultIndex,
116+
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
118117
);
119118

120119
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
@@ -128,8 +127,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
128127
return CreateKqpDataExecuter(
129128
std::move(request), database, userToken, counters, true,
130129
tableServiceConfig, std::move(asyncIoFactory), creator,
131-
userRequestContext, useEvWrite, statementResultIndex,
132-
federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx
130+
userRequestContext, statementResultIndex,
131+
federatedQuerySetup, GUCSettings, shardIdToTableInfo
133132
);
134133

135134
default:

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,10 +2031,9 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
20312031
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
20322032
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
20332033
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
2034-
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
2035-
const bool useEvWrite, ui32 statementResultIndex,
2034+
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
20362035
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
2037-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
2036+
const TShardIdToTableInfoPtr& shardIdToTableInfo);
20382037

20392038
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
20402039
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,11 @@ class TKqpQueryState : public TNonCopyable {
349349
return false;
350350
}
351351

352+
if (TxCtx->HasOlapTable) {
353+
// HTAP/OLAP transactions always use separate commit.
354+
return false;
355+
}
356+
352357
if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
353358
if (tx && tx->GetHasEffects()) {
354359
YQL_ENSURE(tx->ResultsSize() == 0);

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,7 +1233,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12331233
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
12341234
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
12351235

1236-
if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
1236+
if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) {
12371237
request.UseImmediateEffects = true;
12381238
}
12391239
}
@@ -1292,29 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12921292
request.ResourceManager_ = ResourceManager_;
12931293
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
12941294

1295-
bool useEvWrite = (
1296-
(txCtx->HasOlapTable // olap only
1297-
&& !txCtx->HasOltpTable
1298-
&& Settings.TableService.GetEnableOlapSink())
1299-
|| (txCtx->HasOltpTable // oltp only
1300-
&& !txCtx->HasOlapTable
1301-
&& Settings.TableService.GetEnableOltpSink())
1302-
|| (txCtx->HasOlapTable // htap
1303-
&& txCtx->HasOltpTable
1304-
&& Settings.TableService.GetEnableOlapSink()
1305-
&& Settings.TableService.GetEnableHtapTx()))
1306-
&& (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED
1307-
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY
1308-
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
1309-
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML)
1310-
|| (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML));
13111295
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
13121296
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
13131297
RequestCounters, Settings.TableService,
13141298
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(),
13151299
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
1316-
useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo,
1317-
Settings.TableService.GetEnableHtapTx());
1300+
QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo);
13181301

13191302
auto exId = RegisterWithSameMailbox(executerActor);
13201303
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);

0 commit comments

Comments
 (0)