@@ -2371,101 +2371,6 @@ Y_UNIT_TEST_SUITE(KqpScan) {
2371
2371
}
2372
2372
}
2373
2373
2374
- Y_UNIT_TEST (StreamLookupTryGetDataBeforeSchemeInitialization) {
2375
- NKikimrConfig::TAppConfig appConfig;
2376
-
2377
- TPortManager tp;
2378
- ui16 mbusport = tp.GetPort (2134 );
2379
- auto settings = Tests::TServerSettings (mbusport)
2380
- .SetDomainName (" Root" )
2381
- .SetUseRealThreads (false )
2382
- .SetAppConfig (appConfig);
2383
-
2384
- Tests::TServer::TPtr server = new Tests::TServer (settings);
2385
-
2386
- auto runtime = server->GetRuntime ();
2387
- auto sender = runtime->AllocateEdgeActor ();
2388
- auto kqpProxy = MakeKqpProxyID (runtime->GetNodeId (0 ));
2389
-
2390
- InitRoot (server, sender);
2391
-
2392
- std::vector<TAutoPtr<IEventHandle>> captured;
2393
- bool firstAttemptToGetData = false ;
2394
-
2395
- auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
2396
- if (ev->GetTypeRewrite () == TEvTxProxySchemeCache::TEvResolveKeySetResult::EventType) {
2397
- Cerr << " Captured TEvTxProxySchemeCache::TEvResolveKeySetResult from " << runtime->FindActorName (ev->Sender ) << " to " << runtime->FindActorName (ev->GetRecipientRewrite ()) << Endl;
2398
- if (runtime->FindActorName (ev->GetRecipientRewrite ()) == " KQP_STREAM_LOOKUP_ACTOR" ) {
2399
- if (!firstAttemptToGetData) {
2400
- // capture response from scheme cache until CA calls GetAsyncInputData()
2401
- captured.push_back (ev.Release ());
2402
- return true ;
2403
- }
2404
-
2405
- for (auto ev : captured) {
2406
- runtime->Send (ev.Release ());
2407
- }
2408
- }
2409
- } else if (ev->GetTypeRewrite () == NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType) {
2410
- firstAttemptToGetData = true ;
2411
- } else if (ev->GetTypeRewrite () == NKqp::TEvKqpExecuter::TEvStreamData::EventType) {
2412
- auto & record = ev->Get <NKqp::TEvKqpExecuter::TEvStreamData>()->Record ;
2413
- Y_ASSERT (record.GetResultSet ().rows ().size () == 0 );
2414
-
2415
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo (), record.GetChannelId ());
2416
- resp->Record .SetEnough (false );
2417
- runtime->Send (new IEventHandle (ev->Sender , sender, resp.Release ()));
2418
- return true ;
2419
- }
2420
-
2421
- return false ;
2422
- };
2423
-
2424
- auto createSession = [&]() {
2425
- runtime->Send (new IEventHandle (kqpProxy, sender, new TEvKqp::TEvCreateSessionRequest ()));
2426
- auto reply = runtime->GrabEdgeEventRethrow <TEvKqp::TEvCreateSessionResponse>(sender);
2427
- auto record = reply->Get ()->Record ;
2428
- UNIT_ASSERT_VALUES_EQUAL (record.GetYdbStatus (), Ydb::StatusIds::SUCCESS);
2429
- return record.GetResponse ().GetSessionId ();
2430
- };
2431
-
2432
- auto createTable = [&](const TString& sessionId, const TString& queryText) {
2433
- auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2434
- ev->Record .MutableRequest ()->SetSessionId (sessionId);
2435
- ev->Record .MutableRequest ()->SetAction (NKikimrKqp::QUERY_ACTION_EXECUTE);
2436
- ev->Record .MutableRequest ()->SetType (NKikimrKqp::QUERY_TYPE_SQL_DDL);
2437
- ev->Record .MutableRequest ()->SetQuery (queryText);
2438
-
2439
- runtime->Send (new IEventHandle (kqpProxy, sender, ev.release ()));
2440
- auto reply = runtime->GrabEdgeEventRethrow <TEvKqp::TEvQueryResponse>(sender);
2441
- UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->Record .GetYdbStatus (), Ydb::StatusIds::SUCCESS);
2442
- };
2443
-
2444
- auto sendQuery = [&](const TString& queryText) {
2445
- auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2446
- ev->Record .MutableRequest ()->SetAction (NKikimrKqp::QUERY_ACTION_EXECUTE);
2447
- ev->Record .MutableRequest ()->SetType (NKikimrKqp::QUERY_TYPE_SQL_SCAN);
2448
- ev->Record .MutableRequest ()->SetQuery (queryText);
2449
- ev->Record .MutableRequest ()->SetKeepSession (false );
2450
- ActorIdToProto (sender, ev->Record .MutableRequestActorId ());
2451
-
2452
- runtime->Send (new IEventHandle (kqpProxy, sender, ev.release ()));
2453
- auto reply = runtime->GrabEdgeEventRethrow <TEvKqp::TEvQueryResponse>(sender);
2454
- UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->Record .GetYdbStatus (), Ydb::StatusIds::SUCCESS);
2455
- };
2456
-
2457
- createTable (createSession (), R"(
2458
- --!syntax_v1
2459
- CREATE TABLE `/Root/Table` (Key int32, Fk int32, Value int32, PRIMARY KEY(Key), INDEX Index GLOBAL ON (Fk));
2460
- )" );
2461
-
2462
- server->GetRuntime ()->SetEventFilter (captureEvents);
2463
-
2464
- sendQuery (R"(
2465
- SELECT Value FROM `/Root/Table` VIEW Index WHERE Fk IN AsList(1, 2, 3);
2466
- )" );
2467
- }
2468
-
2469
2374
Y_UNIT_TEST (LimitOverSecondaryIndexRead) {
2470
2375
NKikimrConfig::TAppConfig appConfig;
2471
2376
TKikimrRunner kikimr (TKikimrSettings ().SetAppConfig (appConfig));
@@ -2594,6 +2499,141 @@ Y_UNIT_TEST_SUITE(KqpScan) {
2594
2499
UNIT_ASSERT_VALUES_EQUAL (read[" scan_by" ].GetArray ().size (), 1 );
2595
2500
UNIT_ASSERT_VALUES_EQUAL (read[" scan_by" ][0 ], " Key [SomeString, SomeStrinh)" );
2596
2501
}
2502
+
2503
+ Y_UNIT_TEST (StreamLookupFailedRead) {
2504
+ NKikimrConfig::TAppConfig appConfig;
2505
+ appConfig.MutableTableServiceConfig ()->SetEnableKqpDataQueryStreamIdxLookupJoin (true );
2506
+ appConfig.MutableTableServiceConfig ()->MutableIteratorReadsRetrySettings ()->SetStartDelayMs (5 );
2507
+ appConfig.MutableTableServiceConfig ()->MutableIteratorReadsRetrySettings ()->SetMaxDelayMs (10 );
2508
+ appConfig.MutableTableServiceConfig ()->MutableIteratorReadsRetrySettings ()->SetMaxShardRetries (100 );
2509
+ appConfig.MutableTableServiceConfig ()->MutableIteratorReadsRetrySettings ()->SetMaxShardResolves (100 );
2510
+ appConfig.MutableTableServiceConfig ()->MutableIteratorReadsRetrySettings ()->SetUnsertaintyRatio (0.5 );
2511
+ appConfig.MutableTableServiceConfig ()->MutableIteratorReadsRetrySettings ()->SetMultiplier (2.0 );
2512
+ appConfig.MutableTableServiceConfig ()->MutableIteratorReadsRetrySettings ()->SetMaxTotalRetries (100 );
2513
+
2514
+
2515
+ TPortManager tp;
2516
+ ui16 mbusport = tp.GetPort (2134 );
2517
+ auto settings = Tests::TServerSettings (mbusport)
2518
+ .SetDomainName (" Root" )
2519
+ .SetUseRealThreads (false )
2520
+ .SetAppConfig (appConfig);
2521
+
2522
+ Tests::TServer::TPtr server = new Tests::TServer (settings);
2523
+
2524
+ server->GetRuntime ()->SetLogPriority (NKikimrServices::KQP_COMPUTE, NActors::NLog::EPriority::PRI_DEBUG);
2525
+
2526
+ auto runtime = server->GetRuntime ();
2527
+ auto sender = runtime->AllocateEdgeActor ();
2528
+ auto kqpProxy = MakeKqpProxyID (runtime->GetNodeId (0 ));
2529
+
2530
+ InitRoot (server, sender);
2531
+
2532
+ std::vector<TAutoPtr<IEventHandle>> captured;
2533
+ bool captureEvRead = true ;
2534
+ int arrivedDataCount = 0 ;
2535
+
2536
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
2537
+ if (ev->GetTypeRewrite () == NKikimr::TEvTxProxySchemeCache::TEvResolveKeySetResult::EventType) {
2538
+ if (runtime->FindActorName (ev->GetRecipientRewrite ()) == " KQP_STREAM_LOOKUP_ACTOR" ) {
2539
+ if (!captureEvRead && arrivedDataCount < 2 ) {
2540
+ // don't resolve
2541
+ captured.push_back (ev.Release ());
2542
+ return true ;
2543
+ }
2544
+ }
2545
+ } if (ev->GetTypeRewrite () == NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType) {
2546
+ ++arrivedDataCount;
2547
+
2548
+ if (captured.size () > 1 && arrivedDataCount > 2 ) {
2549
+ auto resp = captured.back ();
2550
+ captured.pop_back ();
2551
+ runtime->Send (resp.Release ());
2552
+ }
2553
+ } else if (ev->GetTypeRewrite () == NKqp::TEvKqpExecuter::TEvStreamData::EventType) {
2554
+ auto & record = ev->Get <NKqp::TEvKqpExecuter::TEvStreamData>()->Record ;
2555
+ Y_ASSERT (record.GetResultSet ().rows ().size () == 0 );
2556
+
2557
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo (), record.GetChannelId ());
2558
+ resp->Record .SetEnough (false );
2559
+ runtime->Send (new IEventHandle (ev->Sender , sender, resp.Release ()));
2560
+ return true ;
2561
+ } else if (ev->GetTypeRewrite () == NKikimr::TEvDataShard::TEvRead::EventType) {
2562
+ if (captureEvRead) {
2563
+ auto & record = ev->Get <NKikimr::TEvDataShard::TEvRead>()->Record ;
2564
+ auto resp = MakeHolder<NKikimr::TEvDataShard::TEvReadResult>();
2565
+ resp->Record .SetReadId (record.GetReadId ());
2566
+ resp->Record .MutableStatus ()->SetCode (Ydb::StatusIds::NOT_FOUND);
2567
+
2568
+ runtime->Send (new IEventHandle (ev->Sender , sender, resp.Release ()));
2569
+
2570
+ captured.push_back (ev.Release ());
2571
+
2572
+ captureEvRead = false ;
2573
+ return true ;
2574
+ }
2575
+ }
2576
+
2577
+ return false ;
2578
+ };
2579
+
2580
+ auto createSession = [&]() {
2581
+ runtime->Send (new IEventHandle (kqpProxy, sender, new TEvKqp::TEvCreateSessionRequest ()));
2582
+ auto reply = runtime->GrabEdgeEventRethrow <TEvKqp::TEvCreateSessionResponse>(sender);
2583
+ auto record = reply->Get ()->Record ;
2584
+ UNIT_ASSERT_VALUES_EQUAL (record.GetYdbStatus (), Ydb::StatusIds::SUCCESS);
2585
+ return record.GetResponse ().GetSessionId ();
2586
+ };
2587
+
2588
+ auto createTable = [&](const TString& sessionId, const TString& queryText) {
2589
+ auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2590
+ ev->Record .MutableRequest ()->SetSessionId (sessionId);
2591
+ ev->Record .MutableRequest ()->SetAction (NKikimrKqp::QUERY_ACTION_EXECUTE);
2592
+ ev->Record .MutableRequest ()->SetType (NKikimrKqp::QUERY_TYPE_SQL_DDL);
2593
+ ev->Record .MutableRequest ()->SetQuery (queryText);
2594
+
2595
+ runtime->Send (new IEventHandle (kqpProxy, sender, ev.release ()));
2596
+ auto reply = runtime->GrabEdgeEventRethrow <TEvKqp::TEvQueryResponse>(sender);
2597
+ UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->Record .GetYdbStatus (), Ydb::StatusIds::SUCCESS);
2598
+ };
2599
+
2600
+ auto sendQuery = [&](const TString& queryText) {
2601
+ auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
2602
+ ev->Record .MutableRequest ()->MutableTxControl ()->mutable_begin_tx ()->mutable_serializable_read_write ();
2603
+ ev->Record .MutableRequest ()->MutableTxControl ()->set_commit_tx (true );
2604
+ ev->Record .MutableRequest ()->SetAction (NKikimrKqp::QUERY_ACTION_EXECUTE);
2605
+ ev->Record .MutableRequest ()->SetType (NKikimrKqp::QUERY_TYPE_SQL_DML);
2606
+ ev->Record .MutableRequest ()->SetQuery (queryText);
2607
+ ev->Record .MutableRequest ()->SetUsePublicResponseDataFormat (true );
2608
+ ActorIdToProto (sender, ev->Record .MutableRequestActorId ());
2609
+
2610
+ runtime->Send (new IEventHandle (kqpProxy, sender, ev.release ()));
2611
+ auto reply = runtime->GrabEdgeEventRethrow <TEvKqp::TEvQueryResponse>(sender);
2612
+ UNIT_ASSERT_VALUES_EQUAL_C (
2613
+ reply->Get ()->Record .GetYdbStatus (),
2614
+ Ydb::StatusIds::SUCCESS,
2615
+ reply->Get ()->Record .GetResponse ().DebugString ());
2616
+ };
2617
+
2618
+ createTable (createSession (), R"(
2619
+ --!syntax_v1
2620
+ CREATE TABLE `/Root/Table1` (Key uint32, Value uint32, PRIMARY KEY(Key))
2621
+ WITH (UNIFORM_PARTITIONS = 64, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64);
2622
+ )" );
2623
+
2624
+ server->GetRuntime ()->SetEventFilter (captureEvents);
2625
+
2626
+ sendQuery (R"(
2627
+ $data = AsList(
2628
+ AsStruct(1u AS Key, 1u AS Value),
2629
+ AsStruct(2147483648u AS Key, 2147483648u AS Value));
2630
+
2631
+ SELECT a.Value, b.Value
2632
+ FROM AS_TABLE($data) a
2633
+ JOIN `/Root/Table1` b
2634
+ ON a.Key = b.Key;
2635
+ )" );
2636
+ }
2597
2637
}
2598
2638
2599
2639
0 commit comments