feat: yield when serialization is in progress #3220
src/server/snapshot.cc
@@ -8,6 +8,8 @@
 #include <absl/strings/match.h>
 #include <absl/strings/str_cat.h>
 
+#include <mutex>
+
 #include "base/logging.h"
 #include "core/heap_size.h"
 #include "server/db_slice.h"
@@ -16,6 +18,7 @@
 #include "server/rdb_extensions.h"
 #include "server/rdb_save.h"
 #include "server/tiered_storage.h"
+#include "util/fibers/synchronization.h"
 
 namespace dfly {
 
@@ -235,16 +238,27 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
 }
 
 bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
-  ++stats_.savecb_calls;
-
-  uint64_t v = it.GetVersion();
-  if (v >= snapshot_version_) {
-    // either has been already serialized or added after snapshotting started.
-    DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
-             << " at " << v;
-    ++stats_.skipped;
-    return false;
+  // We need to block if serialization is in progress
+  {
+    std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);
+    ++stats_.savecb_calls;
+
+    auto check = [&](auto v) {
+      if (v >= snapshot_version_) {
+        // either has been already serialized or added after snapshotting started.
+        DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
+                 << " at " << v;
+        ++stats_.skipped;
+        return false;
+      }
+      return true;
+    };
+
+    uint64_t v = it.GetVersion();
+    if (!check(v))
+      return false;
   }
 
   db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
                                            snapshot_version_);

Review comment (on the FlushChangeToEarlierCallbacks call): I am trying to understand how this works, as FlushChangeToEarlierCallbacks will also call SerializeBucket for snapshots with an earlier version, and they will be blocked on the mutex/BucketSerializationGuard. Is that right?

Reply: This should not be the case, because the callbacks we apply are registered via a different … And the …
@@ -253,12 +267,8 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
 }
 
 unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
-  // Must be atomic because after after we call it.snapshot_version_ we're starting
-  // to send incremental updates instead of serializing the whole bucket: We must not
-  // send the update until the initial SerializeBucket is called.
-  // Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
-  // bucket.
-  FiberAtomicGuard fg;
+  std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);
 
   DCHECK_LT(it.GetVersion(), snapshot_version_);
 
   // traverse physical bucket and write it into string file.
@@ -268,6 +278,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
 
   while (!it.is_done()) {
     ++result;
+    // might yield
     SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
     ++it;
   }

Review comment (on the "// might yield" line): Could you check these changes with a temporary change here (not to be pushed) that calls yield at this point?
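One way the requested throwaway experiment might look (not part of the PR, and not to be pushed): force a preemption point before each entry so the new mutex path is exercised while the lock is held; `util::fb2::ThisFiber::Yield()` is assumed here to be the helio yield primitive available in this codebase.

```cpp
  while (!it.is_done()) {
    ++result;
    // Temporary experiment only: yield before serializing every entry to force
    // preemption while bucket_ser_mu_ is held by this fiber.
    util::fb2::ThisFiber::Yield();  // assumed helio API; adjust to the local alias
    SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
    ++it;
  }
```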
@@ -330,10 +341,11 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
 }
 
 void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
-  FiberAtomicGuard fg;
+  { std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_); }
   PrimeTable* table = db_slice_->GetTables(db_index).first;
+  const PrimeTable::bucket_iterator* bit = req.update();
 
-  if (const PrimeTable::bucket_iterator* bit = req.update()) {
+  if (bit) {
     if (bit->GetVersion() < snapshot_version_) {
       stats_.side_saved += SerializeBucket(db_index, *bit);
     }
src/server/snapshot.h
@@ -14,6 +14,7 @@
 #include "server/rdb_save.h"
 #include "server/table.h"
 #include "util/fibers/future.h"
+#include "util/fibers/synchronization.h"
 
 namespace dfly {
 
@@ -171,6 +172,8 @@ class SliceSnapshot {
     size_t savecb_calls = 0;
     size_t keys_total = 0;
   } stats_;
+
+  util::fb2::Mutex bucket_ser_mu_;
 };
 
 }  // namespace dfly

Review comment (on bucket_ser_mu_): You have different mutexes, one for the snapshot and one for cluster; I think this is a problem.
Review comment: Why do you put it in its own scope? It will be freed immediately after, which means that other OnDbChange callbacks could run in parallel, no?

Reply: I don't think that is the case. Once it acquires the lock it drops it immediately, but it won't preempt. It will potentially preempt when we call WriteBucket. All the steps between dropping the lock on line 313 and calling WriteBucket are atomic; there is no preemption. However, when we call WriteBucket we reacquire the lock on the same fiber before we preempt. Think of this {} as transferring ownership of the lock without doing it explicitly (e.g. lk.unlock()) before we call WriteBucket.
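To make the scope-only lock concrete, here is a minimal illustrative sketch (not the PR's code) of the two equivalent spellings being discussed; the only assumption is helio's `util::fb2::Mutex`, which the PR already uses.

```cpp
#include <mutex>

#include "util/fibers/synchronization.h"  // util::fb2::Mutex

// Block until any in-flight bucket serialization releases the mutex, then
// continue without holding it.
void WaitForSerialization(util::fb2::Mutex& bucket_ser_mu) {
  // (a) scope-only lock, as written in OnDbChange():
  { std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu); }

  // (b) the explicit form it is compared to above:
  // std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu);
  // lk.unlock();

  // In either form, releasing a fiber mutex does not preempt the current
  // fiber; the next suspension point is the serialization call that
  // re-acquires the mutex.
}
```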
Reply: Some thoughts on what you wrote:

1. CVCUponInsert()? I see that it uses a callback. I think it's true, but you should verify it if we're counting on that.
2. WriteBucket() locks this lock, so I don't see any benefit to locking (and immediately releasing) here as well. Why even do that?
3. CVCUponInsert() might call WriteBucket() multiple times (over multiple buckets). If we lose the mutex to some other operation which touches that other bucket, it might change the underlying structure, causing us to lose some changes.

I'd recommend adding a WriteBucketNoLock() method which will be called here and in WriteBucket(). Bonus points if you add ABSL_LOCKS_EXCLUDED and ABSL_LOCKS_REQUIRED annotations to let the compiler verify we don't create deadlocks.
Reply: I also don't see why we need to lock and release here; if we lock in WriteBucket, that should be enough.
@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that's OK, because what we care about is that we don't break a bucket into multiple pieces. I don't think we should care if, while CVCUponInsert serializes one bucket, we preempt, serialize another bucket from a different flow, and then go back to continue serializing the next buckets from CVCUponInsert. Is that right?

Reply: @chakaz Yes, we do not need the lock in OnDbChange; that was a mistake.
Regular mutexes behave differently when notifying other threads to make progress: if this code was running on thread A but wakes up thread B and then releases the mutex, it's a race for which thread locks the mutex next. This is not true for fibers. If you follow NotifyOne, it calls NotifyImpl, which calls ActivateOther. The gist here is (a) that ActivateOther adds the waiting fiber to the ready queue, and (b) that in our case all the interesting fibers that use this mutex (the fiber that runs OnDbChange and the one that runs IterateFB) run on the same thread. This is true both for SaveStagesController and for FullSyncFb in replication (in the RDB version they push to the same serialization channel, which is fine since we don't split individual slots yet, not until phase 2).
+1 for the rest.
@adiholden That won't happen: when we release the lock we continue running on the same fiber, we don't switch.
Please let me know if I missed anything.
Reply: @adiholden We do want to break the bucket into multiple pieces, otherwise what's the point? (Edit: we just don't want to allow interleaved bucket serialization per SliceSnapshot.)

Reply: We want to break entries, not buckets. Breaking at bucket granularity isn't enough. Of course, the challenge is not to have interleaved entries in the stream.