Skip to content

Commit 9566518

Browse files
authored
Merge pull request #18123 from nikvas0/evwrite-fixes-2
[25-1] EvWrite fixes
2 parents d9cb943 + 4ef0b8e commit 9566518

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+3055
-349
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
322322
AFL_ENSURE(!CollectOnly);
323323
AFL_ENSURE(State == ETransactionState::COLLECTING);
324324
AFL_ENSURE(NeedCommit());
325+
AFL_ENSURE(!BrokenLocks());
325326

326327
THashSet<ui64> sendingColumnShardsSet;
327328
THashSet<ui64> receivingColumnShardsSet;
@@ -336,7 +337,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
336337
SendingShards.insert(shardId);
337338
}
338339
}
339-
if (!shardInfo.Locks.empty()) {
340+
if (!shardInfo.Locks.empty() || (shardInfo.Flags & EAction::READ)) {
340341
SendingShards.insert(shardId);
341342
if (shardInfo.IsOlap) {
342343
sendingColumnShardsSet.insert(shardId);
@@ -428,6 +429,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
428429
|| (State == ETransactionState::COLLECTING
429430
&& IsSingleShard()));
430431
AFL_ENSURE(NeedCommit());
432+
AFL_ENSURE(!BrokenLocks());
431433
State = ETransactionState::EXECUTING;
432434

433435
for (auto& [_, shardInfo] : ShardsInfo) {

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
824824
WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true);
825825
WriteActorPrepareWrites = KqpGroup->GetCounter("SinkWrites/WriteActorPrepareWrites", true);
826826

827+
WriteActorWriteOnlyOperations = KqpGroup->GetCounter("SinkWrites/WriteActorWriteOnlyOperations", true);
828+
WriteActorReadWriteOperations = KqpGroup->GetCounter("SinkWrites/WriteActorReadWriteOperations", true);
829+
827830
BufferActorFlushes = KqpGroup->GetCounter("SinkWrites/BufferActorFlushes", true);
828831
BufferActorImmediateCommits = KqpGroup->GetCounter("SinkWrites/BufferActorImmediateCommits", true);
829832
BufferActorDistributedCommits = KqpGroup->GetCounter("SinkWrites/BufferActorDistributedCommits", true);

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
419419
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries;
420420
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorPrepareWrites;
421421

422+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorWriteOnlyOperations;
423+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorReadWriteOperations;
424+
422425
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorFlushes;
423426
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorImmediateCommits;
424427
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorDistributedCommits;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1620,7 +1620,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16201620
shardTasks.emplace(shardId, task.Id);
16211621

16221622
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1623-
BuildSinks(stage, task);
1623+
BuildSinks(stage, stageInfo, task);
16241624

16251625
return task;
16261626
};

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
978978
task.Meta.Type = TTaskMeta::TTaskType::Compute;
979979

980980
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
981-
BuildSinks(stage, task);
981+
BuildSinks(stage, stageInfo, task);
982982

983983
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
984984
}
@@ -1002,15 +1002,20 @@ class TKqpExecuterBase : public TActor<TDerived> {
10021002
output.SinkSettings = extSink.GetSettings();
10031003
}
10041004

1005-
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) {
1005+
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10061006
const auto& intSink = sink.GetInternalSink();
10071007
auto& output = task.Outputs[sink.GetOutputIndex()];
10081008
output.Type = TTaskOutputType::Sink;
10091009
output.SinkType = intSink.GetType();
10101010

10111011
if (intSink.GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
10121012
NKikimrKqp::TKqpTableSinkSettings settings;
1013-
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1013+
if (!stageInfo.Meta.ResolvedSinkSettings) {
1014+
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1015+
} else {
1016+
settings = *stageInfo.Meta.ResolvedSinkSettings;
1017+
}
1018+
10141019
auto& lockTxId = TasksGraph.GetMeta().LockTxId;
10151020
if (lockTxId) {
10161021
settings.SetLockTxId(*lockTxId);
@@ -1038,14 +1043,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
10381043
}
10391044
}
10401045

1041-
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
1046+
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10421047
if (stage.SinksSize() > 0) {
10431048
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
10441049
const auto& sink = stage.GetSinks(0);
10451050
YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
10461051

10471052
if (sink.HasInternalSink()) {
1048-
BuildInternalSinks(sink, task);
1053+
BuildInternalSinks(sink, stageInfo, task);
10491054
} else if (sink.HasExternalSink()) {
10501055
BuildExternalSinks(sink, task);
10511056
} else {
@@ -1124,7 +1129,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
11241129

11251130
// finish building
11261131
for (auto taskId : tasksIds) {
1127-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1132+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
11281133
}
11291134
}
11301135

@@ -1375,7 +1380,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
13751380

13761381
auto buildSinks = [&]() {
13771382
for (const ui64 taskId : createdTasksIds) {
1378-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1383+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
13791384
}
13801385
};
13811386

@@ -1486,7 +1491,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
14861491
task.Meta.Type = TTaskMeta::TTaskType::Compute;
14871492
task.Meta.ExecuterId = SelfId();
14881493
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1489-
BuildSinks(stage, task);
1494+
BuildSinks(stage, stageInfo, task);
14901495
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
14911496
}
14921497
}
@@ -1740,7 +1745,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17401745
auto& task = TasksGraph.GetTask(taskIdx);
17411746
task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted);
17421747
PrepareScanMetaForUsage(task.Meta, keyTypes);
1743-
BuildSinks(stage, task);
1748+
BuildSinks(stage, stageInfo, task);
17441749
}
17451750
}
17461751

@@ -1769,7 +1774,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17691774
task.Meta.Type = TTaskMeta::TTaskType::Scan;
17701775
task.SetMetaId(metaGlueingId);
17711776
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1772-
BuildSinks(stage, task);
1777+
BuildSinks(stage, stageInfo, task);
17731778
}
17741779
}
17751780
}

0 commit comments

Comments
 (0)