@@ -38,8 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB;
38
38
39
39
SliceSnapshot::SliceSnapshot (CompressionMode compression_mode, DbSlice* slice,
40
40
SnapshotDataConsumerInterface* consumer, Context* cntx)
41
- : db_slice_(slice), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
42
- db_array_ = slice->databases ();
41
+ : db_slice_(slice),
42
+ db_array_ (slice->databases ()),
43
+ compression_mode_(compression_mode),
44
+ consumer_(consumer),
45
+ cntx_(cntx) {
43
46
tl_slice_snapshots.insert (this );
44
47
}
45
48
@@ -126,7 +129,9 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
126
129
127
130
journal->UnregisterOnChange (cb_id);
128
131
if (!cancel) {
129
- serializer_->SendJournalOffset (journal->GetLsn ());
132
+ CallSerializerUnderLock ([&journal](RdbSerializer* serializer) {
133
+ serializer->SendJournalOffset (journal->GetLsn ());
134
+ });
130
135
PushSerialized (true );
131
136
}
132
137
}
@@ -163,16 +168,15 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
163
168
164
169
uint64_t last_yield = 0 ;
165
170
PrimeTable* pt = &db_array_[db_indx]->prime ;
166
- current_db_ = db_indx;
167
171
168
172
VLOG (1 ) << " Start traversing " << pt->size () << " items for index " << db_indx;
169
173
do {
170
174
if (cntx_->IsCancelled ()) {
171
175
return ;
172
176
}
173
177
174
- PrimeTable::Cursor next =
175
- pt-> TraverseBuckets ( cursor, [this ](auto it) { return BucketSaveCb (it); });
178
+ PrimeTable::Cursor next = pt-> TraverseBuckets (
179
+ cursor, [this , db_indx ](auto it) { return BucketSaveCb (db_indx, it); });
176
180
cursor = next;
177
181
PushSerialized (false );
178
182
@@ -194,7 +198,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
194
198
195
199
CHECK (!serialize_bucket_running_);
196
200
if (send_full_sync_cut) {
197
- CHECK (!serializer_->SendFullSyncCut ());
201
+ CallSerializerUnderLock (
202
+ [](RdbSerializer* serializer) { CHECK (!serializer->SendFullSyncCut ()); });
198
203
PushSerialized (true );
199
204
}
200
205
@@ -212,7 +217,9 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
212
217
213
218
// The replica sends the LSN of the next entry is wants to receive.
214
219
while (!cntx_->IsCancelled () && journal->IsLSNInBuffer (lsn)) {
215
- serializer_->WriteJournalEntry (journal->GetEntry (lsn));
220
+ CallSerializerUnderLock ([entry = journal->GetEntry (lsn)](RdbSerializer* serializer) {
221
+ serializer->WriteJournalEntry (entry);
222
+ });
216
223
PushSerialized (false );
217
224
lsn++;
218
225
}
@@ -229,10 +236,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
229
236
230
237
// GetLsn() is always the next lsn that we expect to create.
231
238
if (journal->GetLsn () == lsn) {
232
- {
233
- FiberAtomicGuard fg;
234
- serializer_->SendFullSyncCut ();
235
- }
239
+ CallSerializerUnderLock ([](RdbSerializer* serializer) { serializer->SendFullSyncCut (); });
236
240
auto journal_cb = [this ](const journal::JournalItem& item, bool await) {
237
241
OnJournalEntry (item, await);
238
242
};
@@ -248,9 +252,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
248
252
}
249
253
}
250
254
251
- bool SliceSnapshot::BucketSaveCb (PrimeTable::bucket_iterator it) {
252
- std::lock_guard guard (big_value_mu_);
253
-
255
+ bool SliceSnapshot::BucketSaveCb (DbIndex db_index, PrimeTable::bucket_iterator it) {
254
256
++stats_.savecb_calls ;
255
257
256
258
auto check = [&](auto v) {
@@ -267,7 +269,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
267
269
return false ;
268
270
}
269
271
270
- db_slice_->FlushChangeToEarlierCallbacks (current_db_ , DbSlice::Iterator::FromPrime (it),
272
+ db_slice_->FlushChangeToEarlierCallbacks (db_index , DbSlice::Iterator::FromPrime (it),
271
273
snapshot_version_);
272
274
273
275
auto * blocking_counter = db_slice_->BlockingCounter ();
@@ -276,7 +278,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
276
278
// zero.
277
279
std::lock_guard blocking_counter_guard (*blocking_counter);
278
280
279
- stats_.loop_serialized += SerializeBucket (current_db_ , it);
281
+ stats_.loop_serialized += SerializeBucket (db_index , it);
280
282
281
283
return false ;
282
284
}
@@ -292,20 +294,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
292
294
while (!it.is_done ()) {
293
295
++result;
294
296
// might preempt due to big value serialization.
295
- SerializeEntry (db_index, it->first , it->second , nullopt, serializer_. get () );
297
+ SerializeEntry (db_index, it->first , it->second );
296
298
++it;
297
299
}
298
300
serialize_bucket_running_ = false ;
299
301
return result;
300
302
}
301
303
302
- void SliceSnapshot::SerializeEntry (DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
303
- optional<uint64_t > expire, RdbSerializer* serializer) {
304
+ void SliceSnapshot::SerializeEntry (DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) {
304
305
if (pv.IsExternal () && pv.IsCool ())
305
- return SerializeEntry (db_indx, pk, pv.GetCool ().record ->value , expire, serializer );
306
+ return SerializeEntry (db_indx, pk, pv.GetCool ().record ->value );
306
307
307
- time_t expire_time = expire. value_or ( 0 ) ;
308
- if (!expire && pv.HasExpire ()) {
308
+ time_t expire_time = 0 ;
309
+ if (pv.HasExpire ()) {
309
310
auto eit = db_array_[db_indx]->expire .Find (pk);
310
311
expire_time = db_slice_->ExpireTime (eit);
311
312
}
@@ -318,18 +319,26 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
318
319
EngineShard::tlocal ()->tiered_storage ()->Read (
319
320
db_indx, pk.ToString (), pv,
320
321
[future](const std::string& v) mutable { future.Resolve (PrimeValue (v)); });
321
- delayed_entries_.push_back (
322
- {db_indx, PrimeKey (pk.ToString ()), std::move (future), expire_time, mc_flags});
322
+
323
+ auto key = PrimeKey (pk.ToString ());
324
+
325
+ delayed_entries_.push_back ({db_indx, std::move (key), std::move (future), expire_time, mc_flags});
323
326
++type_freq_map_[RDB_TYPE_STRING];
324
327
} else {
325
- io::Result<uint8_t > res = serializer->SaveEntry (pk, pv, expire_time, mc_flags, db_indx);
328
+ io::Result<uint8_t > res;
329
+ CallSerializerUnderLock ([&](RdbSerializer* serializer) {
330
+ res = serializer->SaveEntry (pk, pv, expire_time, mc_flags, db_indx);
331
+ });
326
332
CHECK (res);
327
333
++type_freq_map_[*res];
328
334
}
329
335
}
330
336
331
337
size_t SliceSnapshot::FlushSerialized (SerializerBase::FlushState flush_state) {
332
338
io::StringFile sfile;
339
+
340
+ // FlushSerialized is already under lock
341
+ // So no locking is needed
333
342
serializer_->FlushToSink (&sfile, flush_state);
334
343
335
344
size_t serialized = sfile.val .size ();
@@ -339,53 +348,59 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
339
348
uint64_t id = rec_id_++;
340
349
DVLOG (2 ) << " Pushing " << id;
341
350
342
- fb2::NoOpLock lk;
343
-
344
- // We create a critical section here that ensures that records are pushed in sequential order.
345
- // As a result, it is not possible for two fiber producers to push concurrently.
346
- // If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4.
347
- // Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and
348
- // update last_pushed_id_ to 5.
349
- seq_cond_.wait (lk, [&] { return id == this ->last_pushed_id_ + 1 ; });
350
-
351
351
// Blocking point.
352
352
consumer_->ConsumeData (std::move (sfile.val ), cntx_);
353
353
354
- DCHECK_EQ (last_pushed_id_ + 1 , id);
355
- last_pushed_id_ = id;
356
- seq_cond_.notify_all ();
357
-
358
354
VLOG (2 ) << " Pushed with Serialize() " << serialized;
359
355
360
356
return serialized;
361
357
}
362
358
359
+ template <typename Callback> void SliceSnapshot::CallSerializerUnderLock (Callback cb) {
360
+ util::fb2::LockGuard guard (big_value_mu_);
361
+ cb (serializer_.get ());
362
+ }
363
+
363
364
bool SliceSnapshot::PushSerialized (bool force) {
364
- if (!force && serializer_->SerializedLen () < kMinBlobSize )
365
- return false ;
365
+ if (!force) {
366
+ size_t len = 0 ;
367
+ CallSerializerUnderLock (
368
+ [&len](RdbSerializer* serializer) { len = serializer->SerializedLen (); });
369
+ if (len < kMinBlobSize ) {
370
+ return false ;
371
+ }
372
+ }
366
373
367
374
// Flush any of the leftovers to avoid interleavings
368
- size_t serialized = FlushSerialized (FlushState::kFlushEndEntry );
375
+ size_t serialized = 0 ;
376
+ {
377
+ util::fb2::LockGuard guard (big_value_mu_);
378
+ FlushSerialized (FlushState::kFlushEndEntry );
379
+ }
369
380
370
381
if (!delayed_entries_.empty ()) {
371
382
// Async bucket serialization might have accumulated some delayed values.
372
383
// Because we can finally block in this function, we'll await and serialize them
373
384
do {
385
+ /* We can preempt on SaveEntry so first we need to change all data before
386
+ * calling it */
374
387
auto & entry = delayed_entries_.back ();
375
- serializer_->SaveEntry (entry.key , entry.value .Get (), entry.expire , entry.dbid ,
376
- entry.mc_flags );
377
388
delayed_entries_.pop_back ();
389
+
390
+ CallSerializerUnderLock ([&entry](RdbSerializer* serializer) {
391
+ serializer->SaveEntry (entry.key , entry.value .Get (), entry.expire , entry.dbid ,
392
+ entry.mc_flags );
393
+ });
378
394
} while (!delayed_entries_.empty ());
379
395
380
396
// blocking point.
397
+ util::fb2::LockGuard guard (big_value_mu_);
381
398
serialized += FlushSerialized (FlushState::kFlushEndEntry );
382
399
}
383
400
return serialized > 0 ;
384
401
}
385
402
386
403
void SliceSnapshot::OnDbChange (DbIndex db_index, const DbSlice::ChangeReq& req) {
387
- std::lock_guard guard (big_value_mu_);
388
-
389
404
PrimeTable* table = db_slice_->GetTables (db_index).first ;
390
405
const PrimeTable::bucket_iterator* bit = req.update ();
391
406
@@ -410,9 +425,9 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
410
425
// To enable journal flushing to sync after non auto journal command is executed we call
411
426
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
412
427
// additional journal change to serialize, it simply invokes PushSerialized.
413
- std::lock_guard guard (big_value_mu_);
414
428
if (item.opcode != journal::Op::NOOP) {
415
- serializer_->WriteJournalEntry (item.data );
429
+ CallSerializerUnderLock (
430
+ [&item](RdbSerializer* serializer) { CHECK (!serializer->WriteJournalEntry (item.data )); });
416
431
}
417
432
418
433
if (await) {
0 commit comments