Skip to content

Commit 2878186

Browse files
authored
Improve tx defer (#10507)
1 parent 477ff30 commit 2878186

File tree

5 files changed

+100
-18
lines changed

5 files changed

+100
-18
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
206206
}
207207
}
208208

209-
if (txCtx.HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
209+
if (txCtx.NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
210210
return true;
211211
}
212212

@@ -343,5 +343,81 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
343343
return false;
344344
}
345345

346+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery) {
347+
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
348+
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
349+
};
350+
351+
for (size_t index = 0; index < physicalQuery.TransactionsSize(); ++index) {
352+
const auto &tx = physicalQuery.GetTransactions()[index];
353+
for (const auto &stage : tx.GetStages()) {
354+
for (const auto &tableOp : stage.GetTableOps()) {
355+
switch (tableOp.GetTypeCase()) {
356+
case NKqpProto::TKqpPhyTableOperation::kReadRange:
357+
case NKqpProto::TKqpPhyTableOperation::kLookup:
358+
case NKqpProto::TKqpPhyTableOperation::kReadRanges: {
359+
if (modifiedTables.contains(getTable(tableOp.GetTable()))) {
360+
return true;
361+
}
362+
break;
363+
}
364+
case NKqpProto::TKqpPhyTableOperation::kReadOlapRange:
365+
case NKqpProto::TKqpPhyTableOperation::kUpsertRows:
366+
case NKqpProto::TKqpPhyTableOperation::kDeleteRows:
367+
modifiedTables.insert(getTable(tableOp.GetTable()));
368+
break;
369+
default:
370+
YQL_ENSURE(false, "unexpected type");
371+
}
372+
}
373+
374+
for (const auto& input : stage.GetInputs()) {
375+
switch (input.GetTypeCase()) {
376+
case NKqpProto::TKqpPhyConnection::kStreamLookup:
377+
if (modifiedTables.contains(getTable(input.GetStreamLookup().GetTable()))) {
378+
return true;
379+
}
380+
break;
381+
case NKqpProto::TKqpPhyConnection::kSequencer:
382+
return true;
383+
case NKqpProto::TKqpPhyConnection::kUnionAll:
384+
case NKqpProto::TKqpPhyConnection::kMap:
385+
case NKqpProto::TKqpPhyConnection::kHashShuffle:
386+
case NKqpProto::TKqpPhyConnection::kBroadcast:
387+
case NKqpProto::TKqpPhyConnection::kMapShard:
388+
case NKqpProto::TKqpPhyConnection::kShuffleShard:
389+
case NKqpProto::TKqpPhyConnection::kResult:
390+
case NKqpProto::TKqpPhyConnection::kValue:
391+
case NKqpProto::TKqpPhyConnection::kMerge:
392+
case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET:
393+
break;
394+
}
395+
}
396+
397+
for (const auto& source : stage.GetSources()) {
398+
if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
399+
if (modifiedTables.contains(getTable(source.GetReadRangesSource().GetTable()))) {
400+
return true;
401+
}
402+
} else {
403+
return true;
404+
}
405+
}
406+
407+
for (const auto& sink : stage.GetSinks()) {
408+
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
409+
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
410+
NKikimrKqp::TKqpTableSinkSettings settings;
411+
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
412+
modifiedTables.insert(getTable(settings.GetTable()));
413+
} else {
414+
return true;
415+
}
416+
}
417+
}
418+
}
419+
return false;
420+
}
421+
346422
} // namespace NKqp
347423
} // namespace NKikimr

ydb/core/kqp/common/kqp_tx.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ class TShardIdToTableInfo {
165165
};
166166
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
167167

168+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery);
169+
168170
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
169171
public:
170172
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
@@ -232,6 +234,11 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
232234
ParamsState = MakeIntrusive<TParamsState>();
233235
SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot;
234236
HasImmediateEffects = false;
237+
238+
HasOlapTable = false;
239+
HasOltpTable = false;
240+
HasTableWrite = false;
241+
NeedUncommittedChangesFlush = false;
235242
}
236243

237244
TKqpTransactionInfo GetInfo() const;
@@ -268,7 +275,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
268275
}
269276

270277
bool ShouldExecuteDeferredEffects() const {
271-
if (HasUncommittedChangesRead || HasOlapTable) {
278+
if (NeedUncommittedChangesFlush || HasOlapTable) {
272279
return !DeferredEffects.Empty();
273280
}
274281

@@ -297,13 +304,20 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
297304
}
298305

299306
bool CanDeferEffects() const {
300-
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
307+
if (NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
301308
return false;
302309
}
303310

304311
return true;
305312
}
306313

314+
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery) {
315+
NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery);
316+
if (NeedUncommittedChangesFlush) {
317+
ModifiedTablesSinceLastFlush.clear();
318+
}
319+
}
320+
307321
public:
308322
struct TParamsState : public TThrRefBase {
309323
ui32 LastIndex = 0;
@@ -334,6 +348,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
334348
bool HasOltpTable = false;
335349
bool HasTableWrite = false;
336350

351+
bool NeedUncommittedChangesFlush = false;
352+
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;
353+
337354
TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
338355
};
339356

ydb/core/kqp/provider/yql_kikimr_provider.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
292292
Invalidated = false;
293293
Readonly = false;
294294
Closed = false;
295-
HasUncommittedChangesRead = false;
296295
}
297296

298297
void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) {
@@ -409,17 +408,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
409408
}
410409

411410
auto& currentOps = TableOperations[table];
412-
const bool currentModify = currentOps & KikimrModifyOps();
413-
if (currentModify) {
414-
if (KikimrReadOps() & newOp) {
415-
HasUncommittedChangesRead = true;
416-
}
417-
418-
if ((*info)->GetHasIndexTables()) {
419-
HasUncommittedChangesRead = true;
420-
}
421-
}
422-
423411
currentOps |= newOp;
424412
}
425413

@@ -429,7 +417,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
429417
virtual ~TKikimrTransactionContextBase() = default;
430418

431419
public:
432-
bool HasUncommittedChangesRead = false;
433420
THashMap<TString, TYdbOperations> TableOperations;
434421
THashMap<TKikimrPathId, TString> TableByIdMap;
435422
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ class TKqpQueryState : public TNonCopyable {
357357
return false;
358358
}
359359

360-
if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
360+
if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
361361
if (tx && tx->GetHasEffects()) {
362362
YQL_ENSURE(tx->ResultsSize() == 0);
363363
// commit can be applied to the last transaction with effects

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
851851
"Write transactions between column and row tables are disabled at current time.");
852852
return false;
853853
}
854+
854855
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
856+
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery);
855857
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
856858
EKikimrQueryType::Dml);
857859
if (!success) {
@@ -1222,7 +1224,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12221224
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
12231225
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
12241226

1225-
if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) {
1227+
if (!txCtx.CanDeferEffects()) {
12261228
request.UseImmediateEffects = true;
12271229
}
12281230
}

0 commit comments

Comments
 (0)