-
Notifications
You must be signed in to change notification settings - Fork 1k
fix(snapshot): Refactor the big_value_mu_ mutex #4327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,8 +38,11 @@ constexpr size_t kMinBlobSize = 32_KB; | |
|
||
SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice, | ||
SnapshotDataConsumerInterface* consumer, Context* cntx) | ||
: db_slice_(slice), compression_mode_(compression_mode), consumer_(consumer), cntx_(cntx) { | ||
db_array_ = slice->databases(); | ||
: db_slice_(slice), | ||
db_array_(slice->databases()), | ||
compression_mode_(compression_mode), | ||
consumer_(consumer), | ||
cntx_(cntx) { | ||
tl_slice_snapshots.insert(this); | ||
} | ||
|
||
|
@@ -126,7 +129,9 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) { | |
|
||
journal->UnregisterOnChange(cb_id); | ||
if (!cancel) { | ||
serializer_->SendJournalOffset(journal->GetLsn()); | ||
CallSerializerUnderLock([&journal](RdbSerializer* serializer) { | ||
serializer->SendJournalOffset(journal->GetLsn()); | ||
}); | ||
PushSerialized(true); | ||
} | ||
} | ||
|
@@ -163,16 +168,15 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) { | |
|
||
uint64_t last_yield = 0; | ||
PrimeTable* pt = &db_array_[db_indx]->prime; | ||
current_db_ = db_indx; | ||
|
||
VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx; | ||
do { | ||
if (cntx_->IsCancelled()) { | ||
return; | ||
} | ||
|
||
PrimeTable::Cursor next = | ||
pt->TraverseBuckets(cursor, [this](auto it) { return BucketSaveCb(it); }); | ||
PrimeTable::Cursor next = pt->TraverseBuckets( | ||
cursor, [this, db_indx](auto it) { return BucketSaveCb(db_indx, it); }); | ||
cursor = next; | ||
PushSerialized(false); | ||
|
||
|
@@ -194,7 +198,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) { | |
|
||
CHECK(!serialize_bucket_running_); | ||
if (send_full_sync_cut) { | ||
CHECK(!serializer_->SendFullSyncCut()); | ||
CallSerializerUnderLock( | ||
[](RdbSerializer* serializer) { CHECK(!serializer->SendFullSyncCut()); }); | ||
PushSerialized(true); | ||
} | ||
|
||
|
@@ -212,7 +217,9 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) { | |
|
||
// The replica sends the LSN of the next entry it wants to receive. | ||
while (!cntx_->IsCancelled() && journal->IsLSNInBuffer(lsn)) { | ||
serializer_->WriteJournalEntry(journal->GetEntry(lsn)); | ||
CallSerializerUnderLock([entry = journal->GetEntry(lsn)](RdbSerializer* serializer) { | ||
serializer->WriteJournalEntry(entry); | ||
}); | ||
PushSerialized(false); | ||
lsn++; | ||
} | ||
|
@@ -229,10 +236,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) { | |
|
||
// GetLsn() is always the next lsn that we expect to create. | ||
if (journal->GetLsn() == lsn) { | ||
{ | ||
FiberAtomicGuard fg; | ||
serializer_->SendFullSyncCut(); | ||
} | ||
CallSerializerUnderLock([](RdbSerializer* serializer) { serializer->SendFullSyncCut(); }); | ||
auto journal_cb = [this](const journal::JournalItem& item, bool await) { | ||
OnJournalEntry(item, await); | ||
}; | ||
|
@@ -248,9 +252,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) { | |
} | ||
} | ||
|
||
bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { | ||
std::lock_guard guard(big_value_mu_); | ||
|
||
bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator it) { | ||
++stats_.savecb_calls; | ||
|
||
auto check = [&](auto v) { | ||
|
@@ -267,7 +269,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { | |
return false; | ||
} | ||
|
||
db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it), | ||
db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it), | ||
snapshot_version_); | ||
|
||
auto* blocking_counter = db_slice_->BlockingCounter(); | ||
|
@@ -276,7 +278,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeTable::bucket_iterator it) { | |
// zero. | ||
std::lock_guard blocking_counter_guard(*blocking_counter); | ||
|
||
stats_.loop_serialized += SerializeBucket(current_db_, it); | ||
stats_.loop_serialized += SerializeBucket(db_index, it); | ||
|
||
return false; | ||
} | ||
|
@@ -292,20 +294,19 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite | |
while (!it.is_done()) { | ||
++result; | ||
// might preempt due to big value serialization. | ||
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get()); | ||
SerializeEntry(db_index, it->first, it->second); | ||
++it; | ||
} | ||
serialize_bucket_running_ = false; | ||
return result; | ||
} | ||
|
||
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv, | ||
optional<uint64_t> expire, RdbSerializer* serializer) { | ||
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv) { | ||
if (pv.IsExternal() && pv.IsCool()) | ||
return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer); | ||
return SerializeEntry(db_indx, pk, pv.GetCool().record->value); | ||
|
||
time_t expire_time = expire.value_or(0); | ||
if (!expire && pv.HasExpire()) { | ||
time_t expire_time = 0; | ||
if (pv.HasExpire()) { | ||
auto eit = db_array_[db_indx]->expire.Find(pk); | ||
expire_time = db_slice_->ExpireTime(eit); | ||
} | ||
|
@@ -318,18 +319,26 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr | |
EngineShard::tlocal()->tiered_storage()->Read( | ||
db_indx, pk.ToString(), pv, | ||
[future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); }); | ||
delayed_entries_.push_back( | ||
{db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time, mc_flags}); | ||
|
||
auto key = PrimeKey(pk.ToString()); | ||
|
||
delayed_entries_.push_back({db_indx, std::move(key), std::move(future), expire_time, mc_flags}); | ||
++type_freq_map_[RDB_TYPE_STRING]; | ||
} else { | ||
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx); | ||
io::Result<uint8_t> res; | ||
CallSerializerUnderLock([&](RdbSerializer* serializer) { | ||
res = serializer->SaveEntry(pk, pv, expire_time, mc_flags, db_indx); | ||
}); | ||
CHECK(res); | ||
++type_freq_map_[*res]; | ||
} | ||
} | ||
|
||
size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) { | ||
io::StringFile sfile; | ||
|
||
// FlushSerialized is already under lock | ||
// So no locking is needed | ||
serializer_->FlushToSink(&sfile, flush_state); | ||
|
||
size_t serialized = sfile.val.size(); | ||
|
@@ -339,53 +348,59 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) { | |
uint64_t id = rec_id_++; | ||
DVLOG(2) << "Pushing " << id; | ||
|
||
fb2::NoOpLock lk; | ||
|
||
// We create a critical section here that ensures that records are pushed in sequential order. | ||
// As a result, it is not possible for two fiber producers to push concurrently. | ||
// If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4. | ||
// Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and | ||
// update last_pushed_id_ to 5. | ||
seq_cond_.wait(lk, [&] { return id == this->last_pushed_id_ + 1; }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't need this because FlushSerialized is called when big_value_mu_ is locked There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe you are right here, but this is not the change I wanted by this PR. |
||
|
||
// Blocking point. | ||
consumer_->ConsumeData(std::move(sfile.val), cntx_); | ||
|
||
DCHECK_EQ(last_pushed_id_ + 1, id); | ||
last_pushed_id_ = id; | ||
seq_cond_.notify_all(); | ||
|
||
VLOG(2) << "Pushed with Serialize() " << serialized; | ||
|
||
return serialized; | ||
} | ||
|
||
template <typename Callback> void SliceSnapshot::CallSerializerUnderLock(Callback cb) { | ||
util::fb2::LockGuard guard(big_value_mu_); | ||
cb(serializer_.get()); | ||
} | ||
|
||
bool SliceSnapshot::PushSerialized(bool force) { | ||
if (!force && serializer_->SerializedLen() < kMinBlobSize) | ||
return false; | ||
if (!force) { | ||
size_t len = 0; | ||
CallSerializerUnderLock( | ||
[&len](RdbSerializer* serializer) { len = serializer->SerializedLen(); }); | ||
if (len < kMinBlobSize) { | ||
return false; | ||
} | ||
} | ||
|
||
// Flush any of the leftovers to avoid interleavings | ||
size_t serialized = FlushSerialized(FlushState::kFlushEndEntry); | ||
size_t serialized = 0; | ||
{ | ||
util::fb2::LockGuard guard(big_value_mu_); | ||
FlushSerialized(FlushState::kFlushEndEntry); | ||
} | ||
|
||
if (!delayed_entries_.empty()) { | ||
// Async bucket serialization might have accumulated some delayed values. | ||
// Because we can finally block in this function, we'll await and serialize them | ||
do { | ||
/* We can preempt on SaveEntry so first we need to change all data before | ||
* calling it */ | ||
auto& entry = delayed_entries_.back(); | ||
serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid, | ||
entry.mc_flags); | ||
delayed_entries_.pop_back(); | ||
|
||
CallSerializerUnderLock([&entry](RdbSerializer* serializer) { | ||
serializer->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid, | ||
entry.mc_flags); | ||
}); | ||
} while (!delayed_entries_.empty()); | ||
|
||
// blocking point. | ||
util::fb2::LockGuard guard(big_value_mu_); | ||
serialized += FlushSerialized(FlushState::kFlushEndEntry); | ||
} | ||
return serialized > 0; | ||
} | ||
|
||
void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { | ||
std::lock_guard guard(big_value_mu_); | ||
|
||
PrimeTable* table = db_slice_->GetTables(db_index).first; | ||
const PrimeTable::bucket_iterator* bit = req.update(); | ||
|
||
|
@@ -410,9 +425,9 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) | |
// To enable journal flushing to sync after non auto journal command is executed we call | ||
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no | ||
// additional journal change to serialize, it simply invokes PushSerialized. | ||
std::lock_guard guard(big_value_mu_); | ||
if (item.opcode != journal::Op::NOOP) { | ||
serializer_->WriteJournalEntry(item.data); | ||
CallSerializerUnderLock( | ||
[&item](RdbSerializer* serializer) { CHECK(!serializer->WriteJournalEntry(item.data)); }); | ||
} | ||
|
||
if (await) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes are not good.
This is exactly what I wanted to simplify. I don't want us to have to think about or review different flows to understand whether the lock is already taken; instead, the getter should protect us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest reverting now all of these changes that are for the CallSerializerUnderLock.
We can keep the other small changes that you made that do not impact the mutex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue is that
FlushSerialized
can be invoked from the flush_fn
callback passed to the RdbSerializer
, which can be called during the serializer->SaveEntry
method (where the mutex is already locked).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, therefore I suggest reverting the changes, as they do not simplify the code.
I will revisit this later and see if we can improve this.