@@ -1354,6 +1354,7 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) {
1354
1354
TPortManager pm;
1355
1355
NKikimrConfig::TAppConfig app;
1356
1356
app.MutableTableServiceConfig ()->SetEnableKqpDataQueryStreamLookup (StreamLookup);
1357
+ app.MutableTableServiceConfig ()->SetEnableOltpSink (EvWrite);
1357
1358
TServerSettings serverSettings (pm.GetPort (2134 ));
1358
1359
serverSettings.SetDomainName (" Root" )
1359
1360
.SetUseRealThreads (false )
@@ -1374,9 +1375,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) {
1374
1375
auto [shards2, tableId2] = CreateShardedTable (server, sender, " /Root" , " table-2" , 1 );
1375
1376
1376
1377
{
1377
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {1 , 1 }}, {tableId2, {2 , 1 }}} : TEvWriteRows{};
1378
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite (runtime, rows);
1379
-
1380
1378
ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);" ));
1381
1379
ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);" ));
1382
1380
}
@@ -1407,16 +1405,11 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderReadOnlyAllowed, StreamLookup, EvWrite) {
1407
1405
};
1408
1406
auto prevObserverFunc = runtime.SetObserverFunc (captureRS);
1409
1407
1410
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {3 , 2 }}, {tableId2, {4 , 2 }}} : TEvWriteRows{};
1411
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite (runtime, rows);
1412
-
1413
1408
// Send a commit request, it would block on readset exchange
1414
1409
auto f2 = SendRequest (runtime, MakeSimpleRequestRPC (Q_ (R"(
1415
1410
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
1416
1411
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))" ), sessionId, txId, true ));
1417
1412
1418
- evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};
1419
-
1420
1413
// Wait until we captured both readsets
1421
1414
const size_t expectedReadSets = usesVolatileTxs ? 4 : 2 ;
1422
1415
{
@@ -1467,6 +1460,7 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
1467
1460
TPortManager pm;
1468
1461
NKikimrConfig::TAppConfig app;
1469
1462
app.MutableTableServiceConfig ()->SetEnableKqpDataQueryStreamLookup (StreamLookup);
1463
+ app.MutableTableServiceConfig ()->SetEnableOltpSink (EvWrite);
1470
1464
TServerSettings serverSettings (pm.GetPort (2134 ));
1471
1465
serverSettings.SetDomainName (" Root" )
1472
1466
.SetAppConfig (app)
@@ -1487,9 +1481,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
1487
1481
auto [shards2, tableId2] = CreateShardedTable (server, sender, " /Root" , " table-2" , 1 );
1488
1482
1489
1483
{
1490
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {1 , 1 }}, {tableId2, {2 , 1 }}} : TEvWriteRows{};
1491
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite (runtime, rows);
1492
-
1493
1484
ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);" ));
1494
1485
ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);" ));
1495
1486
}
@@ -1521,16 +1512,11 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
1521
1512
};
1522
1513
auto prevObserverFunc = runtime.SetObserverFunc (captureRS);
1523
1514
1524
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {3 , 2 }}, {tableId2, {4 , 2 }}} : TEvWriteRows{};
1525
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite (runtime, rows);
1526
-
1527
1515
// Send a commit request, it would block on readset exchange
1528
1516
auto f2 = SendRequest (runtime, MakeSimpleRequestRPC (Q_ (R"(
1529
1517
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
1530
1518
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))" ), sessionId, txId, true ));
1531
1519
1532
- evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};
1533
-
1534
1520
// Wait until we captured both readsets
1535
1521
const size_t expectedReadSets = usesVolatileTxs ? 4 : 2 ;
1536
1522
if (readSets.size () < expectedReadSets) {
@@ -1545,9 +1531,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
1545
1531
1546
1532
// Now send non-conflicting upsert to both tables
1547
1533
{
1548
- auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {5 , 3 }}, {tableId2, {6 , 3 }}} : TEvWriteRows{};
1549
- auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite (runtime, rows1);
1550
-
1551
1534
blockReadSets = false ; // needed for volatile transactions
1552
1535
auto result = KqpSimpleExec (runtime, Q_ (R"(
1553
1536
UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3);
@@ -1558,9 +1541,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNonConflictingWrites, StreamLookup, EvWrite) {
1558
1541
1559
1542
// Check that immediate non-conflicting upsert is working too
1560
1543
{
1561
- auto rows1 = EvWrite ? TEvWriteRows{{tableId1, {7 , 4 }}} : TEvWriteRows{};
1562
- auto evWriteObservers1 = ReplaceEvProposeTransactionWithEvWrite (runtime, rows1);
1563
-
1564
1544
auto result = KqpSimpleExec (runtime, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 4)" ));
1565
1545
UNIT_ASSERT_VALUES_EQUAL (result, " <empty>" );
1566
1546
}
@@ -2970,6 +2950,7 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
2970
2950
TPortManager pm;
2971
2951
NKikimrConfig::TAppConfig app;
2972
2952
app.MutableTableServiceConfig ()->SetEnableKqpDataQueryStreamLookup (StreamLookup);
2953
+ app.MutableTableServiceConfig ()->SetEnableOltpSink (EvWrite);
2973
2954
TServerSettings serverSettings (pm.GetPort (2134 ));
2974
2955
serverSettings.SetDomainName (" Root" )
2975
2956
.SetUseRealThreads (false )
@@ -2990,9 +2971,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
2990
2971
auto [shards2, tableId2] = CreateShardedTable (server, sender, " /Root" , " table-2" , 1 );
2991
2972
2992
2973
{
2993
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {1 , 1 }}, {tableId2, {2 , 1 }}} : TEvWriteRows{};
2994
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite (runtime, rows);
2995
-
2996
2974
Cerr << " ===== UPSERT initial rows" << Endl;
2997
2975
2998
2976
ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)" ));
@@ -3042,9 +3020,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
3042
3020
};
3043
3021
auto prevObserverFunc = runtime.SetObserverFunc (captureRS);
3044
3022
3045
- auto rows = EvWrite ? TEvWriteRows{{tableId1, {3 , 2 }}, {tableId2, {4 , 2 }}} : TEvWriteRows{};
3046
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite (runtime, rows);
3047
-
3048
3023
Cerr << " ===== UPSERT and commit" << Endl;
3049
3024
3050
3025
// Send a commit request, it would block on readset exchange
@@ -3070,8 +3045,6 @@ Y_UNIT_TEST_QUAD(TestShardRestartPlannedCommitShouldSucceed, StreamLookup, EvWri
3070
3045
UNIT_ASSERT_VALUES_EQUAL (response.operation ().status (), Ydb::StatusIds::SUCCESS);
3071
3046
}
3072
3047
3073
- evWriteObservers = TTestActorRuntimeBase::TEventObserverHolderPair{};
3074
-
3075
3048
// Select key 3 and verify its value was updated
3076
3049
{
3077
3050
Cerr << " ===== Last SELECT" << Endl;
@@ -3291,10 +3264,13 @@ Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
3291
3264
}
3292
3265
3293
3266
Y_UNIT_TEST_TWIN (TestSnapshotReadAfterBrokenLock, EvWrite) {
3267
+ NKikimrConfig::TAppConfig app;
3268
+ app.MutableTableServiceConfig ()->SetEnableOltpSink (EvWrite);
3294
3269
TPortManager pm;
3295
3270
TServerSettings serverSettings (pm.GetPort (2134 ));
3296
3271
serverSettings.SetDomainName (" Root" )
3297
- .SetUseRealThreads (false );
3272
+ .SetUseRealThreads (false )
3273
+ .SetAppConfig (app);
3298
3274
3299
3275
Tests::TServer::TPtr server = new TServer (serverSettings);
3300
3276
auto &runtime = *server->GetRuntime ();
@@ -3305,9 +3281,6 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadAfterBrokenLock, EvWrite) {
3305
3281
CreateShardedTable (server, sender, " /Root" , " table-1" , 1 );
3306
3282
CreateShardedTable (server, sender, " /Root" , " table-2" , 1 );
3307
3283
3308
- auto rows = EvWrite ? TEvWriteRows{{{1 , 1 }}, {{2 , 2 }}, {{3 , 3 }}, {{5 , 5 }}} : TEvWriteRows{};
3309
- auto evWriteObservers = ReplaceEvProposeTransactionWithEvWrite (runtime, rows);
3310
-
3311
3284
ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)" ));
3312
3285
ExecSQL (server, sender, Q_ (" UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)" ));
3313
3286
0 commit comments