@@ -13,7 +13,7 @@ namespace NKikimr::NBlobDepot {
13
13
TBlobDepot* const Self;
14
14
std::weak_ptr<TToken> Token;
15
15
std::shared_ptr<TToken> ActorToken = std::make_shared<TToken>();
16
- std::vector <TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs;
16
+ std::deque <TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs;
17
17
THashSet<TLogoBlobID> ResolutionErrors;
18
18
TEvBlobDepot::TEvResolve::TPtr Ev;
19
19
@@ -35,6 +35,159 @@ namespace NKikimr::NBlobDepot {
35
35
36
36
std::vector<TAssimilatedBlobInfo> AssimilatedBlobs;
37
37
38
+ using TRange = std::tuple<ui64, TLogoBlobID, TLogoBlobID, bool >;
39
+ using TScan = std::tuple<TKey, TKey, TScanFlags, bool , std::optional<TRange>>;
40
+
41
+ class TTxPrepare : public NTabletFlatExecutor ::TTransactionBase<TBlobDepot> {
42
+ TResolveDecommitActor *Actor;
43
+ std::weak_ptr<TToken> ActorToken;
44
+ int Index = 0 ;
45
+ std::deque<TScan> Scans;
46
+ std::optional<TScanRange> ScanRange;
47
+ bool IssueGets;
48
+ std::optional<TRange> IssueRangeAfter;
49
+
50
+ // transaction-local state
51
+ bool Progress = false ;
52
+ bool RestartTx = false ;
53
+
54
+ public:
55
+ TTxType GetTxType () const override { return NKikimrBlobDepot::TXTYPE_DECOMMIT_BLOBS; }
56
+
57
+ TTxPrepare (TResolveDecommitActor *actor, std::deque<TScan>&& scans)
58
+ : TTransactionBase(actor->Self)
59
+ , Actor(actor)
60
+ , ActorToken(Actor->ActorToken)
61
+ , Scans(std::move(scans))
62
+ {}
63
+
64
+ TTxPrepare (TTxPrepare& other)
65
+ : TTransactionBase(other.Self)
66
+ , Actor(other.Actor)
67
+ , ActorToken(std::move(other.ActorToken))
68
+ , Index(other.Index)
69
+ , Scans(std::move(other.Scans))
70
+ , ScanRange(std::move(other.ScanRange))
71
+ , IssueGets(other.IssueGets)
72
+ , IssueRangeAfter(std::move(other.IssueRangeAfter))
73
+ {}
74
+
75
+ bool Execute (TTransactionContext& txc, const TActorContext&) override {
76
+ if (ActorToken.expired ()) {
77
+ return true ;
78
+ }
79
+
80
+ auto checkProgress = [&] {
81
+ if (Progress) {
82
+ RestartTx = true ;
83
+ return true ;
84
+ } else {
85
+ return false ;
86
+ }
87
+ };
88
+
89
+ // process pending scans
90
+ auto doScanRange = [&] {
91
+ auto callback = [&](const TKey& key, const TValue& value) {
92
+ if (IssueGets && value.GoingToAssimilate ) {
93
+ InvokeOtherActor (*Actor, &TResolveDecommitActor::IssueGet, key.GetBlobId (), true /* mustRestoreFirst*/ );
94
+ }
95
+ return true ;
96
+ };
97
+ if (Self->Data ->ScanRange (*ScanRange, &txc, &Progress, callback)) { // scan has been finished completely
98
+ ScanRange.reset ();
99
+ if (IssueRangeAfter) {
100
+ std::apply ([&](auto &&... args) {
101
+ InvokeOtherActor (*Actor, &TResolveDecommitActor::IssueRange, std::move (args)...);
102
+ }, *IssueRangeAfter);
103
+ }
104
+ return true ;
105
+ } else { // some data remains
106
+ return false ;
107
+ }
108
+ };
109
+ if (ScanRange && !doScanRange ()) {
110
+ return checkProgress ();
111
+ }
112
+ while (!Scans.empty ()) {
113
+ auto & [from, to, flags, issueGets, issueRangeAfter] = Scans.front ();
114
+ ScanRange.emplace (std::move (from), std::move (to), flags);
115
+ IssueGets = issueGets;
116
+ IssueRangeAfter = std::move (issueRangeAfter);
117
+ Scans.pop_front ();
118
+
119
+ if (!doScanRange ()) {
120
+ return checkProgress ();
121
+ }
122
+ }
123
+
124
+ // process explicit items after doing all scans
125
+ for (auto & items = Actor->Ev ->Get ()->Record .GetItems (); Index < items.size (); ++Index) {
126
+ if (const auto & item = items[Index]; item.HasExactKey ()) {
127
+ TData::TKey key = TKey::FromBinaryKey (item.GetExactKey (), Self->Config );
128
+ if (!Self->Data ->EnsureKeyLoaded (key, txc, &Progress)) {
129
+ return checkProgress ();
130
+ }
131
+ const TValue *value = Self->Data ->FindKey (key);
132
+ const bool notYetAssimilated = Self->Data ->LastAssimilatedBlobId < key.GetBlobId ();
133
+ const bool doGet = !value ? notYetAssimilated :
134
+ value->GoingToAssimilate ? item.GetMustRestoreFirst () : notYetAssimilated;
135
+ if (doGet) {
136
+ InvokeOtherActor (*Actor, &TResolveDecommitActor::IssueGet, key.GetBlobId (),
137
+ item.GetMustRestoreFirst ());
138
+ }
139
+ }
140
+ }
141
+
142
+ return true ;
143
+ }
144
+
145
+ void Complete (const TActorContext&) override {
146
+ if (ActorToken.expired ()) {
147
+ return ;
148
+ } else if (RestartTx) {
149
+ Self->Execute (std::make_unique<TTxPrepare>(*this ));
150
+ } else {
151
+ TActivationContext::Send (new IEventHandle (TEvPrivate::EvTxComplete, 0 , Actor->SelfId (), {}, nullptr , 0 ));
152
+ }
153
+ }
154
+ };
155
+
156
+ class TTxDecommitBlobs : public NTabletFlatExecutor ::TTransactionBase<TBlobDepot> {
157
+ THashSet<TLogoBlobID> ResolutionErrors;
158
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs;
159
+ TEvBlobDepot::TEvResolve::TPtr Ev;
160
+
161
+ public:
162
+ TTxType GetTxType () const override { return NKikimrBlobDepot::TXTYPE_DECOMMIT_BLOBS; }
163
+
164
+ TTxDecommitBlobs (TBlobDepot *self, THashSet<TLogoBlobID>&& resolutionErrors,
165
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob>&& decommitBlobs,
166
+ TEvBlobDepot::TEvResolve::TPtr ev)
167
+ : TTransactionBase(self)
168
+ , ResolutionErrors(std::move(resolutionErrors))
169
+ , DecommitBlobs(std::move(decommitBlobs))
170
+ , Ev(ev)
171
+ {}
172
+
173
+ bool Execute (TTransactionContext& txc, const TActorContext&) override {
174
+ for (size_t num = 0 ; !DecommitBlobs.empty () && num < 10'000 ; DecommitBlobs.pop_front ()) {
175
+ num += Self->Data ->AddDataOnDecommit (DecommitBlobs.front (), txc, this );
176
+ }
177
+ return true ;
178
+ }
179
+
180
+ void Complete (const TActorContext&) override {
181
+ Self->Data ->CommitTrash (this );
182
+ if (DecommitBlobs.empty ()) {
183
+ Self->Data ->ExecuteTxResolve (Ev, std::move (ResolutionErrors));
184
+ } else {
185
+ Self->Execute (std::make_unique<TTxDecommitBlobs>(Self, std::move (ResolutionErrors),
186
+ std::move (DecommitBlobs), Ev));
187
+ }
188
+ }
189
+ };
190
+
38
191
public:
39
192
TResolveDecommitActor (TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr ev)
40
193
: Self(self)
@@ -50,19 +203,13 @@ namespace NKikimr::NBlobDepot {
50
203
STLOG (PRI_DEBUG, BLOB_DEPOT, BDT42, " TResolveDecommitActor::Bootstrap" , (Id, Self->GetLogId ()),
51
204
(Sender, Ev->Sender ), (Cookie, Ev->Cookie ));
52
205
53
- Self->Execute (std::make_unique<TCoroTx>(Self, TTokens{{Token, ActorToken}}, std::bind (&TThis::TxPrepare,
54
- this , std::placeholders::_1)));
55
- ++TxInFlight;
56
- Become (&TThis::StateFunc);
57
- }
206
+ std::deque<TScan> scans;
58
207
59
- void TxPrepare (TCoroTx::TContextBase& tx) {
60
208
for (const auto & item : Ev->Get ()->Record .GetItems ()) {
61
209
switch (item.GetKeyDesignatorCase ()) {
62
210
case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange : {
63
211
if (!item.HasTabletId ()) {
64
- tx.FinishTx ();
65
- return FinishWithError (NLog::PRI_CRIT, " incorrect request" );
212
+ return FinishWithError (NLog::PRI_CRIT, " incorrect request: tablet id not set" );
66
213
}
67
214
68
215
const ui64 tabletId = item.GetTabletId ();
@@ -78,76 +225,46 @@ namespace NKikimr::NBlobDepot {
78
225
TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId,
79
226
TLogoBlobID::MaxCrcMode);
80
227
228
+ if (maxId < minId) {
229
+ return FinishWithError (NLog::PRI_CRIT, " incorrect request: ending key goes before beginning one" );
230
+ }
231
+
81
232
Y_ABORT_UNLESS (minId <= maxId);
82
233
83
234
if (Self->Data ->LastAssimilatedBlobId < maxId) {
84
235
// adjust minId to skip already assimilated items in range query
85
236
if (minId < Self->Data ->LastAssimilatedBlobId ) {
86
237
if (item.GetMustRestoreFirst ()) {
87
- ScanRange (tx, TKey (minId), TKey (*Self->Data ->LastAssimilatedBlobId ),
88
- EScanFlags::INCLUDE_BEGIN, true /* issueGets */ );
238
+ scans. emplace_back ( TKey (minId), TKey (*Self->Data ->LastAssimilatedBlobId ),
239
+ EScanFlags::INCLUDE_BEGIN, true , std::nullopt );
89
240
}
90
241
minId = *Self->Data ->LastAssimilatedBlobId ;
91
242
}
92
243
93
244
// prepare the range first -- we must have it loaded in memory
94
- ScanRange (tx, TKey (minId), TKey (maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
95
- false /* issueGets*/ );
96
-
97
- // issue scan query
98
- IssueRange (tabletId, minId, maxId, item.GetMustRestoreFirst ());
245
+ scans.emplace_back (TKey (minId), TKey (maxId),
246
+ EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, false ,
247
+ std::make_tuple (tabletId, minId, maxId, item.GetMustRestoreFirst ()));
99
248
} else if (item.GetMustRestoreFirst ()) {
100
- ScanRange (tx, TKey (minId), TKey (maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END ,
101
- true /* issueGets */ );
249
+ scans. emplace_back ( TKey (minId), TKey (maxId),
250
+ EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, true , std::nullopt );
102
251
}
103
252
104
253
break ;
105
254
}
106
255
107
- case NKikimrBlobDepot::TEvResolve::TItem::kExactKey : {
108
- TData::TKey key = TKey::FromBinaryKey (item.GetExactKey (), Self->Config );
109
- while (!Self->Data ->EnsureKeyLoaded (key, *tx)) {
110
- tx.RestartTx ();
111
- }
112
- const TValue *value = Self->Data ->FindKey (key);
113
- const bool notYetAssimilated = Self->Data ->LastAssimilatedBlobId < key.GetBlobId ();
114
- const bool doGet = !value ? notYetAssimilated :
115
- value->GoingToAssimilate ? item.GetMustRestoreFirst () : notYetAssimilated;
116
- if (doGet) {
117
- IssueGet (key.GetBlobId (), item.GetMustRestoreFirst ());
118
- }
256
+ case NKikimrBlobDepot::TEvResolve::TItem::kExactKey :
257
+ // this would be processed inside the tx
119
258
break ;
120
- }
121
259
122
260
case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET:
123
- Y_DEBUG_ABORT_UNLESS (false );
124
- break ;
261
+ return FinishWithError (NLog::PRI_CRIT, " incorrect request: key designator not set" );
125
262
}
126
263
}
127
264
128
- tx.FinishTx ();
129
- TActivationContext::Send (new IEventHandle (TEvPrivate::EvTxComplete, 0 , SelfId (), {}, nullptr , 0 ));
130
- }
131
-
132
- void ScanRange (TCoroTx::TContextBase& tx, TKey from, TKey to, TScanFlags flags, bool issueGets) {
133
- bool progress = false ;
134
-
135
- auto callback = [&](const TKey& key, const TValue& value) {
136
- if (issueGets && value.GoingToAssimilate ) {
137
- IssueGet (key.GetBlobId (), true /* mustRestoreFirst*/ );
138
- }
139
- return true ;
140
- };
141
-
142
- TScanRange r{from, to, flags};
143
- while (!Self->Data ->ScanRange (r, tx.GetTxc (), &progress, callback)) {
144
- if (std::exchange (progress, false )) {
145
- tx.FinishTx ();
146
- tx.RunSuccessorTx ();
147
- } else {
148
- tx.RestartTx ();
149
- }
150
- }
265
+ Self->Execute (std::make_unique<TTxPrepare>(this , std::move (scans)));
266
+ ++TxInFlight;
267
+ Become (&TThis::StateFunc);
151
268
}
152
269
153
270
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -240,6 +357,9 @@ namespace NKikimr::NBlobDepot {
240
357
}
241
358
} else if (r.Status == NKikimrProto::NODATA) {
242
359
AssimilatedBlobs.push_back ({TData::TKey (r.Id ), TAssimilatedBlobInfo::TDrop{}});
360
+ if (AssimilatedBlobs.size () >= 10'000 ) {
361
+ IssueTxCommitAssimilatedBlob ();
362
+ }
243
363
} else {
244
364
// mark this specific key as unresolvable
245
365
ResolutionErrors.emplace (r.Id );
@@ -298,6 +418,9 @@ namespace NKikimr::NBlobDepot {
298
418
} else {
299
419
AssimilatedBlobs.push_back ({std::move (key), TAssimilatedBlobInfo::TUpdate{
300
420
TBlobSeqId::FromLogoBlobId (msg.Id ), keep, doNotKeep}});
421
+ if (AssimilatedBlobs.size () >= 10'000 ) {
422
+ IssueTxCommitAssimilatedBlob ();
423
+ }
301
424
}
302
425
303
426
Y_ABORT_UNLESS (PutsInFlight);
@@ -319,10 +442,7 @@ namespace NKikimr::NBlobDepot {
319
442
}
320
443
321
444
if (!AssimilatedBlobs.empty ()) {
322
- Self->Data ->ExecuteTxCommitAssimilatedBlob (std::exchange (AssimilatedBlobs, {}), TEvPrivate::EvTxComplete,
323
- SelfId (), 0 );
324
- ++TxInFlight;
325
- return ;
445
+ return IssueTxCommitAssimilatedBlob ();
326
446
}
327
447
328
448
Y_ABORT_UNLESS (!Finished);
@@ -332,26 +452,16 @@ namespace NKikimr::NBlobDepot {
332
452
(Cookie, Ev->Cookie ), (ResolutionErrors.size , ResolutionErrors.size ()),
333
453
(DecommitBlobs.size , DecommitBlobs.size ()));
334
454
335
- Self->Execute (std::make_unique<TCoroTx>(Self, TTokens{{Token}}, [self = Self, decommitBlobs = std::move (DecommitBlobs),
336
- ev = Ev, resolutionErrors = std::move (ResolutionErrors)](TCoroTx::TContextBase& tx) mutable {
337
- ui32 numItemsProcessed = 0 ;
338
- for (const auto & blob : decommitBlobs) {
339
- if (numItemsProcessed == 10'000 ) {
340
- tx.FinishTx ();
341
- self->Data ->CommitTrash (&tx);
342
- numItemsProcessed = 0 ;
343
- tx.RunSuccessorTx ();
344
- }
345
- numItemsProcessed += self->Data ->AddDataOnDecommit (blob, *tx, &tx);
346
- }
347
- tx.FinishTx ();
348
- self->Data ->CommitTrash (&tx);
349
- self->Data ->ExecuteTxResolve (ev, std::move (resolutionErrors));
350
- }));
351
-
455
+ Self->Execute (std::make_unique<TTxDecommitBlobs>(Self, std::move (ResolutionErrors), std::move (DecommitBlobs), Ev));
352
456
PassAway ();
353
457
}
354
458
459
+ void IssueTxCommitAssimilatedBlob () {
460
+ Self->Data ->ExecuteTxCommitAssimilatedBlob (std::exchange (AssimilatedBlobs, {}), TEvPrivate::EvTxComplete,
461
+ SelfId (), 0 );
462
+ ++TxInFlight;
463
+ }
464
+
355
465
void FinishWithError (NLog::EPriority prio, TString errorReason) {
356
466
Y_ABORT_UNLESS (!Finished);
357
467
Finished = true ;
0 commit comments