Skip to content

Commit 19c4e7f

Browse files
authored
Use read-only TEvGetBlock requests for lease confirmations (#17479)
1 parent 68d27f2 commit 19c4e7f

File tree

7 files changed

+233
-48
lines changed

7 files changed

+233
-48
lines changed

ydb/core/base/tablet.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ struct TEvTablet {
6464
EvPromoteToLeader,
6565
EvFGcAck, // from user tablet to follower
6666
EvLeaseDropped,
67+
EvConfirmLeader, // from user tablet to sys tablet
68+
EvConfirmLeaderResult, // from sys tablet to user tablet
6769

6870
EvTabletDead = EvBoot + 1024,
6971
EvFollowerUpdateState, // notifications to guardian
@@ -882,6 +884,30 @@ struct TEvTablet {
882884
Record.SetTabletID(tabletId);
883885
}
884886
};
887+
888+
// Local event asking for confirmation that the given tablet generation is
// still the leader. Sent from the user tablet to the sys tablet.
struct TEvConfirmLeader : TEventLocal<TEvConfirmLeader, EvConfirmLeader> {
    ui64 TabletID;   // tablet the confirmation is requested for
    ui32 Generation; // generation that is being confirmed

    TEvConfirmLeader(ui64 tabletId, ui32 generation)
        : TabletID{tabletId}
        , Generation{generation}
    {
    }
};
897+
898+
// Local reply to TEvConfirmLeader. Sent from the sys tablet to the user tablet.
// Status defaults to NKikimrProto::OK and ErrorReason defaults to an empty
// string, so a successful reply only needs the tablet id and generation.
struct TEvConfirmLeaderResult : TEventLocal<TEvConfirmLeaderResult, EvConfirmLeaderResult> {
    ui64 TabletID;                     // tablet the reply refers to
    ui32 Generation;                   // generation the reply refers to
    NKikimrProto::EReplyStatus Status; // outcome of the confirmation
    TString ErrorReason;               // error description supplied by the sender of the reply

    TEvConfirmLeaderResult(
            ui64 tabletId,
            ui32 generation,
            NKikimrProto::EReplyStatus status = NKikimrProto::OK,
            TString errorReason = {})
        : TabletID{tabletId}
        , Generation{generation}
        , Status{status}
        , ErrorReason{std::move(errorReason)}
    {
    }
};
885911
};
886912

887913
IActor* CreateTabletKiller(ui64 tabletId, ui32 nodeId = 0, ui32 maxGeneration = Max<ui32>());

ydb/core/tablet/tablet_sys.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,56 @@ void TTablet::Handle(TEvTablet::TEvCommit::TPtr &ev) {
11021102
HandleNext(ev);
11031103
}
11041104

1105+
// Handles a leader confirmation request.
//
// The sender and its cookie are remembered under a fresh request id, then a
// TEvGetBlock for this tablet is sent to the group of the latest history
// entry of channel 0. The request id travels as the event cookie so that the
// TEvGetBlockResult can be matched back to the pending request.
//
// Aborts when the request does not match this tablet id / known generation,
// or when channel 0 has no usable group information.
void TTablet::Handle(TEvTablet::TEvConfirmLeader::TPtr &ev) {
    auto *msg = ev->Get();

    // Arguments are cast explicitly so that they always match the printf
    // conversions (ui64 is unsigned and is not `long` on every platform).
    Y_ABORT_UNLESS(msg->TabletID == TabletID() && msg->Generation == StateStorageInfo.KnownGeneration,
        "ev.TabletID=%llu, tablet=%llu, ev.Generation=%u, KnownGeneration=%u",
        static_cast<unsigned long long>(msg->TabletID),
        static_cast<unsigned long long>(TabletID()),
        static_cast<unsigned>(msg->Generation),
        static_cast<unsigned>(StateStorageInfo.KnownGeneration));

    ui64 reqId = ++ConfirmLeaderCounter;
    ConfirmLeaderRequests[reqId] = { ev->Sender, ev->Cookie };

    const auto *channelInfo = Info->ChannelInfo(0);
    Y_ABORT_UNLESS(channelInfo, "Unexpected failure to find group for channel 0");
    const auto *x = channelInfo->LatestEntry();
    // Fail with a clear message instead of dereferencing a null entry below
    Y_ABORT_UNLESS(x, "Unexpected failure to find latest history entry for channel 0");
    Y_ABORT_UNLESS(x->FromGeneration <= StateStorageInfo.KnownGeneration);

    SendToBSProxy(ActorContext(), x->GroupID, new TEvBlobStorage::TEvGetBlock(TabletID(), TInstant::Max()), reqId);
}
1122+
1123+
// Handles the TEvGetBlockResult reply for a pending leader confirmation.
//
// The reply cookie is the request id under which the original sender was
// registered. A reply without a matching pending request is logged and
// ignored. A successful reply whose blocked generation is not below the known
// generation is treated as BLOCKED.
void TTablet::Handle(TEvBlobStorage::TEvGetBlockResult::TPtr &ev) {
    auto *result = ev->Get();

    const auto pending = ConfirmLeaderRequests.find(ev->Cookie);
    if (pending == ConfirmLeaderRequests.end()) {
        BLOG_ERROR("Unexpected TEvGetBlockResult with cookie " << ev->Cookie << " without a pending request", "TSYS33");
        return;
    }

    const bool blockedByOther = result->Status == NKikimrProto::OK
        && result->BlockedGeneration >= StateStorageInfo.KnownGeneration;
    const auto status = blockedByOther ? NKikimrProto::BLOCKED : result->Status;

    if (status == NKikimrProto::OK) {
        // Confirmed: answer the original requester with its own cookie
        auto requester = std::move(pending->second);
        ConfirmLeaderRequests.erase(pending);

        Send(requester.Sender, new TEvTablet::TEvConfirmLeaderResult(TabletID(), StateStorageInfo.KnownGeneration), 0, requester.Cookie);
        return;
    }

    // Failure: the pending request stays registered here.
    // We want to stop after current graph is committed, so only the first
    // error is recorded.
    if (BlobStorageStatus == NKikimrProto::OK) {
        BlobStorageStatus = status;
        BlobStorageErrorStep = Graph.NextEntry;
        BlobStorageErrorReason = std::move(result->ErrorReason);
    }

    CheckBlobStorageError();
}
1154+
11051155
bool TTablet::HandleNext(TEvTablet::TEvCommit::TPtr &ev) {
11061156
TEvTablet::TEvCommit *msg = ev->Get();
11071157

@@ -1795,6 +1845,16 @@ void TTablet::CancelTablet(TEvTablet::TEvTabletDead::EReason reason, const TStri
17951845
if (UserTablet)
17961846
Send(UserTablet, new TEvTablet::TEvTabletDead(TabletID(), reason, reportedGeneration));
17971847

1848+
if (BlobStorageStatus != NKikimrProto::OK) {
1849+
// Note: probably unnecessary after TEvTabletDead
1850+
for (const auto& pr : ConfirmLeaderRequests) {
1851+
Send(pr.second.Sender,
1852+
new TEvTablet::TEvConfirmLeaderResult(TabletID(), StateStorageInfo.KnownGeneration, BlobStorageStatus, BlobStorageErrorReason),
1853+
0, pr.second.Cookie);
1854+
}
1855+
ConfirmLeaderRequests.clear();
1856+
}
1857+
17981858
if (StateStorageGuardian)
17991859
Send(StateStorageGuardian, new TEvents::TEvPoisonPill());
18001860

ydb/core/tablet/tablet_sys.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ class TTablet : public TActor<TTablet> {
1515
using TTabletStateInfo = NKikimrWhiteboard::TTabletStateInfo;
1616
using ETabletState = TTabletStateInfo::ETabletState;
1717

18+
struct TRequestAddr {
19+
TActorId Sender;
20+
ui64 Cookie;
21+
};
22+
1823
struct TStateStorageInfo {
1924
TActorId ProxyID;
2025

@@ -255,6 +260,10 @@ class TTablet : public TActor<TTablet> {
255260
TString BlobStorageErrorReason;
256261
bool BlobStorageErrorReported = false;
257262

263+
// Leader confirmation requests
264+
THashMap<ui64, TRequestAddr> ConfirmLeaderRequests;
265+
ui64 ConfirmLeaderCounter = 0;
266+
258267
ui64 TabletID() const;
259268

260269
void ReportTabletStateChange(ETabletState state);
@@ -318,6 +327,8 @@ class TTablet : public TActor<TTablet> {
318327

319328
void Handle(TEvTablet::TEvGcForStepAckRequest::TPtr& ev);
320329

330+
void Handle(TEvTablet::TEvConfirmLeader::TPtr &ev);
331+
void Handle(TEvBlobStorage::TEvGetBlockResult::TPtr &ev);
321332
void Handle(TEvTablet::TEvCommit::TPtr &ev);
322333
bool HandleNext(TEvTablet::TEvCommit::TPtr &ev);
323334
void Handle(TEvTablet::TEvAux::TPtr &ev);
@@ -512,6 +523,7 @@ class TTablet : public TActor<TTablet> {
512523
STATEFN(StateActivePhase) {
513524
switch (ev->GetTypeRewrite()) {
514525
hFunc(TEvTablet::TEvCommit, Handle);
526+
hFunc(TEvTablet::TEvConfirmLeader, Handle);
515527
hFunc(TEvTablet::TEvAux, Handle);
516528
hFunc(TEvTablet::TEvPreCommit, Handle);
517529
hFunc(TEvTablet::TEvTabletActive, HandleByLeader);
@@ -536,6 +548,7 @@ class TTablet : public TActor<TTablet> {
536548
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleByLeader);
537549
hFunc(TEvents::TEvUndelivered, HandleByLeader);
538550
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
551+
hFunc(TEvBlobStorage::TEvGetBlockResult, Handle);
539552
hFunc(TEvTablet::TEvGcForStepAckRequest, Handle);
540553
}
541554
}

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 94 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1646,6 +1646,63 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
16461646
}
16471647
}
16481648

1649+
// Marks every queued lease entry with a cookie up to and including
// confirmedCookie as confirmed.
//
// Each confirmed entry extends LeaseEnd, is removed from both LeaseCommits and
// LeaseCommitsByEnd, and then has its callbacks invoked. When at least one
// entry was confirmed and nothing is left in flight, the lease duration may be
// doubled (at most twice) and a lease extension is scheduled.
void TExecutor::LeaseConfirmed(ui64 confirmedCookie) {
    bool confirmedAny = false;

    while (!LeaseCommits.empty() && LeaseCommits.front().Cookie <= confirmedCookie) {
        auto& head = LeaseCommits.front();
        LeaseEnd = Max(LeaseEnd, head.LeaseEnd);

        // Take the callbacks out before the entry is destroyed
        auto pendingCallbacks = std::move(head.Callbacks);
        LeaseCommitsByEnd.erase(head.ByEndIterator);
        LeaseCommits.pop_front();

        for (auto& cb : pendingCallbacks) {
            cb();
        }

        confirmedAny = true;
    }

    if (!confirmedAny || !LeaseCommits.empty()) {
        return;
    }

    if (LeaseDurationIncreases < 2) {
        // Check how much of the lease remains after a full round trip.
        // With less than a third of the lease duration left, the duration
        // is increased so that read-only requests can be processed without
        // additional commits.
        const TMonotonic now = AppData()->MonotonicTimeProvider->Now();
        if ((LeaseEnd - now) < LeaseDuration / 3) {
            LeaseDuration *= 2;
            LeaseDurationUpdated = true;
            ++LeaseDurationIncreases;
        }
    }

    // A new commit must be scheduled before the lease expires
    if (!LeaseExtendPending) {
        Schedule(LeaseEnd - LeaseDuration / 3, new TEvPrivate::TEvLeaseExtend);
        LeaseExtendPending = true;
    }
}
1691+
1692+
// Queues a lease entry that is confirmed through a TEvConfirmLeader request
// rather than through a commit (the entry is created with step 0).
//
// Returns the new entry, or nullptr when leases are disabled or the lease has
// been dropped. The entry's cookie is sent as the event cookie.
TExecutor::TLeaseCommit* TExecutor::AddLeaseConfirm() {
    if (!LeaseEnabled || Y_UNLIKELY(LeaseDropped)) {
        return nullptr;
    }

    const TMonotonic now = AppData()->MonotonicTimeProvider->Now();

    auto& entry = LeaseCommits.emplace_back(0, now, now + LeaseDuration, ++LeaseCommitsCounter);
    entry.ByEndIterator = LeaseCommitsByEnd.emplace(entry.LeaseEnd, &entry);

    Send(Owner->Tablet(), new TEvTablet::TEvConfirmLeader(Owner->TabletID(), Generation()), 0, entry.Cookie);

    return &entry;
}
1705+
16491706
TExecutor::TLeaseCommit* TExecutor::AttachLeaseCommit(TLogCommit* commit, bool force) {
16501707
if (!LeaseEnabled || Y_UNLIKELY(LeaseDropped)) {
16511708
return nullptr;
@@ -1661,17 +1718,17 @@ TExecutor::TLeaseCommit* TExecutor::AttachLeaseCommit(TLogCommit* commit, bool f
16611718
Y_ENSURE(ok);
16621719

16631720
commit->Metadata.emplace_back(ui32(NBoot::ELogCommitMeta::LeaseInfo), std::move(data));
1664-
LeaseDurationUpdated = false;
16651721
}
16661722

16671723
TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
1668-
TLeaseCommit* lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration);
1724+
TLeaseCommit* lease = &LeaseCommits.emplace_back(commit->Step, ts, ts + LeaseDuration, ++LeaseCommitsCounter);
1725+
LeaseCommitsByStep.PushBack(lease);
16691726

16701727
// It may happen in the future that LeaseDuration is decreased by this
16711728
// commit, in which case new leader might read and use it, and may not wait
16721729
// longer than the new LeaseEnd. If there are commits currently in flight
16731730
// make sure to truncate their lease extensions to the new LeaseEnd.
1674-
if (force || LeaseDurationUpdated) {
1731+
if (LeaseDurationUpdated) {
16751732
auto it = LeaseCommitsByEnd.upper_bound(lease->LeaseEnd);
16761733
while (it != LeaseCommitsByEnd.end()) {
16771734
TLeaseCommit* other = it->second;
@@ -1681,9 +1738,11 @@ TExecutor::TLeaseCommit* TExecutor::AttachLeaseCommit(TLogCommit* commit, bool f
16811738
}
16821739
// Currently confirmed lease may become truncated as well
16831740
LeaseEnd = Min(LeaseEnd, lease->LeaseEnd);
1741+
LeaseDurationUpdated = false;
16841742
}
16851743

16861744
lease->ByEndIterator = LeaseCommitsByEnd.emplace(lease->LeaseEnd, lease);
1745+
16871746
return lease;
16881747
}
16891748

@@ -1707,16 +1766,22 @@ TExecutor::TLeaseCommit* TExecutor::EnsureReadOnlyLease(TMonotonic at) {
17071766
if (itAfter != LeaseCommitsByEnd.end()) {
17081767
lease = itAfter->second;
17091768
} else if (!LeaseDropped) {
1710-
LogicRedo->FlushBatchedLog();
1769+
if (LeaseDurationUpdated || !LeasePersisted) {
1770+
// We need to make a real commit
1771+
LogicRedo->FlushBatchedLog();
17111772

1712-
auto commit = CommitManager->Begin(true, ECommit::Misc, {});
1773+
auto commit = CommitManager->Begin(true, ECommit::Misc, {});
17131774

1714-
lease = AttachLeaseCommit(commit.Get(), /* force */ true);
1775+
lease = AttachLeaseCommit(commit.Get(), /* force */ true);
17151776

1716-
CommitManager->Commit(commit);
1777+
CommitManager->Commit(commit);
17171778

1718-
if (LogicSnap->MayFlush(false)) {
1719-
MakeLogSnapshot();
1779+
if (LogicSnap->MayFlush(false)) {
1780+
MakeLogSnapshot();
1781+
}
1782+
} else {
1783+
// We want a lightweight confirmation
1784+
lease = AddLeaseConfirm();
17201785
}
17211786
}
17221787

@@ -3052,6 +3117,20 @@ void TExecutor::Handle(TEvPrivate::TEvLeaseExtend::TPtr &, const TActorContext &
30523117
EnsureReadOnlyLease(LeaseEnd);
30533118
}
30543119

3120+
// Handles the reply to a lease confirmation request.
//
// On success the event cookie is passed to LeaseConfirmed. Any other status is
// logged as an error and the executor is marked broken.
void TExecutor::Handle(TEvTablet::TEvConfirmLeaderResult::TPtr &ev) {
    const auto *reply = ev->Get();

    if (reply->Status == NKikimrProto::OK) {
        LeaseConfirmed(ev->Cookie);
        return;
    }

    // Note: a lease confirmation error will currently cause the tablet to stop anyway
    if (auto logl = Logger->Log(ELnLev::Error)) {
        logl << NFmt::Do(*this) << " Broken on lease confirmation";
    }
    Broken();
}
3133+
30553134
void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx) {
30563135
TEvTablet::TEvCommitResult *msg = ev->Get();
30573136

@@ -3078,39 +3157,12 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
30783157
<< " for step " << step;
30793158
}
30803159

3081-
if (!LeaseCommits.empty()) {
3082-
auto& l = LeaseCommits.front();
3083-
Y_ENSURE(step <= l.Step);
3084-
if (step == l.Step) {
3160+
if (!LeaseCommitsByStep.Empty()) {
3161+
auto* l = LeaseCommitsByStep.Front();
3162+
Y_ENSURE(step <= l->Step);
3163+
if (step == l->Step) {
30853164
LeasePersisted = true;
3086-
LeaseEnd = Max(LeaseEnd, l.LeaseEnd);
3087-
3088-
auto callbacks = std::move(l.Callbacks);
3089-
LeaseCommitsByEnd.erase(l.ByEndIterator);
3090-
LeaseCommits.pop_front();
3091-
3092-
for (auto& callback : callbacks) {
3093-
callback();
3094-
}
3095-
3096-
if (LeaseDurationIncreases < 2 && LeaseCommits.empty()) {
3097-
// Calculate how much of a lease is left after a full round trip
3098-
// When we are left with less than a third of lease duration we want
3099-
// to increase lease duration so we would have enough time for
3100-
// processing read-only requests without additional commits
3101-
TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
3102-
if ((LeaseEnd - ts) < LeaseDuration / 3) {
3103-
LeaseDuration *= 2;
3104-
LeaseDurationUpdated = true;
3105-
++LeaseDurationIncreases;
3106-
}
3107-
}
3108-
3109-
// We want to schedule a new commit before the lease expires
3110-
if (LeaseCommits.empty() && !LeaseExtendPending) {
3111-
Schedule(LeaseEnd - LeaseDuration / 3, new TEvPrivate::TEvLeaseExtend);
3112-
LeaseExtendPending = true;
3113-
}
3165+
LeaseConfirmed(l->Cookie);
31143166
}
31153167
}
31163168

@@ -4154,6 +4206,7 @@ STFUNC(TExecutor::StateWork) {
41544206
hFunc(NSharedCache::TEvUpdated, Handle);
41554207
HFunc(TEvTablet::TEvDropLease, Handle);
41564208
HFunc(TEvTablet::TEvCommitResult, Handle);
4209+
hFunc(TEvTablet::TEvConfirmLeaderResult, Handle);
41574210
hFunc(TEvTablet::TEvCheckBlobstorageStatusResult, Handle);
41584211
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
41594212
HFunc(TEvBlobStorage::TEvGetResult, Handle);

0 commit comments

Comments
 (0)