Skip to content

Commit a38de79

Browse files
authored
[Stable-25-1] stream join fix (#20503)
2 parents e7493c1 + f116a81 commit a38de79

File tree

3 files changed

+167
-103
lines changed

3 files changed

+167
-103
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
278278
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
279279
YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported");
280280

281+
if (ResolveShardsInProgress) {
282+
finished = false;
283+
return 0;
284+
}
285+
281286
auto replyResultStats = StreamLookupWorker->ReplyResult(batch, freeSpace);
282287
ReadRowsCount += replyResultStats.ReadRowsCount;
283288
ReadBytesCount += replyResultStats.ReadBytesCount;
@@ -343,8 +348,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
343348
}
344349

345350
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
346-
ResoleShardsInProgress = false;
347351
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
352+
if (!ResolveShardsInProgress) {
353+
return;
354+
}
355+
ResolveShardsInProgress = false;
348356
if (ev->Get()->Request->ErrorCount > 0) {
349357
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
350358
<< StreamLookupWorker->GetTablePath();
@@ -360,6 +368,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
360368
Partitioning = resultSet[0].KeyDescription->Partitioning;
361369

362370
ProcessInputRows();
371+
372+
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
363373
}
364374

365375
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
@@ -533,7 +543,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
533543
if (!Partitioning) {
534544
LookupActorStateSpan.EndError("timeout exceeded");
535545
CA_LOG_D("Retry attempt to resolve shards for table: " << StreamLookupWorker->GetTablePath());
536-
ResoleShardsInProgress = false;
546+
ResolveShardsInProgress = false;
537547
ResolveTableShards();
538548
}
539549
}
@@ -674,7 +684,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
674684
}
675685

676686
void ResolveTableShards() {
677-
if (ResoleShardsInProgress) {
687+
if (ResolveShardsInProgress) {
678688
return;
679689
}
680690

@@ -684,7 +694,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
684694
}
685695

686696
CA_LOG_D("Resolve shards for table: " << StreamLookupWorker->GetTablePath());
687-
ResoleShardsInProgress = true;
697+
ResolveShardsInProgress = true;
688698

689699
Partitioning.reset();
690700

@@ -758,7 +768,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
758768
ui64 ReadId = 0;
759769
size_t TotalRetryAttempts = 0;
760770
size_t TotalResolveShardsAttempts = 0;
761-
bool ResoleShardsInProgress = false;
771+
bool ResolveShardsInProgress = false;
762772

763773
// stats
764774
ui64 ReadRowsCount = 0;

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,14 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
479479
auto unprocessedRange = ranges[firstUnprocessedQuery];
480480
YQL_ENSURE(!unprocessedRange.Point);
481481

482-
unprocessedRanges.emplace_back(*lastProcessedKey, false,
482+
TOwnedTableRange range(*lastProcessedKey, false,
483483
unprocessedRange.GetOwnedTo(), unprocessedRange.InclusiveTo);
484+
485+
auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range));
486+
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
487+
leftRowIt->second.PendingReads.erase(prevReadId);
488+
489+
unprocessedRanges.emplace_back(std::move(range));
484490
++firstUnprocessedQuery;
485491
}
486492

@@ -543,6 +549,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
543549

544550
auto partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), range);
545551
for (auto [shardId, range] : partitions) {
552+
YQL_ENSURE(PendingLeftRowsByKey.contains(ExtractKeyPrefix(range)));
553+
546554
if (range.Point) {
547555
pointsPerShard[shardId].push_back(std::move(range));
548556
} else {
@@ -581,7 +589,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
581589
std::vector <std::pair<ui64, TOwnedTableRange>> partitions;
582590
if (joinKey.size() < KeyColumns.size()) {
583591
// build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
584-
std::vector <TCell> fromCells(KeyColumns.size());
592+
std::vector<TCell> fromCells(KeyColumns.size() - joinKey.size());
585593
fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end());
586594
bool fromInclusive = true;
587595
bool toInclusive = false;
@@ -716,8 +724,14 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
716724
auto unprocessedRange = ranges[firstUnprocessedQuery];
717725
YQL_ENSURE(!unprocessedRange.Point);
718726

719-
UnprocessedKeys.emplace_back(*lastProcessedKey, false,
727+
TOwnedTableRange range(*lastProcessedKey, false,
720728
unprocessedRange.GetOwnedTo(), unprocessedRange.InclusiveTo);
729+
730+
auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range));
731+
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
732+
leftRowIt->second.PendingReads.erase(readId);
733+
734+
UnprocessedKeys.emplace_back(std::move(range));
721735
++firstUnprocessedQuery;
722736
}
723737

ydb/core/kqp/ut/scan/kqp_scan_ut.cpp

Lines changed: 135 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -2371,101 +2371,6 @@ Y_UNIT_TEST_SUITE(KqpScan) {
23712371
}
23722372
}
23732373

2374-
Y_UNIT_TEST(StreamLookupTryGetDataBeforeSchemeInitialization) {
2375-
NKikimrConfig::TAppConfig appConfig;
2376-
2377-
TPortManager tp;
2378-
ui16 mbusport = tp.GetPort(2134);
2379-
auto settings = Tests::TServerSettings(mbusport)
2380-
.SetDomainName("Root")
2381-
.SetUseRealThreads(false)
2382-
.SetAppConfig(appConfig);
2383-
2384-
Tests::TServer::TPtr server = new Tests::TServer(settings);
2385-
2386-
auto runtime = server->GetRuntime();
2387-
auto sender = runtime->AllocateEdgeActor();
2388-
auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
2389-
2390-
InitRoot(server, sender);
2391-
2392-
std::vector<TAutoPtr<IEventHandle>> captured;
2393-
bool firstAttemptToGetData = false;
2394-
2395-
auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
2396-
if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvResolveKeySetResult::EventType) {
2397-
Cerr << "Captured TEvTxProxySchemeCache::TEvResolveKeySetResult from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
2398-
if (runtime->FindActorName(ev->GetRecipientRewrite()) == "KQP_STREAM_LOOKUP_ACTOR") {
2399-
if (!firstAttemptToGetData) {
2400-
// capture response from scheme cache until CA calls GetAsyncInputData()
2401-
captured.push_back(ev.Release());
2402-
return true;
2403-
}
2404-
2405-
for (auto ev : captured) {
2406-
runtime->Send(ev.Release());
2407-
}
2408-
}
2409-
} else if (ev->GetTypeRewrite() == NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType) {
2410-
firstAttemptToGetData = true;
2411-
} else if (ev->GetTypeRewrite() == NKqp::TEvKqpExecuter::TEvStreamData::EventType) {
2412-
auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
2413-
Y_ASSERT(record.GetResultSet().rows().size() == 0);
2414-
2415-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
2416-
resp->Record.SetEnough(false);
2417-
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
2418-
return true;
2419-
}
2420-
2421-
return false;
2422-
};
2423-
2424-
auto createSession = [&]() {
2425-
runtime->Send(new IEventHandle(kqpProxy, sender, new TEvKqp::TEvCreateSessionRequest()));
2426-
auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvCreateSessionResponse>(sender);
2427-
auto record = reply->Get()->Record;
2428-
UNIT_ASSERT_VALUES_EQUAL(record.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
2429-
return record.GetResponse().GetSessionId();
2430-
};
2431-
2432-
auto createTable = [&](const TString& sessionId, const TString& queryText) {
2433-
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2434-
ev->Record.MutableRequest()->SetSessionId(sessionId);
2435-
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
2436-
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
2437-
ev->Record.MutableRequest()->SetQuery(queryText);
2438-
2439-
runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
2440-
auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
2441-
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
2442-
};
2443-
2444-
auto sendQuery = [&](const TString& queryText) {
2445-
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2446-
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
2447-
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN);
2448-
ev->Record.MutableRequest()->SetQuery(queryText);
2449-
ev->Record.MutableRequest()->SetKeepSession(false);
2450-
ActorIdToProto(sender, ev->Record.MutableRequestActorId());
2451-
2452-
runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
2453-
auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
2454-
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
2455-
};
2456-
2457-
createTable(createSession(), R"(
2458-
--!syntax_v1
2459-
CREATE TABLE `/Root/Table` (Key int32, Fk int32, Value int32, PRIMARY KEY(Key), INDEX Index GLOBAL ON (Fk));
2460-
)");
2461-
2462-
server->GetRuntime()->SetEventFilter(captureEvents);
2463-
2464-
sendQuery(R"(
2465-
SELECT Value FROM `/Root/Table` VIEW Index WHERE Fk IN AsList(1, 2, 3);
2466-
)");
2467-
}
2468-
24692374
Y_UNIT_TEST(LimitOverSecondaryIndexRead) {
24702375
NKikimrConfig::TAppConfig appConfig;
24712376
TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig));
@@ -2594,6 +2499,141 @@ Y_UNIT_TEST_SUITE(KqpScan) {
25942499
UNIT_ASSERT_VALUES_EQUAL(read["scan_by"].GetArray().size(), 1);
25952500
UNIT_ASSERT_VALUES_EQUAL(read["scan_by"][0], "Key [SomeString, SomeStrinh)");
25962501
}
2502+
2503+
Y_UNIT_TEST(StreamLookupFailedRead) {
2504+
NKikimrConfig::TAppConfig appConfig;
2505+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(true);
2506+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetStartDelayMs(5);
2507+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxDelayMs(10);
2508+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxShardRetries(100);
2509+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxShardResolves(100);
2510+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetUnsertaintyRatio(0.5);
2511+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMultiplier(2.0);
2512+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalRetries(100);
2513+
2514+
2515+
TPortManager tp;
2516+
ui16 mbusport = tp.GetPort(2134);
2517+
auto settings = Tests::TServerSettings(mbusport)
2518+
.SetDomainName("Root")
2519+
.SetUseRealThreads(false)
2520+
.SetAppConfig(appConfig);
2521+
2522+
Tests::TServer::TPtr server = new Tests::TServer(settings);
2523+
2524+
server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::EPriority::PRI_DEBUG);
2525+
2526+
auto runtime = server->GetRuntime();
2527+
auto sender = runtime->AllocateEdgeActor();
2528+
auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
2529+
2530+
InitRoot(server, sender);
2531+
2532+
std::vector<TAutoPtr<IEventHandle>> captured;
2533+
bool captureEvRead = true;
2534+
int arrivedDataCount = 0;
2535+
2536+
auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
2537+
if (ev->GetTypeRewrite() == NKikimr::TEvTxProxySchemeCache::TEvResolveKeySetResult::EventType) {
2538+
if (runtime->FindActorName(ev->GetRecipientRewrite()) == "KQP_STREAM_LOOKUP_ACTOR") {
2539+
if (!captureEvRead && arrivedDataCount < 2) {
2540+
// don't resolve
2541+
captured.push_back(ev.Release());
2542+
return true;
2543+
}
2544+
}
2545+
} if (ev->GetTypeRewrite() == NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType) {
2546+
++arrivedDataCount;
2547+
2548+
if (captured.size() > 1 && arrivedDataCount > 2) {
2549+
auto resp = captured.back();
2550+
captured.pop_back();
2551+
runtime->Send(resp.Release());
2552+
}
2553+
} else if (ev->GetTypeRewrite() == NKqp::TEvKqpExecuter::TEvStreamData::EventType) {
2554+
auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
2555+
Y_ASSERT(record.GetResultSet().rows().size() == 0);
2556+
2557+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
2558+
resp->Record.SetEnough(false);
2559+
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
2560+
return true;
2561+
} else if (ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) {
2562+
if (captureEvRead) {
2563+
auto& record = ev->Get<NKikimr::TEvDataShard::TEvRead>()->Record;
2564+
auto resp = MakeHolder<NKikimr::TEvDataShard::TEvReadResult>();
2565+
resp->Record.SetReadId(record.GetReadId());
2566+
resp->Record.MutableStatus()->SetCode(Ydb::StatusIds::NOT_FOUND);
2567+
2568+
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
2569+
2570+
captured.push_back(ev.Release());
2571+
2572+
captureEvRead = false;
2573+
return true;
2574+
}
2575+
}
2576+
2577+
return false;
2578+
};
2579+
2580+
auto createSession = [&]() {
2581+
runtime->Send(new IEventHandle(kqpProxy, sender, new TEvKqp::TEvCreateSessionRequest()));
2582+
auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvCreateSessionResponse>(sender);
2583+
auto record = reply->Get()->Record;
2584+
UNIT_ASSERT_VALUES_EQUAL(record.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
2585+
return record.GetResponse().GetSessionId();
2586+
};
2587+
2588+
auto createTable = [&](const TString& sessionId, const TString& queryText) {
2589+
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2590+
ev->Record.MutableRequest()->SetSessionId(sessionId);
2591+
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
2592+
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
2593+
ev->Record.MutableRequest()->SetQuery(queryText);
2594+
2595+
runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
2596+
auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
2597+
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
2598+
};
2599+
2600+
auto sendQuery = [&](const TString& queryText) {
2601+
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2602+
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
2603+
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
2604+
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
2605+
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
2606+
ev->Record.MutableRequest()->SetQuery(queryText);
2607+
ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
2608+
ActorIdToProto(sender, ev->Record.MutableRequestActorId());
2609+
2610+
runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
2611+
auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
2612+
UNIT_ASSERT_VALUES_EQUAL_C(
2613+
reply->Get()->Record.GetYdbStatus(),
2614+
Ydb::StatusIds::SUCCESS,
2615+
reply->Get()->Record.GetResponse().DebugString());
2616+
};
2617+
2618+
createTable(createSession(), R"(
2619+
--!syntax_v1
2620+
CREATE TABLE `/Root/Table1` (Key uint32, Value uint32, PRIMARY KEY(Key))
2621+
WITH (UNIFORM_PARTITIONS = 64, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64);
2622+
)");
2623+
2624+
server->GetRuntime()->SetEventFilter(captureEvents);
2625+
2626+
sendQuery(R"(
2627+
$data = AsList(
2628+
AsStruct(1u AS Key, 1u AS Value),
2629+
AsStruct(2147483648u AS Key, 2147483648u AS Value));
2630+
2631+
SELECT a.Value, b.Value
2632+
FROM AS_TABLE($data) a
2633+
JOIN `/Root/Table1` b
2634+
ON a.Key = b.Key;
2635+
)");
2636+
}
25972637
}
25982638

25992639

0 commit comments

Comments
 (0)