@@ -41,10 +41,10 @@ SliceSnapshot::SliceSnapshot(DbSlice* slice, CompressionMode compression_mode,
41
41
std::function<void (std::string)> on_push_record,
42
42
std::function<void ()> on_snapshot_finish)
43
43
: db_slice_(slice),
44
+ db_array_ (slice->databases ()),
44
45
compression_mode_(compression_mode),
45
46
on_push_(on_push_record),
46
47
on_snapshot_finish_(on_snapshot_finish) {
47
- db_array_ = slice->databases ();
48
48
tl_slice_snapshots.insert (this );
49
49
}
50
50
@@ -127,8 +127,9 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
127
127
128
128
journal->UnregisterOnChange (cb_id);
129
129
if (!cancel) {
130
+ util::fb2::LockGuard guard (big_value_mu_);
130
131
serializer_->SendJournalOffset (journal->GetLsn ());
131
- PushSerialized (true );
132
+ PushSerializedUnderLock (true );
132
133
}
133
134
}
134
135
@@ -164,16 +165,15 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
164
165
165
166
uint64_t last_yield = 0 ;
166
167
PrimeTable* pt = &db_array_[db_indx]->prime ;
167
- current_db_ = db_indx;
168
168
169
169
VLOG (1 ) << " Start traversing " << pt->size () << " items for index " << db_indx;
170
170
do {
171
171
if (cll->IsCancelled ()) {
172
172
return ;
173
173
}
174
174
175
- PrimeTable::Cursor next =
176
- pt-> TraverseBuckets ( cursor, absl::bind_front (&SliceSnapshot::BucketSaveCb, this ));
175
+ PrimeTable::Cursor next = pt-> TraverseBuckets (
176
+ cursor, absl::bind_front (&SliceSnapshot::BucketSaveCb, this , db_indx ));
177
177
cursor = next;
178
178
PushSerialized (false );
179
179
@@ -195,8 +195,9 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
195
195
196
196
CHECK (!serialize_bucket_running_);
197
197
if (send_full_sync_cut) {
198
+ util::fb2::LockGuard guard (big_value_mu_);
198
199
CHECK (!serializer_->SendFullSyncCut ());
199
- PushSerialized (true );
200
+ PushSerializedUnderLock (true );
200
201
}
201
202
202
203
// serialized + side_saved must be equal to the total saved.
@@ -213,8 +214,11 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
213
214
214
215
// The replica sends the LSN of the next entry is wants to receive.
215
216
while (!cntx->IsCancelled () && journal->IsLSNInBuffer (lsn)) {
216
- serializer_->WriteJournalEntry (journal->GetEntry (lsn));
217
- PushSerialized (false );
217
+ {
218
+ util::fb2::LockGuard guard (big_value_mu_);
219
+ serializer_->WriteJournalEntry (journal->GetEntry (lsn));
220
+ PushSerializedUnderLock (false );
221
+ }
218
222
lsn++;
219
223
}
220
224
@@ -231,7 +235,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
231
235
// GetLsn() is always the next lsn that we expect to create.
232
236
if (journal->GetLsn () == lsn) {
233
237
{
234
- FiberAtomicGuard fg ;
238
+ util::fb2::LockGuard guard (big_value_mu_) ;
235
239
serializer_->SendFullSyncCut ();
236
240
}
237
241
auto journal_cb = absl::bind_front (&SliceSnapshot::OnJournalEntry, this );
@@ -247,9 +251,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
247
251
}
248
252
}
249
253
250
- bool SliceSnapshot::BucketSaveCb (PrimeTable::bucket_iterator it) {
251
- std::lock_guard guard (big_value_mu_);
252
-
254
+ bool SliceSnapshot::BucketSaveCb (DbIndex db_index, PrimeTable::bucket_iterator it) {
253
255
++stats_.savecb_calls ;
254
256
255
257
auto check = [&](auto v) {
@@ -266,7 +268,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
266
268
return false ;
267
269
}
268
270
269
- db_slice_->FlushChangeToEarlierCallbacks (current_db_ , DbSlice::Iterator::FromPrime (it),
271
+ db_slice_->FlushChangeToEarlierCallbacks (db_index , DbSlice::Iterator::FromPrime (it),
270
272
snapshot_version_);
271
273
272
274
auto * blocking_counter = db_slice_->BlockingCounter ();
@@ -275,7 +277,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
275
277
// zero.
276
278
std::lock_guard blocking_counter_guard (*blocking_counter);
277
279
278
- stats_.loop_serialized += SerializeBucket (current_db_ , it);
280
+ stats_.loop_serialized += SerializeBucket (db_index , it);
279
281
280
282
return false ;
281
283
}
@@ -291,20 +293,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
291
293
while (!it.is_done ()) {
292
294
++result;
293
295
// might preempt due to big value serialization.
294
- SerializeEntry (db_index, it->first , it->second , nullopt, serializer_. get () );
296
+ SerializeEntry (db_index, it->first , it->second );
295
297
++it;
296
298
}
297
299
serialize_bucket_running_ = false ;
298
300
return result;
299
301
}
300
302
301
- void SliceSnapshot::SerializeEntry (DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
302
- optional<uint64_t > expire, RdbSerializer* serializer) {
303
+ void SliceSnapshot::SerializeEntry (DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) {
303
304
if (pv.IsExternal () && pv.IsCool ())
304
- return SerializeEntry (db_indx, pk, pv.GetCool ().record ->value , expire, serializer );
305
+ return SerializeEntry (db_indx, pk, pv.GetCool ().record ->value );
305
306
306
- time_t expire_time = expire. value_or ( 0 ) ;
307
- if (!expire && pv.HasExpire ()) {
307
+ time_t expire_time = 0 ;
308
+ if (pv.HasExpire ()) {
308
309
auto eit = db_array_[db_indx]->expire .Find (pk);
309
310
expire_time = db_slice_->ExpireTime (eit);
310
311
}
@@ -317,11 +318,15 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
317
318
EngineShard::tlocal ()->tiered_storage ()->Read (
318
319
db_indx, pk.ToString (), pv,
319
320
[future](const std::string& v) mutable { future.Resolve (PrimeValue (v)); });
320
- delayed_entries_.push_back (
321
- {db_indx, PrimeKey (pk.ToString ()), std::move (future), expire_time, mc_flags});
321
+
322
+ auto key = PrimeKey (pk.ToString ());
323
+
324
+ util::fb2::LockGuard guard (big_value_mu_);
325
+ delayed_entries_.push_back ({db_indx, std::move (key), std::move (future), expire_time, mc_flags});
322
326
++type_freq_map_[RDB_TYPE_STRING];
323
327
} else {
324
- io::Result<uint8_t > res = serializer->SaveEntry (pk, pv, expire_time, mc_flags, db_indx);
328
+ util::fb2::LockGuard guard (big_value_mu_);
329
+ io::Result<uint8_t > res = serializer_->SaveEntry (pk, pv, expire_time, mc_flags, db_indx);
325
330
CHECK (res);
326
331
++type_freq_map_[*res];
327
332
}
@@ -360,6 +365,11 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
360
365
}
361
366
362
367
bool SliceSnapshot::PushSerialized (bool force) {
368
+ util::fb2::LockGuard guard (big_value_mu_);
369
+ return PushSerializedUnderLock (force);
370
+ }
371
+
372
+ bool SliceSnapshot::PushSerializedUnderLock (bool force) {
363
373
if (!force && serializer_->SerializedLen () < kMinBlobSize )
364
374
return false ;
365
375
@@ -383,8 +393,6 @@ bool SliceSnapshot::PushSerialized(bool force) {
383
393
}
384
394
385
395
void SliceSnapshot::OnDbChange (DbIndex db_index, const DbSlice::ChangeReq& req) {
386
- std::lock_guard guard (big_value_mu_);
387
-
388
396
PrimeTable* table = db_slice_->GetTables (db_index).first ;
389
397
const PrimeTable::bucket_iterator* bit = req.update ();
390
398
@@ -409,9 +417,9 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
409
417
// To enable journal flushing to sync after non auto journal command is executed we call
410
418
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
411
419
// additional journal change to serialize, it simply invokes PushSerialized.
412
- std::lock_guard guard (big_value_mu_);
413
420
if (item.opcode != journal::Op::NOOP) {
414
- serializer_->WriteJournalEntry (item.data );
421
+ util::fb2::LockGuard guard (big_value_mu_);
422
+ CHECK (!serializer_->WriteJournalEntry (item.data ));
415
423
}
416
424
417
425
if (await) {
0 commit comments