Skip to content

Commit 09fe1c0

Browse files
committed
Merge branch 'stable-25-1' of github.com:ydb-platform/ydb into 25-1-batch-op
2 parents 72d9831 + 9566518 commit 09fe1c0

File tree

70 files changed

+3584
-475
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+3584
-475
lines changed

ydb/core/kafka_proxy/kafka_connection.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
569569
responseHeader.Write(writable, headerVersion);
570570
reply->Write(writable, version);
571571

572-
Buffer.flush();
572+
ssize_t res = Buffer.flush();
573+
if (res < 0) {
574+
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")";
575+
}
573576

574577
KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
575578
} catch(const yexception& e) {

ydb/core/kafka_proxy/kafka_messages_int.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) {
2020
}
2121

2222
void TKafkaWritable::write(const char* val, size_t length) {
23-
Buffer.write(val, length);
23+
ssize_t res = Buffer.write(val, length);
24+
if (res < 0) {
25+
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")";
26+
}
2427
}
2528

2629
TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) {

ydb/core/kafka_proxy/ut/ut_serialization.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,8 @@ Y_UNIT_TEST(RequestHeader_reference) {
648648
0x65, 0x72, 0x2D, 0x31};
649649

650650
TWritableBuf sb(nullptr, BUFFER_SIZE);
651-
sb.write((char*)reference, sizeof(reference));
651+
ssize_t res = sb.write((char*)reference, sizeof(reference));
652+
UNIT_ASSERT_GE(res, 0);
652653

653654
TKafkaReadable readable(sb.GetBuffer());
654655
TRequestHeaderData result;

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
322322
AFL_ENSURE(!CollectOnly);
323323
AFL_ENSURE(State == ETransactionState::COLLECTING);
324324
AFL_ENSURE(NeedCommit());
325+
AFL_ENSURE(!BrokenLocks());
325326

326327
THashSet<ui64> sendingColumnShardsSet;
327328
THashSet<ui64> receivingColumnShardsSet;
@@ -336,7 +337,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
336337
SendingShards.insert(shardId);
337338
}
338339
}
339-
if (!shardInfo.Locks.empty()) {
340+
if (!shardInfo.Locks.empty() || (shardInfo.Flags & EAction::READ)) {
340341
SendingShards.insert(shardId);
341342
if (shardInfo.IsOlap) {
342343
sendingColumnShardsSet.insert(shardId);
@@ -428,6 +429,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
428429
|| (State == ETransactionState::COLLECTING
429430
&& IsSingleShard()));
430431
AFL_ENSURE(NeedCommit());
432+
AFL_ENSURE(!BrokenLocks());
431433
State = ETransactionState::EXECUTING;
432434

433435
for (auto& [_, shardInfo] : ShardsInfo) {

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
824824
WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true);
825825
WriteActorPrepareWrites = KqpGroup->GetCounter("SinkWrites/WriteActorPrepareWrites", true);
826826

827+
WriteActorWriteOnlyOperations = KqpGroup->GetCounter("SinkWrites/WriteActorWriteOnlyOperations", true);
828+
WriteActorReadWriteOperations = KqpGroup->GetCounter("SinkWrites/WriteActorReadWriteOperations", true);
829+
827830
BufferActorFlushes = KqpGroup->GetCounter("SinkWrites/BufferActorFlushes", true);
828831
BufferActorImmediateCommits = KqpGroup->GetCounter("SinkWrites/BufferActorImmediateCommits", true);
829832
BufferActorDistributedCommits = KqpGroup->GetCounter("SinkWrites/BufferActorDistributedCommits", true);

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
419419
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries;
420420
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorPrepareWrites;
421421

422+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorWriteOnlyOperations;
423+
::NMonitoring::TDynamicCounters::TCounterPtr WriteActorReadWriteOperations;
424+
422425
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorFlushes;
423426
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorImmediateCommits;
424427
::NMonitoring::TDynamicCounters::TCounterPtr BufferActorDistributedCommits;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1631,7 +1631,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16311631
shardTasks.emplace(shardId, task.Id);
16321632

16331633
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1634-
BuildSinks(stage, task);
1634+
BuildSinks(stage, stageInfo, task);
16351635

16361636
return task;
16371637
};

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
981981
task.Meta.Type = TTaskMeta::TTaskType::Compute;
982982

983983
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
984-
BuildSinks(stage, task);
984+
BuildSinks(stage, stageInfo, task);
985985

986986
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
987987
}
@@ -1005,15 +1005,20 @@ class TKqpExecuterBase : public TActor<TDerived> {
10051005
output.SinkSettings = extSink.GetSettings();
10061006
}
10071007

1008-
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) {
1008+
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10091009
const auto& intSink = sink.GetInternalSink();
10101010
auto& output = task.Outputs[sink.GetOutputIndex()];
10111011
output.Type = TTaskOutputType::Sink;
10121012
output.SinkType = intSink.GetType();
10131013

10141014
if (intSink.GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
10151015
NKikimrKqp::TKqpTableSinkSettings settings;
1016-
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1016+
if (!stageInfo.Meta.ResolvedSinkSettings) {
1017+
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
1018+
} else {
1019+
settings = *stageInfo.Meta.ResolvedSinkSettings;
1020+
}
1021+
10171022
auto& lockTxId = TasksGraph.GetMeta().LockTxId;
10181023
if (lockTxId) {
10191024
settings.SetLockTxId(*lockTxId);
@@ -1041,14 +1046,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
10411046
}
10421047
}
10431048

1044-
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
1049+
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
10451050
if (stage.SinksSize() > 0) {
10461051
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
10471052
const auto& sink = stage.GetSinks(0);
10481053
YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
10491054

10501055
if (sink.HasInternalSink()) {
1051-
BuildInternalSinks(sink, task);
1056+
BuildInternalSinks(sink, stageInfo, task);
10521057
} else if (sink.HasExternalSink()) {
10531058
BuildExternalSinks(sink, task);
10541059
} else {
@@ -1127,7 +1132,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
11271132

11281133
// finish building
11291134
for (auto taskId : tasksIds) {
1130-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1135+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
11311136
}
11321137
}
11331138

@@ -1384,7 +1389,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
13841389

13851390
auto buildSinks = [&]() {
13861391
for (const ui64 taskId : createdTasksIds) {
1387-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1392+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
13881393
}
13891394
};
13901395

@@ -1495,7 +1500,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
14951500
task.Meta.Type = TTaskMeta::TTaskType::Compute;
14961501
task.Meta.ExecuterId = SelfId();
14971502
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1498-
BuildSinks(stage, task);
1503+
BuildSinks(stage, stageInfo, task);
14991504
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
15001505
}
15011506
}
@@ -1749,7 +1754,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17491754
auto& task = TasksGraph.GetTask(taskIdx);
17501755
task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted);
17511756
PrepareScanMetaForUsage(task.Meta, keyTypes);
1752-
BuildSinks(stage, task);
1757+
BuildSinks(stage, stageInfo, task);
17531758
}
17541759
}
17551760

@@ -1778,7 +1783,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17781783
task.Meta.Type = TTaskMeta::TTaskType::Scan;
17791784
task.SetMetaId(metaGlueingId);
17801785
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1781-
BuildSinks(stage, task);
1786+
BuildSinks(stage, stageInfo, task);
17821787
}
17831788
}
17841789
}

0 commit comments

Comments
 (0)