Skip to content

Tests for stream lookup join & shard splits #20749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 223 additions & 1 deletion ydb/core/kqp/ut/scan/kqp_split_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetYdbStatus(), code);
}

void SendDataQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) {
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
ev->Record.MutableRequest()->SetQuery(queryText);
ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
ActorIdToProto(sender, ev->Record.MutableRequestActorId());
runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
}

void SendScanQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) {
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
Expand All @@ -317,7 +329,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
ev->Record.MutableRequest()->SetKeepSession(false);
ActorIdToProto(sender, ev->Record.MutableRequestActorId());
runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
};
}

void CollectKeysTo(TVector<ui64>* collectedKeys, TTestActorRuntime* runtime, TActorId sender) {
auto captureEvents = [=](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
Expand All @@ -332,6 +344,14 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return true;
}
if (ev->GetTypeRewrite() == NKqp::TEvKqp::TEvQueryResponse::EventType) {
auto& record = ev->Get<NKqp::TEvKqp::TEvQueryResponse>()->Record;
for (auto& resultSet : record.GetResponse().GetYdbResults()) {
for (auto& row : resultSet.rows()) {
collectedKeys->push_back(row.items(0).uint64_value());
}
}
}

return false;
};
Expand Down Expand Up @@ -408,6 +428,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(testActorType == ETestActorType::SorceRead);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
settings.SetAppConfig(appConfig);

Expand Down Expand Up @@ -455,6 +476,10 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
::NKikimr::NKqp::NTestSuiteKqpSplit::SendScanQuery(Runtime, KqpProxy, Sender, text);
}

void SendDataQuery(TString text) {
::NKikimr::NKqp::NTestSuiteKqpSplit::SendDataQuery(Runtime, KqpProxy, Sender, text);
}

TMaybe<TKikimrRunner> Kikimr;
TVector<ui64> CollectedKeys;
Tests::TServer* Server;
Expand Down Expand Up @@ -794,6 +819,50 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4");
}

Y_UNIT_TEST(StreamLookupJoinSplitBeforeReading) {
TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1");

ExecSQL(*s.Runtime, s.Sender, R"(
--!syntax_v1
CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2));
)", false);

ExecSQL(*s.Runtime, s.Sender, R"(
REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES
(1u, 1u, 1u),
(1u, 2u, 2u),
(1u, 3u, 3u),
(2147483648u, 4u, 4u);
)", true);

auto shards = s.Shards();
auto* shim = new TReadActorPipeCacheStub();

InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim));
shim->SetupCapture(0, 1);
s.SendDataQuery(R"(
$data = AsList(
AsStruct(1u AS Key, 1u AS Value),
AsStruct(2147483648u AS Key, 2147483648u AS Value));

SELECT b.Value
FROM AS_TABLE($data) a
JOIN `/Root/Table1` b
ON a.Key = b.Key
ORDER BY b.Value ASC;
)");

shim->ReadsReceived.WaitI();
Cerr << "starting split -----------------------------------------------------------" << Endl;
s.Split(shards.at(0), 3);
Cerr << "resume evread -----------------------------------------------------------" << Endl;
shim->SkipAll();
shim->SendCaptured(s.Runtime);

s.AssertSuccess();
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4");
}

Y_UNIT_TEST(StreamLookupSplitAfterFirstResult) {
TTestSetup s(ETestActorType::StreamLookup, "/Root/TestIndex");

Expand Down Expand Up @@ -847,6 +916,61 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4");
}

Y_UNIT_TEST(StreamLookupJoinSplitAfterFirstResult) {
TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1");

ExecSQL(*s.Runtime, s.Sender, R"(
--!syntax_v1
CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2));
)", false);

ExecSQL(*s.Runtime, s.Sender, R"(
REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES
(1u, 1u, 1u),
(1u, 2u, 2u),
(1u, 3u, 3u),
(2147483648u, 4u, 4u);
)", true);

auto shards = s.Shards();

NKikimrTxDataShard::TEvRead evread;
evread.SetMaxRowsInResult(2);
evread.SetMaxRows(2);
SetDefaultReadSettings(evread);

NKikimrTxDataShard::TEvReadAck evreadack;
evreadack.SetMaxRows(2);
SetDefaultReadAckSettings(evreadack);

auto* shim = new TReadActorPipeCacheStub();
shim->SetupCapture(1, 1);
shim->SetupResultsCapture(1);
InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim));
s.SendDataQuery(R"(
$data = AsList(
AsStruct(1u AS Key, 1u AS Value),
AsStruct(2147483648u AS Key, 2147483648u AS Value));

SELECT b.Value
FROM AS_TABLE($data) a
JOIN `/Root/Table1` b
ON a.Key = b.Key
ORDER BY b.Value ASC;
)");

shim->ReadsReceived.WaitI();
Cerr << "starting split -----------------------------------------------------------" << Endl;
s.Split(shards.at(0), 3);
Cerr << "resume evread -----------------------------------------------------------" << Endl;
shim->SkipAll();
shim->AllowResults();
shim->SendCaptured(s.Runtime);

s.AssertSuccess();
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4");
}

Y_UNIT_TEST(StreamLookupRetryAttemptForFinishedRead) {
TTestSetup s(ETestActorType::StreamLookup, "/Root/TestIndex");

Expand Down Expand Up @@ -902,6 +1026,58 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",1,2,3,4");
}

Y_UNIT_TEST(StreamLookupJoinRetryAttemptForFinishedRead) {
TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1");

auto settings = MakeIntrusive<TIteratorReadBackoffSettings>();
settings->StartRetryDelay = TDuration::MilliSeconds(250);
settings->MaxShardAttempts = 4;
// set small read response timeout (for frequent retries)
settings->ReadResponseTimeout = TDuration::MilliSeconds(1);
SetReadIteratorBackoffSettings(settings);

ExecSQL(*s.Runtime, s.Sender, R"(
--!syntax_v1
CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2));
)", false);

ExecSQL(*s.Runtime, s.Sender, R"(
REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES
(1u, 1u, 10u);
)", true);

auto shards = s.Shards();
auto* shim = new TReadActorPipeCacheStub();

InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim));
shim->SetupCapture(0, 1);

s.SendDataQuery(R"(
$data = AsList(
AsStruct(1u AS Key, 1u AS Value),
AsStruct(2147483648u AS Key, 2147483648u AS Value));

SELECT b.Value
FROM AS_TABLE($data) a
JOIN `/Root/Table1` b
ON a.Key = b.Key
ORDER BY b.Value ASC;
)");

shim->ReadsReceived.WaitI();

UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);

UNIT_ASSERT_EQUAL(shim->Captured.size(), 1);
s.Runtime->Send(shim->Captured[0]->Sender, s.Sender, undelivery.Release());

shim->SkipAll();

s.AssertSuccess();
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",10");
}

Y_UNIT_TEST(StreamLookupDeliveryProblem) {
TTestSetup s(ETestActorType::StreamLookup, "/Root/TestIndex");

Expand Down Expand Up @@ -950,6 +1126,52 @@ Y_UNIT_TEST_SUITE(KqpSplit) {

}

Y_UNIT_TEST(StreamLookupJoinDeliveryProblem) {
TTestSetup s(ETestActorType::StreamLookup, "/Root/Table1");

ExecSQL(*s.Runtime, s.Sender, R"(
--!syntax_v1
CREATE TABLE `/Root/Table1` (Key uint64, Key2 uint64, Value uint64, PRIMARY KEY(Key, Key2));
)", false);

ExecSQL(*s.Runtime, s.Sender, R"(
REPLACE INTO `/Root/Table1` (Key, Key2, Value) VALUES
(1u, 1u, 10u);
)", true);

auto shards = s.Shards();
auto* shim = new TReadActorPipeCacheStub();

InterceptStreamLookupActorPipeCache(s.Runtime->Register(shim));
shim->SetupCapture(0, 1);

s.SendDataQuery(R"(
$data = AsList(
AsStruct(1u AS Key, 1u AS Value),
AsStruct(2147483648u AS Key, 2147483648u AS Value));

SELECT b.Value
FROM AS_TABLE($data) a
JOIN `/Root/Table1` b
ON a.Key = b.Key
ORDER BY b.Value ASC;
)");

shim->ReadsReceived.WaitI();

UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);

UNIT_ASSERT_EQUAL(shim->Captured.size(), 1);
s.Runtime->Send(shim->Captured[0]->Sender, s.Sender, undelivery.Release());

shim->SkipAll();
shim->SendCaptured(s.Runtime);

s.AssertSuccess();
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, SortOrder::Ascending)), ",10");
}

// TODO: rework test for stream lookups
//Y_UNIT_TEST_SORT(AfterResolvePoints, Order) {
// TTestSetup s;
Expand Down