Skip to content

Commit 67cacf0

Browse files
htap locks (#9121)
1 parent c0d116a commit 67cacf0

File tree

2 files changed

+53
-23
lines changed

2 files changed

+53
-23
lines changed

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
271271
class TCommitOperation {
272272
private:
273273
const ui64 TabletId;
274+
bool HtapFormat = false;
274275

275276
public:
276277
using TPtr = std::shared_ptr<TCommitOperation>;
@@ -281,7 +282,7 @@ class TCommitOperation {
281282

282283
bool IsPrimary() const {
283284
AFL_VERIFY(NeedSyncLocks());
284-
return TabletId == *ReceivingShards.begin();
285+
return TabletId == ArbiterColumnShard;
285286
}
286287

287288
TCommitOperation(const ui64 tabletId)
@@ -293,13 +294,29 @@ class TCommitOperation {
293294
auto& locks = evWrite.Record.GetLocks();
294295
auto& lock = evWrite.Record.GetLocks().GetLocks()[0];
295296
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
296-
if ((ui32)locks.GetSendingShards().size() != SendingShards.size()) {
297-
return TConclusionStatus::Fail("duplications in SendingShards proto field");
298-
}
297+
SendingColumnShards = std::set<ui64>(locks.GetSendingColumnShards().begin(), locks.GetSendingColumnShards().end());
299298
ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end());
300-
if ((ui32)locks.GetReceivingShards().size() != ReceivingShards.size()) {
301-
return TConclusionStatus::Fail("duplications in ReceivingShards proto field");
299+
ReceivingColumnShards = std::set<ui64>(locks.GetReceivingColumnShards().begin(), locks.GetReceivingColumnShards().end());
300+
HtapFormat = locks.HasArbiterColumnShard();
301+
if (!ReceivingShards.size() || !SendingShards.size()) {
302+
ReceivingShards.clear();
303+
SendingShards.clear();
304+
} else if (!HtapFormat) {
305+
ArbiterColumnShard = *ReceivingShards.begin();
306+
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
307+
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
308+
}
309+
} else {
310+
if (!ReceivingColumnShards.size() || !SendingColumnShards.size()) {
311+
return TConclusionStatus::Fail("empty sending/receiving lists for columnshards is incorrect case");
312+
}
313+
ArbiterColumnShard = locks.GetArbiterColumnShard();
314+
AFL_VERIFY(ArbiterColumnShard);
315+
if (!ReceivingColumnShards.contains(TabletId) && !SendingColumnShards.contains(TabletId)) {
316+
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
317+
}
302318
}
319+
303320
TxId = evWrite.Record.GetTxId();
304321
LockId = lock.GetLockId();
305322
Generation = lock.GetGeneration();
@@ -313,26 +330,36 @@ class TCommitOperation {
313330
if (evWrite.Record.GetLocks().GetOp() != NKikimrDataEvents::TKqpLocks::Commit) {
314331
return TConclusionStatus::Fail("incorrect message type");
315332
}
316-
if (!ReceivingShards.size() || !SendingShards.size()) {
317-
ReceivingShards.clear();
318-
SendingShards.clear();
319-
} else {
320-
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
321-
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
322-
}
323-
}
324333
return TConclusionStatus::Success();
325334
}
326335

327336
std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(
328337
const NKikimrTxColumnShard::ETransactionKind kind) const {
329338
AFL_VERIFY(ReceivingShards.size());
330-
if (IsPrimary()) {
331-
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
332-
TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards);
339+
if (HtapFormat) {
340+
if (IsPrimary()) {
341+
std::set<ui64> fullReceiving = ReceivingShards;
342+
fullReceiving.insert(ReceivingColumnShards.begin(), ReceivingColumnShards.end());
343+
AFL_VERIFY(fullReceiving.size() + 1 == ReceivingShards.size() + ReceivingColumnShards.size());
344+
345+
std::set<ui64> fullSending = SendingShards;
346+
fullSending.insert(SendingColumnShards.begin(), SendingColumnShards.end());
347+
AFL_VERIFY(fullSending.size() + 1 == SendingShards.size() + SendingColumnShards.size());
348+
349+
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
350+
TFullTxInfo::BuildFake(kind), LockId, fullReceiving, fullSending);
351+
} else {
352+
return std::make_unique<NColumnShard::TEvWriteCommitSecondaryTransactionOperator>(TFullTxInfo::BuildFake(kind), LockId,
353+
ArbiterColumnShard, ReceivingColumnShards.contains(TabletId));
354+
}
333355
} else {
334-
return std::make_unique<NColumnShard::TEvWriteCommitSecondaryTransactionOperator>(
335-
TFullTxInfo::BuildFake(kind), LockId, *ReceivingShards.begin(), ReceivingShards.contains(TabletId));
356+
if (IsPrimary()) {
357+
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
358+
TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards);
359+
} else {
360+
return std::make_unique<NColumnShard::TEvWriteCommitSecondaryTransactionOperator>(TFullTxInfo::BuildFake(kind), LockId,
361+
ArbiterColumnShard, ReceivingShards.contains(TabletId));
362+
}
336363
}
337364
}
338365

@@ -343,6 +370,9 @@ class TCommitOperation {
343370
YDB_READONLY(ui64, TxId, 0);
344371
YDB_READONLY_DEF(std::set<ui64>, SendingShards);
345372
YDB_READONLY_DEF(std::set<ui64>, ReceivingShards);
373+
YDB_READONLY_DEF(std::set<ui64>, SendingColumnShards);
374+
YDB_READONLY_DEF(std::set<ui64>, ReceivingColumnShards);
375+
ui64 ArbiterColumnShard = 0;
346376
};
347377

348378
class TProposeWriteTransaction: public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
@@ -430,7 +460,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
430460
const auto source = ev->Sender;
431461
const auto cookie = ev->Cookie;
432462
const auto behaviourConclusion = TOperationsManager::GetBehaviour(*ev->Get());
433-
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
463+
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
434464
if (behaviourConclusion.IsFail()) {
435465
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
436466
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,

ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,9 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
203203
for (auto&& i : ReceivingShards) {
204204
if (WaitShardsResultAck.contains(i)) {
205205
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
206-
new TEvPipeCache::TEvForward(
207-
new TEvTxProcessing::TEvReadSet(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), readSetData.SerializeAsString()), i,
208-
true),
206+
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(TxInfo.PlanStep, GetTxId(), owner.TabletID(), i,
207+
owner.TabletID(), readSetData.SerializeAsString()),
208+
i, true),
209209
IEventHandle::FlagTrackDelivery, GetTxId());
210210
}
211211
}

0 commit comments

Comments
 (0)