@@ -38,7 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB;

 SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
                              SnapshotDataConsumerInterface* consumer, Context* cntx)
-    : db_slice_(slice), db_array_(slice->databases()), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
+    : db_slice_(slice),
+      db_array_(slice->databases()),
+      compression_mode_(compression_mode),
+      consumer_(consumer),
+      cntx_(cntx) {
   tl_slice_snapshots.insert(this);
 }
@@ -125,9 +129,10 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
   journal->UnregisterOnChange(cb_id);
   if (!cancel) {
-    util::fb2::LockGuard guard(big_value_mu_);
-    serializer_->SendJournalOffset(journal->GetLsn());
-    PushSerializedUnderLock(true);
+    CallSerializerUnderLock([&journal](RdbSerializer* serializer) {
+      serializer->SendJournalOffset(journal->GetLsn());
+    });
+    PushSerialized(true);
   }
 }
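This is the first call site of the new `CallSerializerUnderLock` helper (its definition appears further down in this diff). A minimal, self-contained sketch of the idiom, with placeholder names rather than the real Dragonfly types, to show the shape: the serializer is only reachable through a mutex-guarded callback, so no call site can forget the lock.

```cpp
// Sketch of the lock-then-callback idiom; Serializer/Snapshot are
// placeholder names, not the actual Dragonfly classes.
#include <cstdint>
#include <memory>
#include <mutex>

struct Serializer {
  void SendJournalOffset(uint64_t lsn) { /* write offset record */ }
};

class Snapshot {
 public:
  // All access to serializer_ funnels through here, so the mutex is
  // acquired exactly once per access and can never be forgotten.
  template <typename Callback> void CallSerializerUnderLock(Callback cb) {
    std::lock_guard<std::mutex> guard(mu_);
    cb(serializer_.get());
  }

 private:
  std::mutex mu_;
  std::unique_ptr<Serializer> serializer_ = std::make_unique<Serializer>();
};

int main() {
  Snapshot s;
  s.CallSerializerUnderLock([](Serializer* ser) { ser->SendJournalOffset(42); });
}
```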
@@ -170,8 +175,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
       return;
     }

-    PrimeTable::Cursor next =
-        pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
+    PrimeTable::Cursor next = pt->TraverseBuckets(
+        cursor, [this, db_indx](auto it) { return BucketSaveCb(db_indx, it); });
     cursor = next;
     PushSerialized(false);
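For what it's worth, the lambda here is behaviorally the same as binding the extra argument with `absl::bind_front`; a small stand-alone comparison (with made-up types, not the real `PrimeTable`):

```cpp
// Demo: a capturing lambda and absl::bind_front build equivalent callables.
// Table/SaveCb are made-up stand-ins for PrimeTable/BucketSaveCb.
#include <cstdio>

#include "absl/functional/bind_front.h"

struct Table {
  bool SaveCb(int db_index, int bucket) {
    std::printf("db=%d bucket=%d\n", db_index, bucket);
    return false;
  }
};

int main() {
  Table t;
  int db_index = 2;
  auto lambda = [&t, db_index](int bucket) { return t.SaveCb(db_index, bucket); };
  auto bound = absl::bind_front(&Table::SaveCb, &t, db_index);
  lambda(7);  // db=2 bucket=7
  bound(7);   // db=2 bucket=7
}
```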
@@ -193,9 +198,9 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {

   CHECK(!serialize_bucket_running_);
   if (send_full_sync_cut) {
-    util::fb2::LockGuard guard(big_value_mu_);
-    CHECK(!serializer_->SendFullSyncCut());
-    PushSerializedUnderLock(true);
+    CallSerializerUnderLock(
+        [](RdbSerializer* serializer) { CHECK(!serializer->SendFullSyncCut()); });
+    PushSerialized(true);
   }

   // serialized + side_saved must be equal to the total saved.
@@ -211,8 +216,10 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
   VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;

   // The replica sends the LSN of the next entry it wants to receive.
-  while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
-    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
+  while (!cntx_->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
+    CallSerializerUnderLock([entry = journal->GetEntry(lsn)](RdbSerializer* serializer) {
+      serializer->WriteJournalEntry(entry);
+    });
     PushSerialized(false);
     lsn++;
   }
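Note the init-capture: `journal->GetEntry(lsn)` runs when the lambda object is constructed, i.e. before the helper takes `big_value_mu_`, so the journal lookup stays outside the lock. A tiny sketch of that evaluation order (illustrative names only):

```cpp
// Init-captures are evaluated at lambda construction, before the call that
// receives the lambda ever runs. Names here are illustrative.
#include <cstdio>
#include <mutex>

std::mutex mu;

int LoadEntry() {
  std::printf("LoadEntry runs before the lock is taken\n");
  return 42;
}

template <typename Callback> void CallUnderLock(Callback cb) {
  std::lock_guard<std::mutex> guard(mu);
  cb();
}

int main() {
  // The capture initializer executes here, on lambda construction.
  CallUnderLock([entry = LoadEntry()] { std::printf("entry=%d\n", entry); });
}
```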
@@ -229,10 +236,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {

   // GetLsn() is always the next lsn that we expect to create.
   if (journal->GetLsn() == lsn) {
-    {
-      util::fb2::LockGuard guard(big_value_mu_);
-      serializer_->SendFullSyncCut();
-    }
+    CallSerializerUnderLock([](RdbSerializer* serializer) { serializer->SendFullSyncCut(); });
     auto journal_cb = [this](const journal::JournalItem& item, bool await) {
       OnJournalEntry(item, await);
     };
@@ -322,15 +326,20 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
     delayed_entries_.push_back({db_indx, std::move(key), std::move(future), expire_time, mc_flags});
     ++type_freq_map_[RDB_TYPE_STRING];
   } else {
-    util::fb2::LockGuard guard(big_value_mu_);
-    io::Result<uint8_t> res = serializer_->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
+    io::Result<uint8_t> res;
+    CallSerializerUnderLock([&](RdbSerializer* serializer) {
+      res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
+    });
     CHECK(res);
     ++type_freq_map_[*res];
   }
 }

 size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
   io::StringFile sfile;
+
+  // FlushSerialized is only ever called with big_value_mu_ held, so no locking is needed here.
   serializer_->FlushToSink(&sfile, flush_state);

   size_t serialized = sfile.val.size();
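The `SaveEntry` result is now written to a local through a by-reference capture, since the helper returns `void`. A minimal model of that out-parameter pattern (using `std::optional` in place of `io::Result`, which is an assumption on my part):

```cpp
// Getting a value out of a void lock-scoped callback via a captured local.
// std::optional stands in for io::Result here.
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <optional>

std::mutex mu;

template <typename Callback> void CallUnderLock(Callback cb) {
  std::lock_guard<std::mutex> guard(mu);
  cb();
}

int main() {
  std::optional<uint8_t> res;  // default-constructed, assigned under the lock
  CallUnderLock([&] { res = uint8_t{7}; });
  if (res) std::printf("saved type=%u\n", static_cast<unsigned>(*res));
}
```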
@@ -361,29 +370,45 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
   return serialized;
 }

-bool SliceSnapshot::PushSerialized(bool force) {
+template <typename Callback> void SliceSnapshot::CallSerializerUnderLock(Callback cb) {
   util::fb2::LockGuard guard(big_value_mu_);
-  return PushSerializedUnderLock(force);
+  cb(serializer_.get());
 }

-bool SliceSnapshot::PushSerializedUnderLock(bool force) {
-  if (!force && serializer_->SerializedLen() < kMinBlobSize)
-    return false;
+bool SliceSnapshot::PushSerialized(bool force) {
+  if (!force) {
+    size_t len = 0;
+    CallSerializerUnderLock(
+        [&len](RdbSerializer* serializer) { len = serializer->SerializedLen(); });
+    if (len < kMinBlobSize) {
+      return false;
+    }
+  }

   // Flush any of the leftovers to avoid interleavings
-  size_t serialized = FlushSerialized(FlushState::kFlushEndEntry);
+  size_t serialized = 0;
+  {
+    util::fb2::LockGuard guard(big_value_mu_);
+    serialized = FlushSerialized(FlushState::kFlushEndEntry);
+  }

   if (!delayed_entries_.empty()) {
     // Async bucket serialization might have accumulated some delayed values.
     // Because we can finally block in this function, we'll await and serialize them
     do {
-      auto& entry = delayed_entries_.back();
-      serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid,
-                             entry.mc_flags);
+      // We can preempt inside SaveEntry, so update all shared state before calling it.
+      auto entry = std::move(delayed_entries_.back());
       delayed_entries_.pop_back();
+
+      CallSerializerUnderLock([&entry](RdbSerializer* serializer) {
+        serializer->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid,
+                              entry.mc_flags);
+      });
     } while (!delayed_entries_.empty());

     // blocking point.
+    util::fb2::LockGuard guard(big_value_mu_);
     serialized += FlushSerialized(FlushState::kFlushEndEntry);
   }
   return serialized > 0;
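Taken together, the point of the new helper is that big-value serialization can suspend the calling fiber mid-write, so every `serializer_` access must hold `big_value_mu_` for exactly the duration of that access and no longer. A generic model of the property being protected, using `std::thread` instead of fibers (a sketch of the idea, not Dragonfly's actual concurrency machinery):

```cpp
// Two writers appending to one buffer: with the lock-per-callback helper,
// each append is atomic, so chunks never interleave mid-write.
#include <cassert>
#include <mutex>
#include <string>
#include <thread>

class Stream {
 public:
  template <typename Callback> void CallUnderLock(Callback cb) {
    std::lock_guard<std::mutex> guard(mu_);
    cb(&buf_);
  }

  size_t Size() {
    std::lock_guard<std::mutex> guard(mu_);
    return buf_.size();
  }

 private:
  std::mutex mu_;
  std::string buf_;
};

int main() {
  Stream s;
  auto writer = [&s](const std::string& chunk) {
    for (int i = 0; i < 1000; ++i)
      s.CallUnderLock([&chunk](std::string* buf) { buf->append(chunk); });
  };
  std::thread a(writer, "aaaa"), b(writer, "bbbb");
  a.join();
  b.join();
  assert(s.Size() == 8000);  // every 4-byte chunk landed intact
}
```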
@@ -415,8 +440,8 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
   // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
   // additional journal change to serialize, it simply invokes PushSerialized.
   if (item.opcode != journal::Op::NOOP) {
-    util::fb2::LockGuard guard(big_value_mu_);
-    CHECK(!serializer_->WriteJournalEntry(item.data));
+    CallSerializerUnderLock(
+        [&item](RdbSerializer* serializer) { CHECK(!serializer->WriteJournalEntry(item.data)); });
   }

   if (await) {