Skip to content

Commit b7dfcde

Browse files
authored
Fix participant nodes count for sinks (#16137)
1 parent 7667788 commit b7dfcde

File tree

9 files changed

+38
-6
lines changed

9 files changed

+38
-6
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
2727
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
2828
ydb/core/kqp/ut/query KqpLimits.OutOfSpaceYQLUpsertFail+useSink
2929
ydb/core/kqp/ut/query KqpLimits.QSReplySizeEnsureMemoryLimits+useSink
30-
ydb/core/kqp/ut/query KqpStats.OneShardLocalExec+UseSink
31-
ydb/core/kqp/ut/query KqpStats.OneShardNonLocalExec+UseSink
3230
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
3331
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
3432
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ class TKqpTransactionManager : public IKqpTransactionManager {
162162
return nullptr;
163163
}
164164

165+
void AddParticipantNode(const ui32 nodeId) override {
166+
ParticipantNodes.insert(nodeId);
167+
}
168+
169+
const THashSet<ui32>& GetParticipantNodes() const override {
170+
return ParticipantNodes;
171+
}
172+
165173
void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) override {
166174
TopicOperations = std::move(topicOperations);
167175
}
@@ -506,6 +514,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
506514
THashMap<ui64, TShardInfo> ShardsInfo;
507515
std::unordered_set<TString> TablePathes;
508516

517+
THashSet<ui32> ParticipantNodes;
518+
509519
THashMap<TTableId, std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>>> TablePartitioning;
510520

511521
bool AllowVolatile = false;

ydb/core/kqp/common/kqp_tx_manager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ class IKqpTransactionManager {
7171
virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0;
7272
virtual bool HasTopics() const = 0;
7373

74+
virtual void AddParticipantNode(const ui32 nodeId) = 0;
75+
virtual const THashSet<ui32>& GetParticipantNodes() const = 0;
76+
7477
virtual bool IsTxPrepared() const = 0;
7578
virtual bool IsTxFinished() const = 0;
7679

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
12771277
Stats->AddDatashardStats(std::move(*res->Record.MutableTxStats()));
12781278
}
12791279

1280+
if (TxManager) {
1281+
TxManager->AddParticipantNode(ev->Sender.NodeId());
1282+
}
1283+
12801284
switch (ev->Get()->GetStatus()) {
12811285
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
12821286
YQL_ENSURE(false);

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,9 @@ class TKqpExecuterBase : public TActor<TDerived> {
265265
for (auto& [shardId, nodeId] : ShardIdToNodeId) {
266266
ShardsOnNode[nodeId].push_back(shardId);
267267
ParticipantNodes.emplace(nodeId);
268+
if (TxManager) {
269+
TxManager->AddParticipantNode(nodeId);
270+
}
268271
}
269272

270273
if (IsDebugLogEnabled()) {

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
548548
<< ", Cookie=" << ev->Cookie);
549549
UpdateStats(ev->Get()->Record.GetTxStats());
550550

551+
TxManager->AddParticipantNode(ev->Sender.NodeId());
552+
551553
switch (ev->Get()->GetStatus()) {
552554
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
553555
CA_LOG_E("Got UNSPECIFIED for table `"
@@ -2349,6 +2351,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
23492351
return (tableInfo.Pathes.size() == 1 ? "Table: " : "Tables: ") + builder;
23502352
};
23512353

2354+
TxManager->AddParticipantNode(ev->Sender.NodeId());
2355+
23522356
// TODO: get rid of copy-paste
23532357
switch (ev->Get()->GetStatus()) {
23542358
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,16 @@ class TKqpQueryState : public TNonCopyable {
176176
if (RequestEv->GetRequestCtx() == nullptr) {
177177
return false;
178178
}
179-
if (ParticipantNodes.size() == 1) {
179+
if (IsSingleNodeExecution()) {
180180
return *ParticipantNodes.begin() == nodeId;
181181
}
182182
return false;
183183
}
184184

185+
bool IsSingleNodeExecution() const {
186+
return ParticipantNodes.size() == 1;
187+
}
188+
185189
NKikimrKqp::EQueryAction GetAction() const {
186190
return QueryAction;
187191
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,8 +1627,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
16271627
QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats());
16281628
}
16291629

1630-
for (auto nodeId : ev->ParticipantNodes) {
1631-
QueryState->ParticipantNodes.emplace(nodeId);
1630+
if (QueryState->TxCtx->TxManager) {
1631+
QueryState->ParticipantNodes = QueryState->TxCtx->TxManager->GetParticipantNodes();
1632+
} else {
1633+
for (auto nodeId : ev->ParticipantNodes) {
1634+
QueryState->ParticipantNodes.emplace(nodeId);
1635+
}
16321636
}
16331637

16341638
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -2377,7 +2381,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
23772381
QueryState->PoolHandlerActor = Nothing();
23782382
}
23792383

2380-
if (QueryState && QueryState->ParticipantNodes.size() == 1) {
2384+
if (QueryState && QueryState->IsSingleNodeExecution()) {
23812385
Counters->TotalSingleNodeReqCount->Inc();
23822386
if (!QueryState->IsLocalExecution(SelfId().NodeId())) {
23832387
Counters->NonLocalSingleNodeReqCount->Inc();

ydb/core/kqp/ut/query/kqp_stats_ut.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,8 @@ Y_UNIT_TEST_TWIN(OneShardLocalExec, UseSink) {
685685
auto session = db.CreateSession().GetValueSync().GetSession();
686686

687687
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
688+
689+
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), 1);
688690
{
689691
auto result = session.ExecuteDataQuery(R"(
690692
SELECT * FROM `/Root/KeyValue` WHERE Key = 1;

0 commit comments

Comments
 (0)