From 3e0f8d26d6f2ffad59a8a0a8a6888740e9635fc0 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 7 Jul 2025 19:46:11 +0300 Subject: [PATCH] tests --- ydb/core/kqp/ut/scan/kqp_split_ut.cpp | 224 +++++++++++++++++++++++++- 1 file changed, 223 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index e6a72d2d7729..9578bf7dae7a 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -309,6 +309,18 @@ Y_UNIT_TEST_SUITE(KqpSplit) { UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetYdbStatus(), code); } + void SendDataQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) { + auto ev = std::make_unique(); + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + ev->Record.MutableRequest()->SetQuery(queryText); + ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true); + ActorIdToProto(sender, ev->Record.MutableRequestActorId()); + runtime->Send(new IEventHandle(kqpProxy, sender, ev.release())); + } + void SendScanQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) { auto ev = std::make_unique(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); @@ -317,7 +329,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ev->Record.MutableRequest()->SetKeepSession(false); ActorIdToProto(sender, ev->Record.MutableRequestActorId()); runtime->Send(new IEventHandle(kqpProxy, sender, ev.release())); - }; + } void CollectKeysTo(TVector* collectedKeys, TTestActorRuntime* runtime, TActorId sender) { auto captureEvents = [=](TTestActorRuntimeBase&, TAutoPtr& ev) { @@ -332,6 +344,14 @@ Y_UNIT_TEST_SUITE(KqpSplit) { runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return true; } + if (ev->GetTypeRewrite() == NKqp::TEvKqp::TEvQueryResponse::EventType) { + auto& record = ev->Get()->Record; + for (auto& resultSet : record.GetResponse().GetYdbResults()) { + for (auto& row : resultSet.rows()) { + collectedKeys->push_back(row.items(0).uint64_value()); + } + } + } return false; }; @@ -408,6 +428,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(testActorType == ETestActorType::SorceRead); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); settings.SetAppConfig(appConfig); @@ -455,6 +476,10 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ::NKikimr::NKqp::NTestSuiteKqpSplit::SendScanQuery(Runtime, KqpProxy, Sender, text); } + void SendDataQuery(TString text) { + ::NKikimr::NKqp::NTestSuiteKqpSplit::SendDataQuery(Runtime, KqpProxy, Sender, text); + } + TMaybe Kikimr; TVector CollectedKeys; Tests::TServer* Server; @@ -794,6 +819,50 @@ Y_UNIT_TEST_SUITE(KqpSplit) { UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4"); } + Y_UNIT_TEST(StreamLookupJoinSplitBeforeReading) { + TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1"); + + ExecSQL(*s.Runtime, s.Sender, R"( + --!syntax_v1 + CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2)); + )", false); + + ExecSQL(*s.Runtime, s.Sender, R"( + REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES + (1u, 1u, 1u), + (1u, 2u, 2u), + (1u, 3u, 3u), + (2147483648u, 4u, 4u); + )", true); + + auto shards = s.Shards(); + auto* shim = new TReadActorPipeCacheStub(); + + InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim)); + shim->SetupCapture(0, 1); + s.SendDataQuery(R"( + $data = AsList( + AsStruct(1u AS Key, 1u AS Value), + AsStruct(2147483648u AS Key, 2147483648u AS Value)); + + SELECT b.Value + FROM AS_TABLE($data) a + JOIN `/Root/Table1` b + ON a.Key = b.Key + ORDER BY b.Value ASC; + )"); + + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + s.Split(shards.at(0), 3); + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->SendCaptured(s.Runtime); + + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4"); + } + Y_UNIT_TEST(StreamLookupSplitAfterFirstResult) { TTestSetup s(ETestActorType::StreamLookup, "/Root/TestIndex"); @@ -847,6 +916,61 @@ Y_UNIT_TEST_SUITE(KqpSplit) { UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4"); } + Y_UNIT_TEST(StreamLookupJoinSplitAfterFirstResult) { + TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1"); + + ExecSQL(*s.Runtime, s.Sender, R"( + --!syntax_v1 + CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2)); + )", false); + + ExecSQL(*s.Runtime, s.Sender, R"( + REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES + (1u, 1u, 1u), + (1u, 2u, 2u), + (1u, 3u, 3u), + (2147483648u, 4u, 4u); + )", true); + + auto shards = s.Shards(); + + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(2); + evread.SetMaxRows(2); + SetDefaultReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + evreadack.SetMaxRows(2); + SetDefaultReadAckSettings(evreadack); + + auto* shim = new TReadActorPipeCacheStub(); + shim->SetupCapture(1, 1); + shim->SetupResultsCapture(1); + InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim)); + s.SendDataQuery(R"( + $data = AsList( + AsStruct(1u AS Key, 1u AS Value), + AsStruct(2147483648u AS Key, 2147483648u AS Value)); + + SELECT b.Value + FROM AS_TABLE($data) a + JOIN `/Root/Table1` b + ON a.Key = b.Key + ORDER BY b.Value ASC; + )"); + + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + s.Split(shards.at(0), 3); + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->AllowResults(); + shim->SendCaptured(s.Runtime); + + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4"); + } + Y_UNIT_TEST(StreamLookupRetryAttemptForFinishedRead) { TTestSetup s(ETestActorType::StreamLookup, "/Root/TestIndex"); @@ -902,6 +1026,58 @@ Y_UNIT_TEST_SUITE(KqpSplit) { UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4"); } + Y_UNIT_TEST(StreamLookupJoinRetryAttemptForFinishedRead) { + TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1"); + + auto settings = MakeIntrusive(); + settings->StartRetryDelay = TDuration::MilliSeconds(250); + settings->MaxShardAttempts = 4; + // set small read response timeout (for frequent retries) + settings->ReadResponseTimeout = TDuration::MilliSeconds(1); + SetReadIteratorBackoffSettings(settings); + + ExecSQL(*s.Runtime, s.Sender, R"( + --!syntax_v1 + CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2)); + )", false); + + ExecSQL(*s.Runtime, s.Sender, R"( + REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES + (1u, 1u, 10u); + )", true); + + auto shards = s.Shards(); + auto* shim = new TReadActorPipeCacheStub(); + + InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim)); + shim->SetupCapture(0, 1); + + s.SendDataQuery(R"( + $data = AsList( + AsStruct(1u AS Key, 1u AS Value), + AsStruct(2147483648u AS Key, 2147483648u AS Value)); + + SELECT b.Value + FROM AS_TABLE($data) a + JOIN `/Root/Table1` b + ON a.Key = b.Key + ORDER BY b.Value ASC; + )"); + + shim->ReadsReceived.WaitI(); + + UNIT_ASSERT_EQUAL(shards.size(), 1); + auto undelivery = MakeHolder(shards[0], true); + + UNIT_ASSERT_EQUAL(shim->Captured.size(), 1); + s.Runtime->Send(shim->Captured[0]->Sender, s.Sender, undelivery.Release()); + + shim->SkipAll(); + + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",10"); + } + Y_UNIT_TEST(StreamLookupDeliveryProblem) { TTestSetup s(ETestActorType::StreamLookup, "/Root/TestIndex"); @@ -950,6 +1126,52 @@ Y_UNIT_TEST_SUITE(KqpSplit) { } + Y_UNIT_TEST(StreamLookupJoinDeliveryProblem) { + TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1"); + + ExecSQL(*s.Runtime, s.Sender, R"( + --!syntax_v1 + CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2)); + )", false); + + ExecSQL(*s.Runtime, s.Sender, R"( + REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES + (1u, 1u, 10u); + )", true); + + auto shards = s.Shards(); + auto* shim = new TReadActorPipeCacheStub(); + + InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim)); + shim->SetupCapture(0, 1); + + s.SendDataQuery(R"( + $data = AsList( + AsStruct(1u AS Key, 1u AS Value), + AsStruct(2147483648u AS Key, 2147483648u AS Value)); + + SELECT b.Value + FROM AS_TABLE($data) a + JOIN `/Root/Table1` b + ON a.Key = b.Key + ORDER BY b.Value ASC; + )"); + + shim->ReadsReceived.WaitI(); + + UNIT_ASSERT_EQUAL(shards.size(), 1); + auto undelivery = MakeHolder(shards[0], true); + + UNIT_ASSERT_EQUAL(shim->Captured.size(), 1); + s.Runtime->Send(shim->Captured[0]->Sender, s.Sender, undelivery.Release()); + + shim->SkipAll(); + shim->SendCaptured(s.Runtime); + + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",10"); + } + // TODO: rework test for stream lookups //Y_UNIT_TEST_SORT(AfterResolvePoints, Order) { // TTestSetup s;