fix(snapshot): Refactor the big_value_mu_ mutex #4327

Closed
1 change: 1 addition & 0 deletions src/server/rdb_save.h
@@ -225,6 +225,7 @@ class RdbSerializer : public SerializerBase {
// Must be called in the thread to which `it` belongs.
// Returns the serialized rdb_type or the error.
// expire_ms = 0 means no expiry.
// This function might preempt if flush_fun_ is used.
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
uint32_t mc_flags, DbIndex dbid);

111 changes: 63 additions & 48 deletions src/server/snapshot.cc
@@ -38,8 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB;

SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
SnapshotDataConsumerInterface* consumer, Context* cntx)
: db_slice_(slice), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) {
db_array_ = slice->databases();
: db_slice_(slice),
db_array_(slice->databases()),
compression_mode_(compression_mode),
consumer_(consumer),
cntx_(cntx) {
tl_slice_snapshots.insert(this);
}

@@ -126,7 +129,9 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {

journal->UnregisterOnChange(cb_id);
if (!cancel) {
serializer_->SendJournalOffset(journal->GetLsn());
CallSerializerUnderLock([&journal](RdbSerializer* serializer) {
serializer->SendJournalOffset(journal->GetLsn());
});
PushSerialized(true);
}
}
@@ -163,16 +168,15 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {

uint64_t last_yield = 0;
PrimeTable* pt = &db_array_[db_indx]->prime;
current_db_ = db_indx;

VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
do {
if (cntx_->IsCancelled()) {
return;
}

PrimeTable::Cursor next =
pt->TraverseBuckets(cursor, [this](auto it) { return BucketSaveCb(it); });
PrimeTable::Cursor next = pt->TraverseBuckets(
cursor, [this, db_indx](auto it) { return BucketSaveCb(db_indx, it); });
cursor = next;
PushSerialized(false);

@@ -194,7 +198,8 @@

CHECK(!serialize_bucket_running_);
if (send_full_sync_cut) {
CHECK(!serializer_->SendFullSyncCut());
CallSerializerUnderLock(
[](RdbSerializer* serializer) { CHECK(!serializer->SendFullSyncCut()); });
PushSerialized(true);
}

@@ -212,7 +217,9 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {

// The replica sends the LSN of the next entry it wants to receive.
while (!cntx_->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
CallSerializerUnderLock([entry = journal->GetEntry(lsn)](RdbSerializer* serializer) {
serializer->WriteJournalEntry(entry);
});
PushSerialized(false);
lsn++;
}
@@ -229,10 +236,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {

// GetLsn() is always the next lsn that we expect to create.
if (journal->GetLsn() == lsn) {
{
FiberAtomicGuard fg;
serializer_->SendFullSyncCut();
}
CallSerializerUnderLock([](RdbSerializer* serializer) { serializer->SendFullSyncCut(); });
auto journal_cb = [this](const journal::JournalItem& item, bool await) {
OnJournalEntry(item, await);
};
@@ -248,9 +252,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
}
}

bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
std::lock_guard guard(big_value_mu_);

bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it) {
++stats_.savecb_calls;

auto check = [&](auto v) {
@@ -267,7 +269,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
return false;
}

db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
snapshot_version_);

auto* blocking_counter = db_slice_->BlockingCounter();
@@ -276,7 +278,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) {
// zero.
std::lock_guard blocking_counter_guard(*blocking_counter);

stats_.loop_serialized += SerializeBucket(current_db_, it);
stats_.loop_serialized += SerializeBucket(db_index, it);

return false;
}
@@ -292,20 +294,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
while (!it.is_done()) {
++result;
// might preempt due to big value serialization.
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
SerializeEntry(db_index, it->first, it->second);
++it;
}
serialize_bucket_running_ = false;
return result;
}

void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
optional<uint64_t> expire, RdbSerializer* serializer) {
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) {
if (pv.IsExternal() && pv.IsCool())
return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer);
return SerializeEntry(db_indx, pk, pv.GetCool().record->value);

time_t expire_time = expire.value_or(0);
if (!expire && pv.HasExpire()) {
time_t expire_time = 0;
if (pv.HasExpire()) {
auto eit = db_array_[db_indx]->expire.Find(pk);
expire_time = db_slice_->ExpireTime(eit);
}
@@ -318,18 +319,26 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
EngineShard::tlocal()->tiered_storage()->Read(
db_indx, pk.ToString(), pv,
[future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
delayed_entries_.push_back(
{db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time, mc_flags});

auto key = PrimeKey(pk.ToString());

delayed_entries_.push_back({db_indx, std::move(key), std::move(future), expire_time, mc_flags});
++type_freq_map_[RDB_TYPE_STRING];
} else {
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
io::Result<uint8_t> res;
CallSerializerUnderLock([&](RdbSerializer* serializer) {
res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx);
});
CHECK(res);
++type_freq_map_[*res];
}
}

size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
io::StringFile sfile;

// FlushSerialized is already under lock
Collaborator

The changes are not good.
This is exactly what I wanted to simplify. I don't want us to have to think about, or review, the different flows to understand whether the lock is already taken; instead, the getter should protect us.

Collaborator

I suggest reverting, for now, all the changes that are for CallSerializerUnderLock.
We can keep the other small changes you made that do not impact the mutex.

Contributor Author

The issue is that FlushSerialized can be invoked from the flush_fn callback passed to the RdbSerializer, which can be called during the serializer->SaveEntry method (where the mutex is already locked).

Collaborator

OK, therefore I suggest reverting the changes, as they do not simplify the code.
I will revisit this later and see if we can improve it.
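
To make the re-entrancy concrete, here is a minimal, self-contained C++ sketch of the call shape the author describes above. The names (SerializerSketch, SnapshotSketch, flush_fun) are illustrative stand-ins, not the Dragonfly classes; the sketch only shows why FlushSerialized cannot take big_value_mu_ again when it is reached from the flush callback inside SaveEntry.

#include <functional>
#include <mutex>

// Hypothetical, simplified types: NOT the Dragonfly code, only the call shape.
struct SerializerSketch {
  std::function<void()> flush_fun;  // stand-in for RdbSerializer's flush_fun_

  void SaveEntry() {
    // ... serialize the entry; when the buffer grows too large ...
    if (flush_fun)
      flush_fun();  // re-enters the snapshot's flush path
  }
};

struct SnapshotSketch {
  std::mutex big_value_mu;  // stand-in for ThreadLocalMutex
  SerializerSketch serializer;

  void FlushSerialized() {
    // Reached from flush_fun while SerializeEntry() still holds big_value_mu.
    // Taking the non-recursive mutex again here would deadlock, which is why
    // the flush path must assume the lock is already held.
    // std::lock_guard<std::mutex> lk(big_value_mu);  // <-- would deadlock
  }

  void SerializeEntry() {
    std::lock_guard<std::mutex> lk(big_value_mu);  // CallSerializerUnderLock equivalent
    serializer.SaveEntry();                        // may call flush_fun -> FlushSerialized()
  }
};

int main() {
  SnapshotSketch snapshot;
  snapshot.serializer.flush_fun = [&snapshot] { snapshot.FlushSerialized(); };
  snapshot.SerializeEntry();  // only safe because FlushSerialized does not relock
}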

// So no locking is needed
serializer_->FlushToSink(&sfile, flush_state);

size_t serialized = sfile.val.size();
@@ -339,53 +348,59 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
uint64_t id = rec_id_++;
DVLOG(2) << "Pushing " << id;

fb2::NoOpLock lk;

// We create a critical section here that ensures that records are pushed in sequential order.
// As a result, it is not possible for two fiber producers to push concurrently.
// If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4.
// Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and
// update last_pushed_id_ to 5.
seq_cond_.wait(lk, [&] { return id == this->last_pushed_id_ + 1; });
Contributor Author (@BagritsevichStepan, Dec 25, 2024)

I think we don't need this, because FlushSerialized is called while big_value_mu_ is locked.

Collaborator

Maybe you are right here, but this is not the change I wanted from this PR.
I wanted the mutex to protect the serializer and only it.
So we will have a getter for the serializer when we try to access it, and once we are done we release the mutex.
So at that point the mutex will not be taken.
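
For reference, a standalone sketch of the ordering scheme that the removed seq_cond_/last_pushed_id_ code implemented, as described in the comment above. It uses plain std::mutex/std::condition_variable/std::thread instead of the fiber primitives, and the names (SequencedPusher, Consume) are illustrative assumptions rather than Dragonfly APIs.

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>

// Each producer takes a monotonically increasing id and may hand its record to
// the consumer only after every smaller id has been pushed, so records arrive
// in sequence even when producers block independently.
class SequencedPusher {
 public:
  void Push(const std::string& record) {
    std::unique_lock<std::mutex> lk(mu_);
    const uint64_t id = next_id_++;

    // If A holds id 5 and B holds id 6, B stays blocked here until A has
    // finished pushing and advanced last_pushed_ to 5.
    cv_.wait(lk, [&] { return id == last_pushed_ + 1; });

    Consume(record);  // the potentially blocking hand-off to the consumer

    last_pushed_ = id;
    cv_.notify_all();  // wake the producer waiting on id + 1
  }

 private:
  void Consume(const std::string&) { /* e.g. write to a socket or file */ }

  std::mutex mu_;
  std::condition_variable cv_;
  uint64_t next_id_ = 1;      // id handed to the next producer
  uint64_t last_pushed_ = 0;  // highest id already delivered
};

int main() {
  SequencedPusher pusher;
  std::thread a([&] { pusher.Push("record from producer A"); });
  std::thread b([&] { pusher.Push("record from producer B"); });
  a.join();
  b.join();
}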


// Blocking point.
consumer_->ConsumeData(std::move(sfile.val), cntx_);

DCHECK_EQ(last_pushed_id_ + 1, id);
last_pushed_id_ = id;
seq_cond_.notify_all();

VLOG(2) << "Pushed with Serialize() " << serialized;

return serialized;
}

template <typename Callback> void SliceSnapshot::CallSerializerUnderLock(Callback cb) {
util::fb2::LockGuard guard(big_value_mu_);
cb(serializer_.get());
}

bool SliceSnapshot::PushSerialized(bool force) {
if (!force && serializer_->SerializedLen() < kMinBlobSize)
return false;
if (!force) {
size_t len = 0;
CallSerializerUnderLock(
[&len](RdbSerializer* serializer) { len = serializer->SerializedLen(); });
if (len < kMinBlobSize) {
return false;
}
}

// Flush any of the leftovers to avoid interleavings
size_t serialized = FlushSerialized(FlushState::kFlushEndEntry);
size_t serialized = 0;
{
util::fb2::LockGuard guard(big_value_mu_);
FlushSerialized(FlushState::kFlushEndEntry);
}

if (!delayed_entries_.empty()) {
// Async bucket serialization might have accumulated some delayed values.
// Because we can finally block in this function, we'll await and serialize them
do {
/* We can preempt on SaveEntry so first we need to change all data before
* calling it */
auto& entry = delayed_entries_.back();
serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid,
entry.mc_flags);
delayed_entries_.pop_back();

CallSerializerUnderLock([&entry](RdbSerializer* serializer) {
serializer->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid,
entry.mc_flags);
});
} while (!delayed_entries_.empty());

// blocking point.
util::fb2::LockGuard guard(big_value_mu_);
serialized += FlushSerialized(FlushState::kFlushEndEntry);
}
return serialized > 0;
}

void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
std::lock_guard guard(big_value_mu_);

PrimeTable* table = db_slice_->GetTables(db_index).first;
const PrimeTable::bucket_iterator* bit = req.update();

@@ -410,9 +425,9 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
// To enable journal flushing to sync after non auto journal command is executed we call
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
// additional journal change to serialize, it simply invokes PushSerialized.
std::lock_guard guard(big_value_mu_);
if (item.opcode != journal::Op::NOOP) {
serializer_->WriteJournalEntry(item.data);
CallSerializerUnderLock(
[&item](RdbSerializer* serializer) { CHECK(!serializer->WriteJournalEntry(item.data)); });
}

if (await) {
27 changes: 16 additions & 11 deletions src/server/snapshot.h
@@ -98,15 +98,14 @@ class SliceSnapshot {
void SwitchIncrementalFb(LSN lsn);

// Called on traversing cursor by IterateBucketsFb.
bool BucketSaveCb(PrimeTable::bucket_iterator it);
bool BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it);

// Serialize single bucket.
// Returns number of serialized entries, updates bucket version to snapshot version.
unsigned SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator bucket_it);

// Serialize entry into passed serializer.
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
std::optional<uint64_t> expire, RdbSerializer* serializer);
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv);

// DbChange listener
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
@@ -119,11 +118,16 @@ class SliceSnapshot {
// Return true if pushed. Can block. Is called from the snapshot thread.
bool PushSerialized(bool force);

using FlushState = SerializerBase::FlushState;

// Helper function that flushes the serialized items into the RecordStream.
// Can block.
using FlushState = SerializerBase::FlushState;
// Lock big_value_mu_ before calling it
size_t FlushSerialized(FlushState flush_state);

// Calls the provided callback with the serializer under a lock.
template <typename Callback> void CallSerializerUnderLock(Callback cb);

public:
uint64_t snapshot_version() const {
return snapshot_version_;
@@ -150,25 +154,26 @@
};

DbSlice* db_slice_;
DbTableArray db_array_;

DbIndex current_db_;
const DbTableArray db_array_;

// Guarded by big_value_mu_
std::unique_ptr<RdbSerializer> serializer_;
std::vector<DelayedEntry> delayed_entries_; // collected during atomic bucket traversal

// collected during atomic bucket traversal
std::vector<DelayedEntry> delayed_entries_;

// Used for sanity checks.
bool serialize_bucket_running_ = false;
util::fb2::Fiber snapshot_fb_; // IterateEntriesFb
util::fb2::CondVarAny seq_cond_;
CompressionMode compression_mode_;
const CompressionMode compression_mode_;
RdbTypeFreqMap type_freq_map_;

// version upper bound for entries that should be saved (not included).
uint64_t snapshot_version_ = 0;
uint32_t journal_cb_id_ = 0;

uint64_t rec_id_ = 1, last_pushed_id_ = 0;
uint64_t rec_id_ = 1;

struct Stats {
size_t loop_serialized = 0;
@@ -178,7 +183,7 @@
size_t keys_total = 0;
} stats_;

ThreadLocalMutex big_value_mu_;
mutable ThreadLocalMutex big_value_mu_;

SnapshotDataConsumerInterface* consumer_;
Context* cntx_;