@@ -38,8 +38,7 @@ constexpr size_t kMinBlobSize = 32_KB;
 
 SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
                              SnapshotDataConsumerInterface* consumer, Context* cntx)
-    : db_slice_(slice), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
-  db_array_ = slice->databases();
+    : db_slice_(slice), db_array_(slice->databases()), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
   tl_slice_snapshots.insert(this);
 }
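The constructor change folds the `db_array_` assignment into the member-initializer list. One thing to keep in mind with that style: C++ initializes members in declaration order, not in initializer-list order, so `db_array_` must be declared after `db_slice_` for this to stay well-defined. A minimal sketch of the idiom, with hypothetical names:

```cpp
#include <vector>

struct Databases {};

// Members are initialized in declaration order, so db_slice_ is set
// before db_array_ even though both appear in the same initializer list.
class SnapshotLike {
 public:
  explicit SnapshotLike(Databases* slice)
      : db_slice_(slice), db_array_(MakeArray(slice)) {}

 private:
  static std::vector<int> MakeArray(Databases*) { return {}; }

  Databases* db_slice_;         // declared first, initialized first
  std::vector<int> db_array_;   // may safely depend on db_slice_
};

int main() {
  Databases dbs;
  SnapshotLike s(&dbs);
}
```

If the list order disagrees with the declaration order, compilers flag it (`-Wreorder`), which is why the list above mirrors the declarations.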
@@ -126,8 +125,9 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
 
   journal->UnregisterOnChange(cb_id);
   if (!cancel) {
+    util::fb2::LockGuard guard(big_value_mu_);
     serializer_->SendJournalOffset(journal->GetLsn());
-    PushSerialized(true);
+    PushSerializedUnderLock(true);
   }
 }
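`util::fb2::LockGuard` here is helio's fiber-aware analogue of `std::lock_guard`: waiting on the mutex suspends only the calling fiber, so big-value serialization can hold the lock across preemption points without stalling the whole thread. A standard-library sketch of the same scoping pattern (the fb2 guard is used identically, just over a fiber mutex):

```cpp
#include <mutex>

// Sketch with the standard library; in helio, util::fb2::LockGuard
// plays the same role over a fiber-aware util::fb2::Mutex, suspending
// the fiber instead of the OS thread while the lock is contended.
std::mutex big_value_mu;

void FinalizeLike(bool cancel) {
  if (!cancel) {
    std::lock_guard guard(big_value_mu);  // held until the scope ends
    // ... send journal offset and push serialized data under the lock ...
  }
}
```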
@@ -163,7 +163,6 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
 
     uint64_t last_yield = 0;
     PrimeTable* pt = &db_array_[db_indx]->prime;
-    current_db_ = db_indx;
 
     VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
     do {
@@ -172,7 +171,7 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
       }
 
       PrimeTable::Cursor next =
-          pt->TraverseBuckets(cursor, [this](auto it) { return BucketSaveCb(it); });
+          pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this, db_indx));
       cursor = next;
       PushSerialized(false);
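`absl::bind_front` replaces the capturing lambda and also threads the database index through as a bound leading argument, which is what lets the mutable `current_db_` member go away. A self-contained sketch of the equivalence, using stand-in types:

```cpp
#include <iostream>
#include "absl/functional/bind_front.h"

struct Table {
  // Stand-in for SliceSnapshot::BucketSaveCb's new two-argument signature.
  bool SaveCb(int db_index, int bucket) {
    std::cout << "db=" << db_index << " bucket=" << bucket << '\n';
    return false;
  }
};

int main() {
  Table t;
  int db_indx = 2;

  // Old style: capture the object and index in a lambda.
  auto lambda = [&t, db_indx](int bucket) { return t.SaveCb(db_indx, bucket); };

  // New style: bind the object and the leading argument up front;
  // the remaining parameter (the bucket) is supplied at call time.
  auto bound = absl::bind_front(&Table::SaveCb, &t, db_indx);

  lambda(7);
  bound(7);  // identical effect
}
```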
@@ -194,8 +193,9 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
 
   CHECK(!serialize_bucket_running_);
   if (send_full_sync_cut) {
+    util::fb2::LockGuard guard(big_value_mu_);
     CHECK(!serializer_->SendFullSyncCut());
-    PushSerialized(true);
+    PushSerializedUnderLock(true);
   }
 
   // serialized + side_saved must be equal to the total saved.
@@ -211,7 +211,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
   VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
 
   // The replica sends the LSN of the next entry it wants to receive.
-  while (!cntx_->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
+  while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
     serializer_->WriteJournalEntry(journal->GetEntry(lsn));
     PushSerialized(false);
     lsn++;
@@ -230,7 +230,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
   // GetLsn() is always the next lsn that we expect to create.
   if (journal->GetLsn() == lsn) {
     {
-      FiberAtomicGuard fg;
+      util::fb2::LockGuard guard(big_value_mu_);
       serializer_->SendFullSyncCut();
     }
     auto journal_cb = [this](const journal::JournalItem& item, bool await) {
@@ -248,9 +248,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
   }
 }
 
-bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
-  std::lock_guard guard(big_value_mu_);
-
+bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it) {
   ++stats_.savecb_calls;
 
   auto check = [&](auto v) {
@@ -267,7 +265,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
     return false;
   }
 
-  db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
+  db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
                                            snapshot_version_);
 
   auto* blocking_counter = db_slice_->BlockingCounter();
@@ -276,7 +274,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
   // zero.
   std::lock_guard blocking_counter_guard(*blocking_counter);
 
-  stats_.loop_serialized += SerializeBucket(current_db_, it);
+  stats_.loop_serialized += SerializeBucket(db_index, it);
 
   return false;
 }
@@ -292,20 +290,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
   while (!it.is_done()) {
     ++result;
     // might preempt due to big value serialization.
-    SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
+    SerializeEntry(db_index, it->first, it->second);
     ++it;
   }
   serialize_bucket_running_ = false;
   return result;
 }
 
-void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
-                                   optional<uint64_t> expire, RdbSerializer* serializer) {
+void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) {
   if (pv.IsExternal() && pv.IsCool())
-    return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer);
+    return SerializeEntry(db_indx, pk, pv.GetCool().record->value);
 
-  time_t expire_time = expire.value_or(0);
-  if (!expire && pv.HasExpire()) {
+  time_t expire_time = 0;
+  if (pv.HasExpire()) {
     auto eit = db_array_[db_indx]->expire.Find(pk);
     expire_time = db_slice_->ExpireTime(eit);
   }
@@ -318,11 +315,15 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
     EngineShard::tlocal()->tiered_storage()->Read(
         db_indx, pk.ToString(), pv,
         [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
-    delayed_entries_.push_back(
-        {db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time, mc_flags});
+
+    auto key = PrimeKey(pk.ToString());
+
+    util::fb2::LockGuard guard(big_value_mu_);
+    delayed_entries_.push_back({db_indx, std::move(key), std::move(future), expire_time, mc_flags});
     ++type_freq_map_[RDB_TYPE_STRING];
   } else {
-    io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
+    util::fb2::LockGuard guard(big_value_mu_);
+    io::Result<uint8_t> res = serializer_->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
     CHECK(res);
     ++type_freq_map_[*res];
   }
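The ordering in this hunk matters: the tiered-storage read and the key copy happen before the guard is constructed, so any preemption from the read occurs outside the critical section, and the mutex covers only the mutation of `delayed_entries_` and `type_freq_map_`. A generic sketch of that pattern, with hypothetical names:

```cpp
#include <mutex>
#include <string>
#include <vector>

std::mutex mu;                     // stands in for big_value_mu_
std::vector<std::string> delayed;  // stands in for delayed_entries_

// Placeholder for a read that may suspend the calling fiber.
std::string SlowRead(const std::string& key) { return key; }

void Enqueue(const std::string& key) {
  // 1) Do the potentially blocking work *outside* the lock.
  std::string value = SlowRead(key);

  // 2) Take the lock only around the shared-state mutation.
  std::lock_guard guard(mu);
  delayed.push_back(std::move(value));
}
```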
@@ -361,6 +362,11 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
 }
 
 bool SliceSnapshot::PushSerialized(bool force) {
+  util::fb2::LockGuard guard(big_value_mu_);
+  return PushSerializedUnderLock(force);
+}
+
+bool SliceSnapshot::PushSerializedUnderLock(bool force) {
   if (!force && serializer_->SerializedLen() < kMinBlobSize)
     return false;
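This is the standard public/`UnderLock` split: the public entry point takes the mutex and delegates, while callers that already hold `big_value_mu_` (as in `FinalizeJournalStream` and the full-sync-cut paths above) call the private variant directly, avoiding self-deadlock on a non-recursive mutex. A minimal sketch of the pattern:

```cpp
#include <mutex>

class Pusher {
 public:
  // Public entry point: acquires the lock, then delegates.
  bool Push(bool force) {
    std::lock_guard guard(mu_);
    return PushUnderLock(force);
  }

 private:
  // Callers of this variant must already hold mu_ (e.g. because they
  // just wrote a sync cut or journal offset under the same guard).
  bool PushUnderLock(bool force) {
    if (!force && pending_ < 32 * 1024)  // mirrors the kMinBlobSize check
      return false;
    pending_ = 0;  // flush
    return true;
  }

  std::mutex mu_;
  size_t pending_ = 0;
};
```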
@@ -384,8 +390,6 @@ bool SliceSnapshot::PushSerialized(bool force) {
 }
 
 void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
-  std::lock_guard guard(big_value_mu_);
-
   PrimeTable* table = db_slice_->GetTables(db_index).first;
   const PrimeTable::bucket_iterator* bit = req.update();
@@ -410,9 +414,9 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
   // To enable journal flushing to sync after non auto journal command is executed we call
   // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
   // additional journal change to serialize, it simply invokes PushSerialized.
-  std::lock_guard guard(big_value_mu_);
   if (item.opcode != journal::Op::NOOP) {
-    serializer_->WriteJournalEntry(item.data);
+    util::fb2::LockGuard guard(big_value_mu_);
+    CHECK(!serializer_->WriteJournalEntry(item.data));
   }
 
   if (await) {
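The new `CHECK(!serializer_->WriteJournalEntry(item.data))` also stops ignoring the call's return value; with an `std::error_code`-style result, the value is truthy exactly when it holds an error, so asserting on its negation means "this write must succeed". A small sketch of the convention (using `assert` in place of the logging `CHECK` macro):

```cpp
#include <cassert>
#include <system_error>

// Stand-in for a serializer write that reports failure via error_code.
std::error_code WriteEntry(bool ok) {
  return ok ? std::error_code{} : std::make_error_code(std::errc::io_error);
}

int main() {
  // std::error_code converts to true only when it carries an error,
  // so asserting on the negation crashes loudly on a failed write
  // instead of silently dropping it.
  assert(!WriteEntry(true));
}
```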