Skip to content

Commit 704a4ec

Browse files
authored
[KQP] Remove the CA nodes from ParticipantNodes in KqpDataExecuter (#20714)
1 parent 4a26872 commit 704a4ec

File tree

2 files changed

+10
-53
lines changed

2 files changed

+10
-53
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -526,11 +526,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
526526
auto& state = ev->Get()->Record;
527527
ui64 taskId = state.GetTaskId();
528528

529-
ParticipantNodes.emplace(computeActor.NodeId());
530-
if (TxManager) {
531-
TxManager->AddParticipantNode(computeActor.NodeId());
532-
}
533-
534529
bool populateChannels = HandleComputeStats(ev);
535530

536531
switch (state.GetState()) {
@@ -2311,7 +2306,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
23112306

23122307
ui32 StatementResultIndex;
23132308

2314-
// Track which nodes has been involved during execution
2309+
// Track which nodes (by shards) have been involved during execution
23152310
THashSet<ui32> ParticipantNodes;
23162311

23172312
bool AlreadyReplied = false;

ydb/core/kqp/ut/query/kqp_stats_ut.cpp

Lines changed: 9 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -718,9 +718,10 @@ Y_UNIT_TEST_TWIN(OneShardLocalExec, UseSink) {
718718
UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), 0);
719719
}
720720

721-
Y_UNIT_TEST_TWIN(OneShardNonLocalExec, UseSink) {
721+
Y_UNIT_TEST_QUAD(OneShardNonLocalExec, UseSink, EnableParallelPointReadConsolidation) {
722722
NKikimrConfig::TAppConfig app;
723723
app.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
724+
app.MutableTableServiceConfig()->SetEnableParallelPointReadConsolidation(EnableParallelPointReadConsolidation);
724725
TKikimrRunner kikimr(TKikimrSettings().SetNodeCount(2).SetAppConfig(app));
725726
auto db = kikimr.GetTableClient();
726727
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -791,89 +792,50 @@ Y_UNIT_TEST_TWIN(OneShardNonLocalExec, UseSink) {
791792
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
792793
UNIT_ASSERT(result.IsSuccess());
793794

794-
if (app.GetTableServiceConfig().GetEnableParallelPointReadConsolidation()) {
795-
// Session is on node 1, read task is for node 2 -> TotalSingleNodeReqCount does not increase
796-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotalSingleNodeReqCount);
797-
} else {
798-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
799-
}
795+
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
800796
}
801797
{
802798
auto result = session.ExecuteDataQuery(R"(
803799
UPSERT INTO `/Root/EightShard` (Key, Data) VALUES (1, 1);
804800
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
805801
UNIT_ASSERT(result.IsSuccess());
806802

807-
if (app.GetTableServiceConfig().GetEnableParallelPointReadConsolidation()) {
808-
// If UseSink is enabled, compute is on node 1, write is on node 2 -> TotalSingleNodeReqCount does not increase
809-
// If UseSink is disabled, the execution is on node 2 -> TotalSingleNodeReqCount increases
810-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotalSingleNodeReqCount += (UseSink) ? 0 : 1);
811-
} else {
812-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
813-
}
803+
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
814804
}
815805
{
816806
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
817807
SELECT * FROM `/Root/EightShard` WHERE Key = 1;
818808
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
819809
UNIT_ASSERT(result.IsSuccess());
820810

821-
if (app.GetTableServiceConfig().GetEnableParallelPointReadConsolidation()) {
822-
// Session is on node 1, read task is for node 2 -> TotalSingleNodeReqCount does not increase
823-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotalSingleNodeReqCount);
824-
} else {
825-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
826-
}
811+
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
827812
}
828813
{
829814
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
830815
UPSERT INTO `/Root/EightShard` (Key, Data) VALUES (1, 1);
831816
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
832817
UNIT_ASSERT(result.IsSuccess());
833818

834-
if (app.GetTableServiceConfig().GetEnableParallelPointReadConsolidation()) {
835-
// If UseSink is enabled, compute is on node 1, write is on node 2 -> TotalSingleNodeReqCount does not increase
836-
// If UseSink is disabled, the execution is on node 2 -> TotalSingleNodeReqCount increases
837-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotalSingleNodeReqCount += (UseSink) ? 0 : 1);
838-
} else {
839-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
840-
}
819+
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
841820
}
842821
{
843822
auto result = session.ExecuteDataQuery(R"(
844823
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
845824
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
846825
UNIT_ASSERT(result.IsSuccess());
847826

848-
if (app.GetTableServiceConfig().GetEnableParallelPointReadConsolidation()) {
849-
// Read is on node 1, write is on node 2 -> TotalSingleNodeReqCount does not increase
850-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotalSingleNodeReqCount);
851-
} else {
852-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
853-
}
827+
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
854828
}
855829
{
856830
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
857831
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
858832
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
859833
UNIT_ASSERT(result.IsSuccess());
860834

861-
if (app.GetTableServiceConfig().GetEnableParallelPointReadConsolidation()) {
862-
// Read is on node 1, write is on node 2 -> TotalSingleNodeReqCount does not increase
863-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), expectedTotalSingleNodeReqCount);
864-
} else {
865-
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
866-
}
867-
}
868-
869-
if (app.GetTableServiceConfig().GetEnableParallelPointReadConsolidation()) {
870-
// If UseSink is enabled, all requests are local or with two nodes -> NonLocalSingleNodeReqCount does not increase
871-
// If UseSink is disabled, UPSERT queries are non-local and for the single node -> NonLocalSingleNodeReqCount increases
872-
expectedNonLocalSingleNodeReqCount += (UseSink) ? 0 : 2;
873-
} else {
874-
expectedNonLocalSingleNodeReqCount += 6;
835+
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
875836
}
876837

838+
expectedNonLocalSingleNodeReqCount += 6;
877839
UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), expectedNonLocalSingleNodeReqCount);
878840

879841
// Now resume node 1 and move all tablets on the node1

0 commit comments

Comments
 (0)