@@ -2359,7 +2359,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2359
2359
2360
2360
absl::flat_hash_set<ui64> sendingShardsSet;
2361
2361
absl::flat_hash_set<ui64> receivingShardsSet;
2362
- absl::flat_hash_set<ui64> sendingColumnShardsSet;
2363
2362
absl::flat_hash_set<ui64> receivingColumnShardsSet;
2364
2363
ui64 arbiter = 0 ;
2365
2364
std::optional<ui64> columnShardArbiter;
@@ -2382,31 +2381,20 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2382
2381
}
2383
2382
2384
2383
for (auto & [shardId, tx] : evWriteTxs) {
2385
- if (ShardIdToTableInfo->at (shardId).IsOlap && HtapTx) {
2386
- if (tx->HasLocks ()) {
2387
- // Locks may be broken so shards with locks need to send readsets
2388
- sendingColumnShardsSet.insert (shardId);
2389
- }
2390
- if (ShardsWithEffects.contains (shardId)) {
2391
- // Volatile transactions may abort effects, so they send readsets
2392
- if (VolatileTx) {
2393
- sendingColumnShardsSet.insert (shardId);
2394
- }
2395
- // Effects are only applied when all locks are valid
2396
- receivingColumnShardsSet.insert (shardId);
2397
- }
2398
- } else {
2399
- if (tx->HasLocks ()) {
2400
- // Locks may be broken so shards with locks need to send readsets
2384
+ if (tx->HasLocks ()) {
2385
+ // Locks may be broken so shards with locks need to send readsets
2386
+ sendingShardsSet.insert (shardId);
2387
+ }
2388
+ if (ShardsWithEffects.contains (shardId)) {
2389
+ // Volatile transactions may abort effects, so they send readsets
2390
+ if (VolatileTx) {
2401
2391
sendingShardsSet.insert (shardId);
2402
2392
}
2403
- if (ShardsWithEffects.contains (shardId)) {
2404
- // Volatile transactions may abort effects, so they send readsets
2405
- if (VolatileTx) {
2406
- sendingShardsSet.insert (shardId);
2407
- }
2408
- // Effects are only applied when all locks are valid
2409
- receivingShardsSet.insert (shardId);
2393
+ // Effects are only applied when all locks are valid
2394
+ receivingShardsSet.insert (shardId);
2395
+
2396
+ if (HtapTx && ShardIdToTableInfo->at (shardId).IsOlap ) {
2397
+ receivingColumnShardsSet.insert (shardId);
2410
2398
}
2411
2399
}
2412
2400
}
@@ -2464,13 +2452,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2464
2452
}
2465
2453
2466
2454
if (!receivingColumnShardsSet.empty ()) {
2455
+ AFL_ENSURE (HtapTx);
2467
2456
const ui32 index = RandomNumber<ui32>(receivingColumnShardsSet.size ());
2468
2457
auto arbiterIterator = std::begin (receivingColumnShardsSet);
2469
2458
std::advance (arbiterIterator, index);
2470
2459
columnShardArbiter = *arbiterIterator;
2471
-
2472
- sendingShardsSet.insert (*columnShardArbiter);
2473
- receivingShardsSet.insert (*columnShardArbiter);
2474
2460
}
2475
2461
}
2476
2462
@@ -2483,13 +2469,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2483
2469
std::sort (sendingShards.begin (), sendingShards.end ());
2484
2470
std::sort (receivingShards.begin (), receivingShards.end ());
2485
2471
2486
- NProtoBuf::RepeatedField<ui64> sendingColumnShards (sendingColumnShardsSet.begin (), sendingColumnShardsSet.end ());
2487
- NProtoBuf::RepeatedField<ui64> receivingColumnShards (receivingColumnShardsSet.begin (), receivingColumnShardsSet.end ());
2488
-
2489
- std::sort (sendingColumnShards.begin (), sendingColumnShards.end ());
2490
- std::sort (receivingColumnShards.begin (), receivingColumnShards.end ());
2491
-
2492
2472
for (auto & [shardId, shardTx] : datashardTxs) {
2473
+ AFL_ENSURE (!columnShardArbiter);
2493
2474
shardTx->MutableLocks ()->SetOp (NKikimrDataEvents::TKqpLocks::Commit);
2494
2475
*shardTx->MutableLocks ()->MutableSendingShards () = sendingShards;
2495
2476
*shardTx->MutableLocks ()->MutableReceivingShards () = receivingShards;
@@ -2498,24 +2479,46 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2498
2479
}
2499
2480
}
2500
2481
2501
- for (auto & [_ , tx] : evWriteTxs) {
2482
+ for (auto & [shardId , tx] : evWriteTxs) {
2502
2483
tx->MutableLocks ()->SetOp (NKikimrDataEvents::TKqpLocks::Commit);
2503
- *tx->MutableLocks ()->MutableSendingShards () = sendingShards;
2504
- *tx->MutableLocks ()->MutableReceivingShards () = receivingShards;
2505
- *tx->MutableLocks ()->MutableSendingColumnShards () = sendingColumnShards;
2506
- *tx->MutableLocks ()->MutableReceivingColumnShards () = receivingColumnShards;
2507
- if (arbiter) {
2508
- tx->MutableLocks ()->SetArbiterShard (arbiter);
2509
- }
2510
- if (columnShardArbiter) {
2484
+ if (columnShardArbiter && *columnShardArbiter == shardId) {
2511
2485
tx->MutableLocks ()->SetArbiterColumnShard (*columnShardArbiter);
2486
+ *tx->MutableLocks ()->MutableSendingShards () = sendingShards;
2487
+ *tx->MutableLocks ()->MutableReceivingShards () = receivingShards;
2488
+ } else if (columnShardArbiter) {
2489
+ tx->MutableLocks ()->SetArbiterColumnShard (*columnShardArbiter);
2490
+ tx->MutableLocks ()->AddSendingShards (*columnShardArbiter);
2491
+ tx->MutableLocks ()->AddReceivingShards (*columnShardArbiter);
2492
+ if (sendingShardsSet.contains (shardId)) {
2493
+ tx->MutableLocks ()->AddSendingShards (shardId);
2494
+ }
2495
+ if (receivingShardsSet.contains (shardId)) {
2496
+ tx->MutableLocks ()->AddReceivingShards (shardId);
2497
+ }
2498
+ } else {
2499
+ *tx->MutableLocks ()->MutableSendingShards () = sendingShards;
2500
+ *tx->MutableLocks ()->MutableReceivingShards () = receivingShards;
2501
+ if (arbiter) {
2502
+ tx->MutableLocks ()->SetArbiterShard (arbiter);
2503
+ }
2512
2504
}
2513
2505
}
2514
2506
2515
- for (auto & [_ , t] : topicTxs) {
2507
+ for (auto & [shardId , t] : topicTxs) {
2516
2508
t.tx .SetOp (NKikimrPQ::TDataTransaction::Commit);
2517
- *t.tx .MutableSendingShards () = sendingShards;
2518
- *t.tx .MutableReceivingShards () = receivingShards;
2509
+ if (columnShardArbiter) {
2510
+ t.tx .AddSendingShards (*columnShardArbiter);
2511
+ t.tx .AddReceivingShards (*columnShardArbiter);
2512
+ if (sendingShardsSet.contains (shardId)) {
2513
+ t.tx .AddSendingShards (shardId);
2514
+ }
2515
+ if (receivingShardsSet.contains (shardId)) {
2516
+ t.tx .AddReceivingShards (shardId);
2517
+ }
2518
+ } else {
2519
+ *t.tx .MutableSendingShards () = sendingShards;
2520
+ *t.tx .MutableReceivingShards () = receivingShards;
2521
+ }
2519
2522
YQL_ENSURE (!arbiter);
2520
2523
}
2521
2524
}
0 commit comments