Skip to content

Commit 5cd04f0

Browse files
fix(snapshot): Refactor the big_value_mu_ mutex
fixes #4152 Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
1 parent 0fe5e86 commit 5cd04f0

File tree

3 files changed

+70
-52
lines changed

3 files changed

+70
-52
lines changed

src/server/rdb_save.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ class RdbSerializer : public SerializerBase {
225225
// Must be called in the thread to which `it` belongs.
226226
// Returns the serialized rdb_type or the error.
227227
// expire_ms = 0 means no expiry.
228+
// This function might preempt if flush_fun_ is used.
228229
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
229230
uint32_t mc_flags, DbIndex dbid);
230231

src/server/snapshot.cc

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ SliceSnapshot::SliceSnapshot(DbSlice* slice, CompressionMode compression_mode,
4141
std::function<void(std::string)> on_push_record,
4242
std::function<void()> on_snapshot_finish)
4343
: db_slice_(slice),
44+
db_array_(slice->databases()),
4445
compression_mode_(compression_mode),
4546
on_push_(on_push_record),
4647
on_snapshot_finish_(on_snapshot_finish) {
47-
db_array_ = slice->databases();
4848
tl_slice_snapshots.insert(this);
4949
}
5050

@@ -65,7 +65,8 @@ bool SliceSnapshot::IsSnaphotInProgress() {
6565
return tl_slice_snapshots.size() > 0;
6666
}
6767

68-
void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, SnapshotFlush allow_flush) {
68+
void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, SnapshotFlush allow_flush)
69+
ABSL_LOCKS_EXCLUDED(big_value_mu_) {
6970
DCHECK(!snapshot_fb_.IsJoinable());
7071

7172
auto db_cb = absl::bind_front(&SliceSnapshot::OnDbChange, this);
@@ -81,15 +82,16 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
8182
const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size;
8283
std::function<void(size_t, RdbSerializer::FlushState)> flush_fun;
8384
if (flush_threshold != 0 && allow_flush == SnapshotFlush::kAllow) {
84-
flush_fun = [this, flush_threshold](size_t bytes_serialized,
85-
RdbSerializer::FlushState flush_state) {
86-
if (bytes_serialized > flush_threshold) {
87-
size_t serialized = FlushSerialized(flush_state);
88-
VLOG(2) << "FlushSerialized " << serialized << " bytes";
89-
auto& stats = ServerState::tlocal()->stats;
90-
++stats.big_value_preemptions;
91-
}
92-
};
85+
flush_fun =
86+
[this, flush_threshold](size_t bytes_serialized, RdbSerializer::FlushState flush_state)
87+
ABSL_LOCKS_EXCLUDED(big_value_mu_) {
88+
if (bytes_serialized > flush_threshold) {
89+
size_t serialized = FlushSerialized(flush_state);
90+
VLOG(2) << "FlushSerialized " << serialized << " bytes";
91+
auto& stats = ServerState::tlocal()->stats;
92+
++stats.big_value_preemptions;
93+
}
94+
};
9395
}
9496
serializer_ = std::make_unique<RdbSerializer>(compression_mode_, flush_fun);
9597

@@ -102,7 +104,8 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
102104
});
103105
}
104106

105-
void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn) {
107+
void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn)
108+
ABSL_LOCKS_EXCLUDED(big_value_mu_) {
106109
serializer_ = std::make_unique<RdbSerializer>(compression_mode_);
107110

108111
snapshot_fb_ = fb2::Fiber("incremental_snapshot", [cntx, start_lsn, this] {
@@ -127,8 +130,9 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
127130

128131
journal->UnregisterOnChange(cb_id);
129132
if (!cancel) {
133+
util::fb2::LockGuard guard(big_value_mu_);
130134
serializer_->SendJournalOffset(journal->GetLsn());
131-
PushSerialized(true);
135+
PushSerializedUnderLock(true);
132136
}
133137
}
134138

@@ -164,16 +168,15 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
164168

165169
uint64_t last_yield = 0;
166170
PrimeTable* pt = &db_array_[db_indx]->prime;
167-
current_db_ = db_indx;
168171

169172
VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
170173
do {
171174
if (cll->IsCancelled()) {
172175
return;
173176
}
174177

175-
PrimeTable::Cursor next =
176-
pt->TraverseBuckets(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
178+
PrimeTable::Cursor next = pt->TraverseBuckets(
179+
cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this, db_indx));
177180
cursor = next;
178181
PushSerialized(false);
179182

@@ -195,8 +198,9 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
195198

196199
CHECK(!serialize_bucket_running_);
197200
if (send_full_sync_cut) {
201+
util::fb2::LockGuard guard(big_value_mu_);
198202
CHECK(!serializer_->SendFullSyncCut());
199-
PushSerialized(true);
203+
PushSerializedUnderLock(true);
200204
}
201205

202206
// serialized + side_saved must be equal to the total saved.
@@ -213,8 +217,11 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
213217

214218
// The replica sends the LSN of the next entry it wants to receive.
215219
while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
216-
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
217-
PushSerialized(false);
220+
{
221+
util::fb2::LockGuard guard(big_value_mu_);
222+
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
223+
PushSerializedUnderLock(false);
224+
}
218225
lsn++;
219226
}
220227

@@ -231,7 +238,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
231238
// GetLsn() is always the next lsn that we expect to create.
232239
if (journal->GetLsn() == lsn) {
233240
{
234-
FiberAtomicGuard fg;
241+
util::fb2::LockGuard guard(big_value_mu_);
235242
serializer_->SendFullSyncCut();
236243
}
237244
auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
@@ -247,9 +254,7 @@ void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
247254
}
248255
}
249256

250-
bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
251-
std::lock_guard guard(big_value_mu_);
252-
257+
bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it) {
253258
++stats_.savecb_calls;
254259

255260
auto check = [&](auto v) {
@@ -266,7 +271,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
266271
return false;
267272
}
268273

269-
db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
274+
db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
270275
snapshot_version_);
271276

272277
auto* blocking_counter = db_slice_->BlockingCounter();
@@ -275,7 +280,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
275280
// zero.
276281
std::lock_guard blocking_counter_guard(*blocking_counter);
277282

278-
stats_.loop_serialized += SerializeBucket(current_db_, it);
283+
stats_.loop_serialized += SerializeBucket(db_index, it);
279284

280285
return false;
281286
}
@@ -291,20 +296,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
291296
while (!it.is_done()) {
292297
++result;
293298
// might preempt due to big value serialization.
294-
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
299+
SerializeEntry(db_index, it->first, it->second);
295300
++it;
296301
}
297302
serialize_bucket_running_ = false;
298303
return result;
299304
}
300305

301-
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
302-
optional<uint64_t> expire, RdbSerializer* serializer) {
306+
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) {
303307
if (pv.IsExternal() && pv.IsCool())
304-
return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer);
308+
return SerializeEntry(db_indx, pk, pv.GetCool().record->value);
305309

306-
time_t expire_time = expire.value_or(0);
307-
if (!expire && pv.HasExpire()) {
310+
time_t expire_time = 0;
311+
if (pv.HasExpire()) {
308312
auto eit = db_array_[db_indx]->expire.Find(pk);
309313
expire_time = db_slice_->ExpireTime(eit);
310314
}
@@ -317,11 +321,15 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
317321
EngineShard::tlocal()->tiered_storage()->Read(
318322
db_indx, pk.ToString(), pv,
319323
[future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
320-
delayed_entries_.push_back(
321-
{db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time, mc_flags});
324+
325+
auto key = PrimeKey(pk.ToString());
326+
327+
util::fb2::LockGuard guard(big_value_mu_);
328+
delayed_entries_.push_back({db_indx, std::move(key), std::move(future), expire_time, mc_flags});
322329
++type_freq_map_[RDB_TYPE_STRING];
323330
} else {
324-
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
331+
util::fb2::LockGuard guard(big_value_mu_);
332+
io::Result<uint8_t> res = serializer_->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
325333
CHECK(res);
326334
++type_freq_map_[*res];
327335
}
@@ -360,6 +368,11 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
360368
}
361369

362370
bool SliceSnapshot::PushSerialized(bool force) {
371+
util::fb2::LockGuard guard(big_value_mu_);
372+
return PushSerializedUnderLock(force);
373+
}
374+
375+
bool SliceSnapshot::PushSerializedUnderLock(bool force) {
363376
if (!force && serializer_->SerializedLen() < kMinBlobSize)
364377
return false;
365378

@@ -383,8 +396,6 @@ bool SliceSnapshot::PushSerialized(bool force) {
383396
}
384397

385398
void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
386-
std::lock_guard guard(big_value_mu_);
387-
388399
PrimeTable* table = db_slice_->GetTables(db_index).first;
389400
const PrimeTable::bucket_iterator* bit = req.update();
390401

@@ -409,9 +420,9 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
409420
// To enable journal flushing to sync after non auto journal command is executed we call
410421
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
411422
// additional journal change to serialize, it simply invokes PushSerialized.
412-
std::lock_guard guard(big_value_mu_);
413423
if (item.opcode != journal::Op::NOOP) {
414-
serializer_->WriteJournalEntry(item.data);
424+
util::fb2::LockGuard guard(big_value_mu_);
425+
CHECK(!serializer_->WriteJournalEntry(item.data));
415426
}
416427

417428
if (await) {
@@ -422,6 +433,8 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
422433
}
423434

424435
size_t SliceSnapshot::GetBufferCapacity() const {
436+
util::fb2::LockGuard guard(big_value_mu_);
437+
425438
if (serializer_ == nullptr) {
426439
return 0;
427440
}
@@ -430,6 +443,8 @@ size_t SliceSnapshot::GetBufferCapacity() const {
430443
}
431444

432445
size_t SliceSnapshot::GetTempBuffersSize() const {
446+
util::fb2::LockGuard guard(big_value_mu_);
447+
433448
if (serializer_ == nullptr) {
434449
return 0;
435450
}

src/server/snapshot.h

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,15 @@ class SliceSnapshot {
6161
enum class SnapshotFlush { kAllow, kDisallow };
6262

6363
void Start(bool stream_journal, const Cancellation* cll,
64-
SnapshotFlush allow_flush = SnapshotFlush::kDisallow);
64+
SnapshotFlush allow_flush = SnapshotFlush::kDisallow)
65+
ABSL_LOCKS_EXCLUDED(big_value_mu_);
6566

6667
// Initialize a snapshot that sends only the missing journal updates
6768
// since start_lsn and then registers a callback that switches into the
6869
// journal streaming mode until stopped.
6970
// If we're slower than the buffer and can't continue, `Cancel()` is
7071
// called.
71-
void StartIncremental(Context* cntx, LSN start_lsn);
72+
void StartIncremental(Context* cntx, LSN start_lsn) ABSL_LOCKS_EXCLUDED(big_value_mu_);
7273

7374
// Finalizes journal streaming writes. Only called for replication.
7475
// Blocking. Must be called from the Snapshot thread.
@@ -89,15 +90,14 @@ class SliceSnapshot {
8990
void SwitchIncrementalFb(Context* cntx, LSN lsn);
9091

9192
// Called on traversing cursor by IterateBucketsFb.
92-
bool BucketSaveCb(PrimeTable::bucket_iterator it);
93+
bool BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it);
9394

9495
// Serialize single bucket.
9596
// Returns number of serialized entries, updates bucket version to snapshot version.
9697
unsigned SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator bucket_it);
9798

9899
// Serialize entry into passed serializer.
99-
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
100-
std::optional<uint64_t> expire, RdbSerializer* serializer);
100+
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv);
101101

102102
// DbChange listener
103103
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
@@ -109,11 +109,12 @@ class SliceSnapshot {
109109
// Push regardless of buffer size if force is true.
110110
// Return true if pushed. Can block. Is called from the snapshot thread.
111111
bool PushSerialized(bool force);
112+
bool PushSerializedUnderLock(bool force) ABSL_EXCLUSIVE_LOCKS_REQUIRED(big_value_mu_);
112113

113114
// Helper function that flushes the serialized items into the RecordStream.
114115
// Can block.
115116
using FlushState = SerializerBase::FlushState;
116-
size_t FlushSerialized(FlushState flush_state);
117+
size_t FlushSerialized(FlushState flush_state) ABSL_EXCLUSIVE_LOCKS_REQUIRED(big_value_mu_);
117118

118119
public:
119120
uint64_t snapshot_version() const {
@@ -141,24 +142,25 @@ class SliceSnapshot {
141142
};
142143

143144
DbSlice* db_slice_;
144-
DbTableArray db_array_;
145+
const DbTableArray db_array_;
145146

146-
DbIndex current_db_;
147+
std::unique_ptr<RdbSerializer> serializer_ ABSL_GUARDED_BY(big_value_mu_);
147148

148-
std::unique_ptr<RdbSerializer> serializer_;
149-
std::vector<DelayedEntry> delayed_entries_; // collected during atomic bucket traversal
149+
// collected during atomic bucket traversal
150+
std::vector<DelayedEntry> delayed_entries_ ABSL_GUARDED_BY(big_value_mu_);
150151

151152
// Used for sanity checks.
152153
bool serialize_bucket_running_ = false;
153154
util::fb2::Fiber snapshot_fb_; // IterateEntriesFb
154155
util::fb2::CondVarAny seq_cond_;
155-
CompressionMode compression_mode_;
156-
RdbTypeFreqMap type_freq_map_;
156+
const CompressionMode compression_mode_;
157+
RdbTypeFreqMap type_freq_map_ ABSL_GUARDED_BY(big_value_mu_);
157158

158159
// version upper bound for entries that should be saved (not included).
159160
uint64_t snapshot_version_ = 0;
160161
uint32_t journal_cb_id_ = 0;
161162

163+
// TODO: remove or use mutex
162164
uint64_t rec_id_ = 1, last_pushed_id_ = 0;
163165

164166
struct Stats {
@@ -167,9 +169,9 @@ class SliceSnapshot {
167169
size_t side_saved = 0;
168170
size_t savecb_calls = 0;
169171
size_t keys_total = 0;
170-
} stats_;
172+
} stats_; // TODO: maybe need to lock
171173

172-
ThreadLocalMutex big_value_mu_;
174+
mutable ThreadLocalMutex big_value_mu_;
173175

174176
std::function<void(std::string)> on_push_;
175177
std::function<void()> on_snapshot_finish_;

0 commit comments

Comments
 (0)