Skip to content

Commit 7667788

Browse files
authored
Fix loading tx (#16116)
1 parent 5fac434 commit 7667788

File tree

7 files changed

+152
-103
lines changed

7 files changed

+152
-103
lines changed

ydb/core/blob_depot/assimilator.cpp

Lines changed: 14 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -298,9 +298,10 @@ namespace NKikimr::NBlobDepot {
298298
return;
299299
}
300300

301-
THPTimer timer;
301+
const ui64 endTime = GetCycleCountFast() + DurationToCycles(TDuration::MilliSeconds(10));
302302
ui32 numItems = 0;
303303
bool timeout = false;
304+
bool invalidate = false;
304305

305306
if (!LastPlanScannedKey) {
306307
++Self->AsStats.CopyIteration;
@@ -312,22 +313,25 @@ namespace NKikimr::NBlobDepot {
312313
LastPlanScannedKey ? TData::TKey(*LastPlanScannedKey) : TData::TKey::Min(),
313314
TData::TKey::Max(),
314315
};
316+
315317
Self->Data->ScanRange(range, nullptr, nullptr, [&](const TData::TKey& key, const TData::TValue& value) {
316-
if (++numItems == 1000) {
317-
numItems = 0;
318-
if (TDuration::Seconds(timer.Passed()) >= TDuration::MilliSeconds(1)) {
319-
timeout = true;
320-
return false;
321-
}
322-
}
323318
if (value.GoingToAssimilate) {
324319
Self->AsStats.BytesToCopy += key.GetBlobId().BlobSize();
325-
Self->JsonHandler.Invalidate();
320+
invalidate = true;
326321
}
327322
LastPlanScannedKey.emplace(key.GetBlobId());
328-
return true;
323+
if (++numItems % 1024 == 0 && endTime <= GetCycleCountFast()) {
324+
timeout = true;
325+
return false;
326+
} else {
327+
return true;
328+
}
329329
});
330330

331+
if (invalidate) {
332+
Self->JsonHandler.Invalidate();
333+
}
334+
331335
if (timeout) {
332336
ResumeScanDataForPlanningInFlight = true;
333337
TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForPlanning, 0, SelfId(), {}, nullptr, 0));

ydb/core/blob_depot/coro_tx.cpp

Lines changed: 27 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -7,8 +7,6 @@
77

88
namespace NKikimr::NBlobDepot {
99

10-
thread_local TCoroTx *TCoroTx::Current = nullptr;
11-
1210
enum class EOutcome {
1311
UNSET,
1412
FINISH_TX,
@@ -27,21 +25,24 @@ namespace NKikimr::NBlobDepot {
2725
return size;
2826
}
2927

30-
class TCoroTx::TContext : public ITrampoLine {
28+
class TCoroTx::TContext
29+
: public ITrampoLine
30+
, public TContextBase
31+
{
3132
TMappedAllocation Stack;
3233
TExceptionSafeContext Context;
3334
TExceptionSafeContext *BackContext = nullptr;
3435

3536
EOutcome Outcome = EOutcome::UNSET;
3637

3738
TTokens Tokens;
38-
std::function<void()> Body;
39+
std::function<void(TContextBase&)> Body;
3940

4041
bool Finished = false;
4142
bool Aborted = false;
4243

4344
public:
44-
TContext(TTokens&& tokens, std::function<void()>&& body)
45+
TContext(TTokens&& tokens, std::function<void(TContextBase&)>&& body)
4546
: Stack(AlignStackSize(65536))
4647
, Context({this, TArrayRef(Stack.Begin(), Stack.End())})
4748
, Tokens(std::move(tokens))
@@ -55,21 +56,26 @@ namespace NKikimr::NBlobDepot {
5556
~TContext() {
5657
if (!Finished) {
5758
Aborted = true;
58-
Resume();
59+
Resume(nullptr);
5960
}
6061
}
6162

62-
EOutcome Resume() {
63+
EOutcome Resume(NTabletFlatExecutor::TTransactionContext *txc) {
6364
Outcome = EOutcome::UNSET;
6465

66+
NTabletFlatExecutor::TTransactionContext *prevTxC = std::exchange(TxContext, txc);
67+
Y_ABORT_UNLESS(!prevTxC);
68+
6569
TExceptionSafeContext returnContext;
6670
Y_ABORT_UNLESS(!BackContext);
6771
BackContext = &returnContext;
68-
Y_DEBUG_ABORT_UNLESS(CurrentTx() || Aborted);
6972
returnContext.SwitchTo(&Context);
7073
Y_ABORT_UNLESS(BackContext == &returnContext);
7174
BackContext = nullptr;
7275

76+
prevTxC = std::exchange(TxContext, nullptr);
77+
Y_ABORT_UNLESS(prevTxC == txc);
78+
7379
Y_ABORT_UNLESS(Outcome != EOutcome::UNSET);
7480
return Outcome;
7581
}
@@ -100,7 +106,7 @@ namespace NKikimr::NBlobDepot {
100106
void DoRun() override {
101107
if (!IsExpired()) {
102108
try {
103-
Body();
109+
Body(*this);
104110
} catch (const TExDead&) {
105111
// just do nothing
106112
}
@@ -110,7 +116,7 @@ namespace NKikimr::NBlobDepot {
110116
}
111117
};
112118

113-
TCoroTx::TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body)
119+
TCoroTx::TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void(TCoroTx::TContextBase&)> body)
114120
: TTransactionBase(self)
115121
, Context(std::make_unique<TContext>(std::move(tokens), std::move(body)))
116122
{}
@@ -124,18 +130,8 @@ namespace NKikimr::NBlobDepot {
124130
{}
125131

126132
bool TCoroTx::Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) {
127-
// prepare environment
128-
Y_ABORT_UNLESS(TxContext == nullptr && Current == nullptr);
129-
TxContext = &txc;
130-
Current = this;
131-
132133
Y_ABORT_UNLESS(Context);
133-
const EOutcome outcome = Context->Resume();
134-
135-
// clear environment back
136-
Y_ABORT_UNLESS(TxContext == &txc && Current == this);
137-
TxContext = nullptr;
138-
Current = nullptr;
134+
const EOutcome outcome = Context->Resume(&txc);
139135

140136
switch (outcome) {
141137
case EOutcome::FINISH_TX:
@@ -150,16 +146,8 @@ namespace NKikimr::NBlobDepot {
150146
}
151147

152148
void TCoroTx::Complete(const TActorContext&) {
153-
// prepare environment
154-
Y_ABORT_UNLESS(TxContext == nullptr && Current == nullptr);
155-
Current = this;
156-
157149
Y_ABORT_UNLESS(Context);
158-
const EOutcome outcome = Context->Resume();
159-
160-
// clear environment back
161-
Y_ABORT_UNLESS(TxContext == nullptr && Current == this);
162-
Current = nullptr;
150+
const EOutcome outcome = Context->Resume(nullptr);
163151

164152
switch (outcome) {
165153
case EOutcome::RUN_SUCCESSOR_TX:
@@ -174,28 +162,21 @@ namespace NKikimr::NBlobDepot {
174162
}
175163
}
176164

177-
TCoroTx *TCoroTx::CurrentTx() {
178-
return Current;
179-
}
180-
181-
NTabletFlatExecutor::TTransactionContext *TCoroTx::GetTxc() {
182-
Y_ABORT_UNLESS(Current->TxContext);
183-
return Current->TxContext;
165+
NTabletFlatExecutor::TTransactionContext *TCoroTx::TContextBase::GetTxc() {
166+
Y_ABORT_UNLESS(TxContext);
167+
return TxContext;
184168
}
185169

186-
void TCoroTx::FinishTx() {
187-
Y_ABORT_UNLESS(Current);
188-
Current->Context->Return(EOutcome::FINISH_TX);
170+
void TCoroTx::TContextBase::FinishTx() {
171+
static_cast<TContext*>(this)->Return(EOutcome::FINISH_TX);
189172
}
190173

191-
void TCoroTx::RestartTx() {
192-
Y_ABORT_UNLESS(Current);
193-
Current->Context->Return(EOutcome::RESTART_TX);
174+
void TCoroTx::TContextBase::RestartTx() {
175+
static_cast<TContext*>(this)->Return(EOutcome::RESTART_TX);
194176
}
195177

196-
void TCoroTx::RunSuccessorTx() {
197-
Y_ABORT_UNLESS(Current);
198-
Current->Context->Return(EOutcome::RUN_SUCCESSOR_TX);
178+
void TCoroTx::TContextBase::RunSuccessorTx() {
179+
static_cast<TContext*>(this)->Return(EOutcome::RUN_SUCCESSOR_TX);
199180
}
200181

201182
} // NKikimr::NBlobDepot

ydb/core/blob_depot/coro_tx.h

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -10,26 +10,31 @@ namespace NKikimr::NBlobDepot {
1010
using TTokens = std::vector<std::weak_ptr<TToken>>;
1111

1212
class TCoroTx : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
13+
public:
14+
class TContextBase {
15+
protected:
16+
TTransactionContext *TxContext = nullptr;
17+
18+
public:
19+
NTabletFlatExecutor::TTransactionContext *GetTxc();
20+
void FinishTx(); // finish this transaction; function returns on Complete() entry
21+
void RestartTx(); // restart transaction; function returns on next Execute() entry
22+
void RunSuccessorTx(); // restart in new transaction -- called after FinishTx()
23+
NTabletFlatExecutor::TTransactionContext& operator *() { return *GetTxc(); }
24+
};
25+
26+
private:
1327
class TContext;
1428
std::unique_ptr<TContext> Context;
15-
TTransactionContext *TxContext = nullptr;
16-
static thread_local TCoroTx *Current;
1729

1830
public:
19-
TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body);
31+
TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void(TContextBase&)> body);
2032
TCoroTx(TCoroTx& predecessor);
2133
~TCoroTx();
2234

2335
private:
2436
bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) override;
2537
void Complete(const TActorContext&) override;
26-
27-
public:
28-
static NTabletFlatExecutor::TTransactionContext *GetTxc();
29-
static TCoroTx *CurrentTx(); // obtain pointer to current tx
30-
static void FinishTx(); // finish this transaction; function returns on Complete() entry
31-
static void RestartTx(); // restart transaction; function returns on next Execute() entry
32-
static void RunSuccessorTx(); // restart in new transaction -- called after FinishTx()
3338
};
3439

3540
} // NKikimr::NBlobDepot

ydb/core/blob_depot/data.h

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -451,6 +451,14 @@ namespace NKikimr::NBlobDepot {
451451
ui64 InFlightTrashSize = 0;
452452
ui64 TotalS3DataSize = 0;
453453

454+
ui64 LoadRestartTx = 0;
455+
ui64 LoadRunSuccessorTx = 0;
456+
ui64 LoadProcessingCycles = 0;
457+
ui64 LoadFinishTxCycles = 0;
458+
ui64 LoadRestartTxCycles = 0;
459+
ui64 LoadRunSuccessorTxCycles = 0;
460+
ui64 LoadTotalCycles = 0;
461+
454462
friend class TGroupAssimilator;
455463

456464
THashMultiMap<void*, TLogoBlobID> InFlightTrashBlobs; // being committed, but not yet confirmed

ydb/core/blob_depot/data_decommit.cpp

Lines changed: 25 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -48,17 +48,19 @@ namespace NKikimr::NBlobDepot {
4848
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()),
4949
(Sender, Ev->Sender), (Cookie, Ev->Cookie));
5050

51-
Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Token, ActorToken}}, std::bind(&TThis::TxPrepare, this)));
51+
Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Token, ActorToken}}, std::bind(&TThis::TxPrepare,
52+
this, std::placeholders::_1)));
5253
++TxInFlight;
5354
Become(&TThis::StateFunc);
5455
}
5556

56-
void TxPrepare() {
57+
void TxPrepare(TCoroTx::TContextBase& tx) {
5758
for (const auto& item : Ev->Get()->Record.GetItems()) {
5859
switch (item.GetKeyDesignatorCase()) {
5960
case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: {
6061
if (!item.HasTabletId()) {
61-
return FinishWithError(NLog::PRI_CRIT, "incorrect request");
62+
tx.FinishTx();
63+
return FinishWithError(NLog::PRI_CRIT, "incorrect request");
6264
}
6365

6466
const ui64 tabletId = item.GetTabletId();
@@ -80,20 +82,20 @@ namespace NKikimr::NBlobDepot {
8082
// adjust minId to skip already assimilated items in range query
8183
if (minId < Self->Data->LastAssimilatedBlobId) {
8284
if (item.GetMustRestoreFirst()) {
83-
ScanRange(TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId),
85+
ScanRange(tx, TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId),
8486
EScanFlags::INCLUDE_BEGIN, true /*issueGets*/);
8587
}
8688
minId = *Self->Data->LastAssimilatedBlobId;
8789
}
8890

8991
// prepare the range first -- we must have it loaded in memory
90-
ScanRange(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
92+
ScanRange(tx, TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
9193
false /*issueGets*/);
9294

9395
// issue scan query
9496
IssueRange(tabletId, minId, maxId, item.GetMustRestoreFirst());
9597
} else if (item.GetMustRestoreFirst()) {
96-
ScanRange(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
98+
ScanRange(tx, TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
9799
true /*issueGets*/);
98100
}
99101

@@ -102,8 +104,8 @@ namespace NKikimr::NBlobDepot {
102104

103105
case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: {
104106
TData::TKey key = TKey::FromBinaryKey(item.GetExactKey(), Self->Config);
105-
while (!Self->Data->EnsureKeyLoaded(key, *TCoroTx::GetTxc())) {
106-
TCoroTx::RestartTx();
107+
while (!Self->Data->EnsureKeyLoaded(key, *tx)) {
108+
tx.RestartTx();
107109
}
108110
const TValue *value = Self->Data->FindKey(key);
109111
const bool notYetAssimilated = Self->Data->LastAssimilatedBlobId < key.GetBlobId();
@@ -121,11 +123,11 @@ namespace NKikimr::NBlobDepot {
121123
}
122124
}
123125

124-
TCoroTx::FinishTx();
126+
tx.FinishTx();
125127
TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, SelfId(), {}, nullptr, 0));
126128
}
127129

128-
void ScanRange(TKey from, TKey to, TScanFlags flags, bool issueGets) {
130+
void ScanRange(TCoroTx::TContextBase& tx, TKey from, TKey to, TScanFlags flags, bool issueGets) {
129131
bool progress = false;
130132

131133
auto callback = [&](const TKey& key, const TValue& value) {
@@ -136,12 +138,12 @@ namespace NKikimr::NBlobDepot {
136138
};
137139

138140
TScanRange r{from, to, flags};
139-
while (!Self->Data->ScanRange(r, TCoroTx::GetTxc(), &progress, callback)) {
141+
while (!Self->Data->ScanRange(r, tx.GetTxc(), &progress, callback)) {
140142
if (std::exchange(progress, false)) {
141-
TCoroTx::FinishTx();
142-
TCoroTx::RunSuccessorTx();
143+
tx.FinishTx();
144+
tx.RunSuccessorTx();
143145
} else {
144-
TCoroTx::RestartTx();
146+
tx.RestartTx();
145147
}
146148
}
147149
}
@@ -317,12 +319,10 @@ namespace NKikimr::NBlobDepot {
317319
}
318320

319321
void CheckIfDone() {
320-
if (TxInFlight + RangesInFlight + GetsInFlight + GetQ.size() + PutsInFlight == 0) {
321-
FinishWithSuccess();
322+
if (TxInFlight + RangesInFlight + GetsInFlight + GetQ.size() + PutsInFlight != 0) {
323+
return;
322324
}
323-
}
324325

325-
void FinishWithSuccess() {
326326
Y_ABORT_UNLESS(!Finished);
327327
Finished = true;
328328

@@ -331,19 +331,19 @@ namespace NKikimr::NBlobDepot {
331331
(DecommitBlobs.size, DecommitBlobs.size()));
332332

333333
Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Token}}, [self = Self, decommitBlobs = std::move(DecommitBlobs),
334-
ev = Ev, resolutionErrors = std::move(ResolutionErrors)]() mutable {
334+
ev = Ev, resolutionErrors = std::move(ResolutionErrors)](TCoroTx::TContextBase& tx) mutable {
335335
ui32 numItemsProcessed = 0;
336336
for (const auto& blob : decommitBlobs) {
337337
if (numItemsProcessed == 10'000) {
338-
TCoroTx::FinishTx();
339-
self->Data->CommitTrash(TCoroTx::CurrentTx());
338+
tx.FinishTx();
339+
self->Data->CommitTrash(&tx);
340340
numItemsProcessed = 0;
341-
TCoroTx::RunSuccessorTx();
341+
tx.RunSuccessorTx();
342342
}
343-
numItemsProcessed += self->Data->AddDataOnDecommit(blob, *TCoroTx::GetTxc(), TCoroTx::CurrentTx());
343+
numItemsProcessed += self->Data->AddDataOnDecommit(blob, *tx, &tx);
344344
}
345-
TCoroTx::FinishTx();
346-
self->Data->CommitTrash(TCoroTx::CurrentTx());
345+
tx.FinishTx();
346+
self->Data->CommitTrash(&tx);
347347
self->Data->ExecuteTxResolve(ev, std::move(resolutionErrors));
348348
}));
349349

0 commit comments

Comments
 (0)