Skip to content

Commit 00e255a

Browse files
authored
[Stable-25-1-analytics] EvWrite fixes (#18944)
2 parents 97e3d04 + 6db3c69 commit 00e255a

File tree

75 files changed

+3720
-518
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+3720
-518
lines changed

ydb/core/kqp/common/kqp_tx.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414

1515
namespace NKikimr::NKqp {
1616

17+
namespace {
18+
// Avoid too many compute actors starting at the same time.
19+
constexpr size_t kMaxDeferredEffects = 100;
20+
}
21+
1722
class TKqpTxLock {
1823
public:
1924
using TKey = std::tuple<ui64, ui64, ui64, ui64>;
@@ -317,7 +322,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
317322
}
318323

319324
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
320-
NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
325+
NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects)
326+
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
321327
if (NeedUncommittedChangesFlush) {
322328
ModifiedTablesSinceLastFlush.clear();
323329
}

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
5151
if (action & EAction::WRITE) {
5252
ReadOnly = false;
5353
}
54+
++ActionsCount;
5455
}
5556

5657
void AddTopic(ui64 topicId, const TString& path) override {
@@ -310,7 +311,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
310311
}
311312

312313
bool NeedCommit() const override {
313-
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && (IsSingleShard() || HasSnapshot());
314+
AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard()
315+
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot());
314316
return !dontNeedCommit;
315317
}
316318

@@ -322,6 +324,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
322324
AFL_ENSURE(!CollectOnly);
323325
AFL_ENSURE(State == ETransactionState::COLLECTING);
324326
AFL_ENSURE(NeedCommit());
327+
AFL_ENSURE(!BrokenLocks());
325328

326329
THashSet<ui64> sendingColumnShardsSet;
327330
THashSet<ui64> receivingColumnShardsSet;
@@ -336,7 +339,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
336339
SendingShards.insert(shardId);
337340
}
338341
}
339-
if (!shardInfo.Locks.empty()) {
342+
if (!shardInfo.Locks.empty() || (shardInfo.Flags & EAction::READ)) {
340343
SendingShards.insert(shardId);
341344
if (shardInfo.IsOlap) {
342345
sendingColumnShardsSet.insert(shardId);
@@ -428,6 +431,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
428431
|| (State == ETransactionState::COLLECTING
429432
&& IsSingleShard()));
430433
AFL_ENSURE(NeedCommit());
434+
AFL_ENSURE(!BrokenLocks());
431435
State = ETransactionState::EXECUTING;
432436

433437
for (auto& [_, shardInfo] : ShardsInfo) {
@@ -513,6 +517,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
513517
THashSet<ui64> ShardsIds;
514518
THashMap<ui64, TShardInfo> ShardsInfo;
515519
std::unordered_set<TString> TablePathes;
520+
ui64 ActionsCount = 0;
516521

517522
THashSet<ui32> ParticipantNodes;
518523

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
824824
WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true);
825825
WriteActorPrepareWrites = KqpGroup->GetCounter("SinkWrites/WriteActorPrepareWrites", true);
826826

827+
WriteActorWriteOnlyOperations = KqpGroup->GetCounter("SinkWrites/WriteActorWriteOnlyOperations", true);
828+
WriteActorReadWriteOperations = KqpGroup->GetCounter("SinkWrites/WriteActorReadWriteOperations", true);
829+
827830
BufferActorFlushes = KqpGroup->GetCounter("SinkWrites/BufferActorFlushes", true);
828831
BufferActorImmediateCommits = KqpGroup->GetCounter("SinkWrites/BufferActorImmediateCommits", true);
829832
BufferActorDistributedCommits = KqpGroup->GetCounter("SinkWrites/BufferActorDistributedCommits", true);

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
419419
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries;
420420
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorPrepareWrites;
421421

422+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorWriteOnlyOperations;
423+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorReadWriteOperations;
424+
422425
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorFlushes;
423426
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorImmediateCommits;
424427
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorDistributedCommits;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1626,7 +1626,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16261626
shardTasks.emplace(shardId, task.Id);
16271627

16281628
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1629-
BuildSinks(stage, task);
1629+
BuildSinks(stage, stageInfo, task);
16301630

16311631
return task;
16321632
};

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
984984
task.Meta.Type = TTaskMeta::TTaskType::Compute;
985985

986986
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
987-
BuildSinks(stage, task);
987+
BuildSinks(stage, stageInfo, task);
988988

989989
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
990990
}
@@ -1008,15 +1008,20 @@ class TKqpExecuterBase : public TActor<TDerived> {
10081008
output.SinkSettings = extSink.GetSettings();
10091009
}
10101010

1011-
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) {
1011+
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10121012
const auto& intSink = sink.GetInternalSink();
10131013
auto& output = task.Outputs[sink.GetOutputIndex()];
10141014
output.Type = TTaskOutputType::Sink;
10151015
output.SinkType = intSink.GetType();
10161016

10171017
if (intSink.GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
10181018
NKikimrKqp::TKqpTableSinkSettings settings;
1019-
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1019+
if (!stageInfo.Meta.ResolvedSinkSettings) {
1020+
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1021+
} else {
1022+
settings = *stageInfo.Meta.ResolvedSinkSettings;
1023+
}
1024+
10201025
auto& lockTxId = TasksGraph.GetMeta().LockTxId;
10211026
if (lockTxId) {
10221027
settings.SetLockTxId(*lockTxId);
@@ -1044,14 +1049,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
10441049
}
10451050
}
10461051

1047-
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
1052+
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10481053
if (stage.SinksSize() > 0) {
10491054
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
10501055
const auto& sink = stage.GetSinks(0);
10511056
YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
10521057

10531058
if (sink.HasInternalSink()) {
1054-
BuildInternalSinks(sink, task);
1059+
BuildInternalSinks(sink, stageInfo, task);
10551060
} else if (sink.HasExternalSink()) {
10561061
BuildExternalSinks(sink, task);
10571062
} else {
@@ -1130,7 +1135,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
11301135

11311136
// finish building
11321137
for (auto taskId : tasksIds) {
1133-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1138+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
11341139
}
11351140
}
11361141

@@ -1381,7 +1386,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
13811386

13821387
auto buildSinks = [&]() {
13831388
for (const ui64 taskId : createdTasksIds) {
1384-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1389+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
13851390
}
13861391
};
13871392

@@ -1502,7 +1507,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
15021507
task.Meta.Type = TTaskMeta::TTaskType::Compute;
15031508
task.Meta.ExecuterId = SelfId();
15041509
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1505-
BuildSinks(stage, task);
1510+
BuildSinks(stage, stageInfo, task);
15061511
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
15071512
}
15081513
}
@@ -1780,7 +1785,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17801785
auto& task = TasksGraph.GetTask(taskIdx);
17811786
task.Meta.SetEnableShardsSequentialScan(readSettings.IsSorted());
17821787
PrepareScanMetaForUsage(task.Meta, keyTypes);
1783-
BuildSinks(stage, task);
1788+
BuildSinks(stage, stageInfo, task);
17841789
}
17851790
}
17861791

@@ -1834,7 +1839,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
18341839
task.Meta.Type = TTaskMeta::TTaskType::Scan;
18351840
task.SetMetaId(t);
18361841
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1837-
BuildSinks(stage, task);
1842+
BuildSinks(stage, stageInfo, task);
18381843

18391844
for (const auto& readInfo: *task.Meta.Reads) {
18401845
Y_ENSURE(hashByShardId.contains(readInfo.ShardId));
@@ -1876,7 +1881,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
18761881
task.Meta.Type = TTaskMeta::TTaskType::Scan;
18771882
task.SetMetaId(metaGlueingId);
18781883
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1879-
BuildSinks(stage, task);
1884+
BuildSinks(stage, stageInfo, task);
18801885
}
18811886
}
18821887
}

0 commit comments

Comments
 (0)