Skip to content

Commit 003bbcc

Browse files
committed
Merge pull request #18123 from nikvas0/evwrite-fixes-2
[25-1] EvWrite fixes
1 parent d85bc71 commit 003bbcc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2961
-323
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
322322
AFL_ENSURE(!CollectOnly);
323323
AFL_ENSURE(State == ETransactionState::COLLECTING);
324324
AFL_ENSURE(NeedCommit());
325+
AFL_ENSURE(!BrokenLocks());
325326

326327
THashSet<ui64> sendingColumnShardsSet;
327328
THashSet<ui64> receivingColumnShardsSet;
@@ -336,7 +337,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
336337
SendingShards.insert(shardId);
337338
}
338339
}
339-
if (!shardInfo.Locks.empty()) {
340+
if (!shardInfo.Locks.empty() || (shardInfo.Flags & EAction::READ)) {
340341
SendingShards.insert(shardId);
341342
if (shardInfo.IsOlap) {
342343
sendingColumnShardsSet.insert(shardId);
@@ -428,6 +429,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
428429
|| (State == ETransactionState::COLLECTING
429430
&& IsSingleShard()));
430431
AFL_ENSURE(NeedCommit());
432+
AFL_ENSURE(!BrokenLocks());
431433
State = ETransactionState::EXECUTING;
432434

433435
for (auto& [_, shardInfo] : ShardsInfo) {

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
824824
WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true);
825825
WriteActorPrepareWrites = KqpGroup->GetCounter("SinkWrites/WriteActorPrepareWrites", true);
826826

827+
WriteActorWriteOnlyOperations = KqpGroup->GetCounter("SinkWrites/WriteActorWriteOnlyOperations", true);
828+
WriteActorReadWriteOperations = KqpGroup->GetCounter("SinkWrites/WriteActorReadWriteOperations", true);
829+
827830
BufferActorFlushes = KqpGroup->GetCounter("SinkWrites/BufferActorFlushes", true);
828831
BufferActorImmediateCommits = KqpGroup->GetCounter("SinkWrites/BufferActorImmediateCommits", true);
829832
BufferActorDistributedCommits = KqpGroup->GetCounter("SinkWrites/BufferActorDistributedCommits", true);

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
419419
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries;
420420
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorPrepareWrites;
421421

422+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorWriteOnlyOperations;
423+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorReadWriteOperations;
424+
422425
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorFlushes;
423426
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorImmediateCommits;
424427
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorDistributedCommits;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1626,7 +1626,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16261626
shardTasks.emplace(shardId, task.Id);
16271627

16281628
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1629-
BuildSinks(stage, task);
1629+
BuildSinks(stage, stageInfo, task);
16301630

16311631
return task;
16321632
};

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
984984
task.Meta.Type = TTaskMeta::TTaskType::Compute;
985985

986986
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
987-
BuildSinks(stage, task);
987+
BuildSinks(stage, stageInfo, task);
988988

989989
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
990990
}
@@ -1008,15 +1008,20 @@ class TKqpExecuterBase : public TActor<TDerived> {
10081008
output.SinkSettings = extSink.GetSettings();
10091009
}
10101010

1011-
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) {
1011+
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10121012
const auto& intSink = sink.GetInternalSink();
10131013
auto& output = task.Outputs[sink.GetOutputIndex()];
10141014
output.Type = TTaskOutputType::Sink;
10151015
output.SinkType = intSink.GetType();
10161016

10171017
if (intSink.GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
10181018
NKikimrKqp::TKqpTableSinkSettings settings;
1019-
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1019+
if (!stageInfo.Meta.ResolvedSinkSettings) {
1020+
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1021+
} else {
1022+
settings = *stageInfo.Meta.ResolvedSinkSettings;
1023+
}
1024+
10201025
auto& lockTxId = TasksGraph.GetMeta().LockTxId;
10211026
if (lockTxId) {
10221027
settings.SetLockTxId(*lockTxId);
@@ -1044,14 +1049,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
10441049
}
10451050
}
10461051

1047-
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
1052+
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10481053
if (stage.SinksSize() > 0) {
10491054
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
10501055
const auto& sink = stage.GetSinks(0);
10511056
YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
10521057

10531058
if (sink.HasInternalSink()) {
1054-
BuildInternalSinks(sink, task);
1059+
BuildInternalSinks(sink, stageInfo, task);
10551060
} else if (sink.HasExternalSink()) {
10561061
BuildExternalSinks(sink, task);
10571062
} else {
@@ -1130,7 +1135,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
11301135

11311136
// finish building
11321137
for (auto taskId : tasksIds) {
1133-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1138+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
11341139
}
11351140
}
11361141

@@ -1381,7 +1386,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
13811386

13821387
auto buildSinks = [&]() {
13831388
for (const ui64 taskId : createdTasksIds) {
1384-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1389+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
13851390
}
13861391
};
13871392

@@ -1502,7 +1507,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
15021507
task.Meta.Type = TTaskMeta::TTaskType::Compute;
15031508
task.Meta.ExecuterId = SelfId();
15041509
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1505-
BuildSinks(stage, task);
1510+
BuildSinks(stage, stageInfo, task);
15061511
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
15071512
}
15081513
}
@@ -1780,7 +1785,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17801785
auto& task = TasksGraph.GetTask(taskIdx);
17811786
task.Meta.SetEnableShardsSequentialScan(readSettings.IsSorted());
17821787
PrepareScanMetaForUsage(task.Meta, keyTypes);
1783-
BuildSinks(stage, task);
1788+
BuildSinks(stage, stageInfo, task);
17841789
}
17851790
}
17861791

@@ -1876,7 +1881,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
18761881
task.Meta.Type = TTaskMeta::TTaskType::Scan;
18771882
task.SetMetaId(metaGlueingId);
18781883
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1879-
BuildSinks(stage, task);
1884+
BuildSinks(stage, stageInfo, task);
18801885
}
18811886
}
18821887
}

0 commit comments

Comments
 (0)