Skip to content

Commit 2c4e79d

Browse files
refactor(snapshot): Add CallSerializerUnderLock method
Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
1 parent 1cc2c59 commit 2c4e79d

File tree

2 files changed

+59
-31
lines changed

2 files changed

+59
-31
lines changed

src/server/snapshot.cc

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB;
3838

3939
SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
4040
SnapshotDataConsumerInterface* consumer, Context* cntx)
41-
: db_slice_(slice), db_array_(slice->databases()) compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
41+
: db_slice_(slice),
42+
db_array_(slice->databases()),
43+
compression_mode_(compression_mode),
44+
consumer_(consumer),
45+
cntx_(cntx) {
4246
tl_slice_snapshots.insert(this);
4347
}
4448

@@ -125,9 +129,10 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
125129

126130
journal->UnregisterOnChange(cb_id);
127131
if (!cancel) {
128-
util::fb2::LockGuard guard(big_value_mu_);
129-
serializer_->SendJournalOffset(journal->GetLsn());
130-
PushSerializedUnderLock(true);
132+
CallSerializerUnderLock([&journal](RdbSerializer* serializer) {
133+
serializer->SendJournalOffset(journal->GetLsn());
134+
});
135+
PushSerialized(true);
131136
}
132137
}
133138

@@ -170,8 +175,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
170175
return;
171176
}
172177

173-
PrimeTable::Cursor next =
174-
pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
178+
PrimeTable::Cursor next = pt->TraverseBuckets(
179+
cursor, [this, db_indx](auto it) { return BucketSaveCb(db_indx, it); });
175180
cursor = next;
176181
PushSerialized(false);
177182

@@ -193,9 +198,9 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
193198

194199
CHECK(!serialize_bucket_running_);
195200
if (send_full_sync_cut) {
196-
util::fb2::LockGuard guard(big_value_mu_);
197-
CHECK(!serializer_->SendFullSyncCut());
198-
PushSerializedUnderLock(true);
201+
CallSerializerUnderLock(
202+
[](RdbSerializer* serializer) { CHECK(!serializer->SendFullSyncCut()); });
203+
PushSerialized(true);
199204
}
200205

201206
// serialized + side_saved must be equal to the total saved.
@@ -211,8 +216,10 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
211216
VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
212217

213218
// The replica sends the LSN of the next entry is wants to receive.
214-
while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
215-
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
219+
while (!cntx_->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
220+
CallSerializerUnderLock([entry = journal->GetEntry(lsn)](RdbSerializer* serializer) {
221+
serializer->WriteJournalEntry(entry);
222+
});
216223
PushSerialized(false);
217224
lsn++;
218225
}
@@ -229,10 +236,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
229236

230237
// GetLsn() is always the next lsn that we expect to create.
231238
if (journal->GetLsn() == lsn) {
232-
{
233-
util::fb2::LockGuard guard(big_value_mu_);
234-
serializer_->SendFullSyncCut();
235-
}
239+
CallSerializerUnderLock([](RdbSerializer* serializer) { serializer->SendFullSyncCut(); });
236240
auto journal_cb = [this](const journal::JournalItem& item, bool await) {
237241
OnJournalEntry(item, await);
238242
};
@@ -322,15 +326,20 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
322326
delayed_entries_.push_back({db_indx, std::move(key), std::move(future), expire_time, mc_flags});
323327
++type_freq_map_[RDB_TYPE_STRING];
324328
} else {
325-
util::fb2::LockGuard guard(big_value_mu_);
326-
io::Result<uint8_t> res = serializer_->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
329+
io::Result<uint8_t> res;
330+
CallSerializerUnderLock([&](RdbSerializer* serializer) {
331+
res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
332+
});
327333
CHECK(res);
328334
++type_freq_map_[*res];
329335
}
330336
}
331337

332338
size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
333339
io::StringFile sfile;
340+
341+
// FlushSerialized is already under lock
342+
// So no locking is needed
334343
serializer_->FlushToSink(&sfile, flush_state);
335344

336345
size_t serialized = sfile.val.size();
@@ -361,29 +370,45 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
361370
return serialized;
362371
}
363372

364-
bool SliceSnapshot::PushSerialized(bool force) {
373+
template <typename Callback> void SliceSnapshot::CallSerializerUnderLock(Callback cb) {
365374
util::fb2::LockGuard guard(big_value_mu_);
366-
return PushSerializedUnderLock(force);
375+
cb(serializer_.get());
367376
}
368377

369-
bool SliceSnapshot::PushSerializedUnderLock(bool force) {
370-
if (!force && serializer_->SerializedLen() < kMinBlobSize)
371-
return false;
378+
bool SliceSnapshot::PushSerialized(bool force) {
379+
if (!force) {
380+
size_t len = 0;
381+
CallSerializerUnderLock(
382+
[&len](RdbSerializer* serializer) { len = serializer->SerializedLen(); });
383+
if (len < kMinBlobSize) {
384+
return false;
385+
}
386+
}
372387

373388
// Flush any of the leftovers to avoid interleavings
374-
size_t serialized = FlushSerialized(FlushState::kFlushEndEntry);
389+
size_t serialized = 0;
390+
{
391+
util::fb2::LockGuard guard(big_value_mu_);
392+
FlushSerialized(FlushState::kFlushEndEntry);
393+
}
375394

376395
if (!delayed_entries_.empty()) {
377396
// Async bucket serialization might have accumulated some delayed values.
378397
// Because we can finally block in this function, we'll await and serialize them
379398
do {
399+
/* We can preempt on SaveEntry so first we need to change all data before
400+
* calling it */
380401
auto& entry = delayed_entries_.back();
381-
serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid,
382-
entry.mc_flags);
383402
delayed_entries_.pop_back();
403+
404+
CallSerializerUnderLock([&entry](RdbSerializer* serializer) {
405+
serializer->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid,
406+
entry.mc_flags);
407+
});
384408
} while (!delayed_entries_.empty());
385409

386410
// blocking point.
411+
util::fb2::LockGuard guard(big_value_mu_);
387412
serialized += FlushSerialized(FlushState::kFlushEndEntry);
388413
}
389414
return serialized > 0;
@@ -415,8 +440,8 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
415440
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
416441
// additional journal change to serialize, it simply invokes PushSerialized.
417442
if (item.opcode != journal::Op::NOOP) {
418-
util::fb2::LockGuard guard(big_value_mu_);
419-
CHECK(!serializer_->WriteJournalEntry(item.data));
443+
CallSerializerUnderLock(
444+
[&item](RdbSerializer* serializer) { CHECK(!serializer->WriteJournalEntry(item.data)); });
420445
}
421446

422447
if (await) {

src/server/snapshot.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,15 @@ class SliceSnapshot {
117117
// Push regardless of buffer size if force is true.
118118
// Return true if pushed. Can block. Is called from the snapshot thread.
119119
bool PushSerialized(bool force);
120-
bool PushSerializedUnderLock(bool force) ABSL_EXCLUSIVE_LOCKS_REQUIRED(big_value_mu_);
121120

122121
// Helper function that flushes the serialized items into the RecordStream.
123122
// Can block.
124123
using FlushState = SerializerBase::FlushState;
125124
size_t FlushSerialized(FlushState flush_state); // ABSL_EXCLUSIVE_LOCKS_REQUIRED(big_value_mu_)
126125

126+
// Calls the provided callback with the serializer under a lock.
127+
template <typename Callback> void CallSerializerUnderLock(Callback cb);
128+
127129
public:
128130
uint64_t snapshot_version() const {
129131
return snapshot_version_;
@@ -152,17 +154,18 @@ class SliceSnapshot {
152154
DbSlice* db_slice_;
153155
const DbTableArray db_array_;
154156

155-
std::unique_ptr<RdbSerializer> serializer_; // ABSL_GUARDED_BY(big_value_mu_);
157+
// Guarded by big_value_mu_
158+
std::unique_ptr<RdbSerializer> serializer_;
156159

157160
// collected during atomic bucket traversal
158-
std::vector<DelayedEntry> delayed_entries_ ABSL_GUARDED_BY(big_value_mu_);
161+
std::vector<DelayedEntry> delayed_entries_;
159162

160163
// Used for sanity checks.
161164
bool serialize_bucket_running_ = false;
162165
util::fb2::Fiber snapshot_fb_; // IterateEntriesFb
163166
util::fb2::CondVarAny seq_cond_;
164167
const CompressionMode compression_mode_;
165-
RdbTypeFreqMap type_freq_map_ ABSL_GUARDED_BY(big_value_mu_);
168+
RdbTypeFreqMap type_freq_map_;
166169

167170
// version upper bound for entries that should be saved (not included).
168171
uint64_t snapshot_version_ = 0;

0 commit comments

Comments
 (0)