feat: yield when serialization is in progress #3220
Conversation
src/server/snapshot.h
Outdated
@@ -171,6 +171,8 @@ class SliceSnapshot {
  size_t savecb_calls = 0;
  size_t keys_total = 0;
} stats_;

bool bucket_ser_in_progress_ = false;
I will remove this, it suffices to use the other bool above
src/server/snapshot.cc
Outdated
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it;
}
og.SetVersion(snapshot_version_);
We should not bump the version before we yield, because other fibers might check against the version, decide to move on instead of blocking, and cause a bucket split.
I'm not sure if this is interesting on its own, as:

> other fibers might check against the version and decide to move on instead of block and cause a bucket split

If that other fiber is older, it would have been called by now (we call in the order of snapshot versions), and if it's newer it wouldn't matter whether the value is snapshot_version_ or smaller (it can't be bigger). Furthermore, we shouldn't really get to this point in parallel in multiple fibers, because we have the above (to-be-changed) polling.
Furthermore^2, if you set the version after the yield, we could actually serialize the same key twice, in a rare case in which we call OnDbChange, block, but the Traverse() fiber continues and reaches this bucket. This would generate an invalid RDB, which Dragonfly would reject loading.
src/server/snapshot.cc
Outdated
return false;

while (bucket_ser_in_progress_) {
This will block all operations on the thread, even actions on different buckets. Effectively this means: if the bucket is not yet serialized and there is a serialization in progress, we block.
Definitely do not actively poll; this will consume 100% CPU, causing alerts and such, especially on huge entries...
@chakaz draft so we can discuss :)
src/server/snapshot.cc
Outdated
// Must be atomic because after we call it.snapshot_version_ we're starting
// to send incremental updates instead of serializing the whole bucket: We must not
// send the update until the initial SerializeBucket is called.
// Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
// bucket.
FiberAtomicGuard fg;
Isn't the whole point of this PR to remove the atomicity requirement? I.e. instead of fixing the typo in the comment, you should remove it (as well as the FiberAtomicGuard).
I didn't remove it since we don't (yet) yield. We will yield when we actually start serializing in chunks, but I am happy to remove it in this PR -- no objections.
Note that in addition to snapshot.cc
, we also have streamer.cc
which has some very similar logic.
I would say that it's not worth modifying, as it's way more rarely used (only during slot migrations), but the thing is, it must support blocking as well, as the underlying value serialization will block :|
src/server/snapshot.cc
Outdated
// to send incremental updates instead of serializing the whole bucket: We must not
// send the update until the initial SerializeBucket is called.
// Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
// bucket.
FiberAtomicGuard fg;
DCHECK_LT(it.GetVersion(), snapshot_version_);

bucket_ser_in_progress_ = true;
this boolean is the same as serialize_bucket_running_
Yes, this was for testing; it's a draft PR and I will clean it up -- I wanted to discuss it with Shahar.
src/server/snapshot.cc
Outdated
// maybe worth adding it on an event queue?
while (bucket_ser_in_progress_) {
  ThisFiber::Yield();
}
It could be that the bucket was already serialized while this fiber was sleeping, i.e. the snapshot version for this bucket was updated. You need to check it again after the sleep.
nice catch!
src/server/journal/streamer.cc
Outdated
@@ -312,7 +310,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
  DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

FiberAtomicGuard fg;
{ std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_); }
Why do you put it in its own scope? The lock will be released immediately after, which means other OnDbChange callbacks could run in parallel, no?
I don't think this is the case. Once it acquires the lock, it will drop it immediately, but it won't preempt. It will potentially preempt when we call WriteBucket. All the steps between dropping the lock on line 313 and calling WriteBucket are atomic; there is no preemption. However, when we call WriteBucket we will reacquire the lock on the same fiber before we preempt. Think of this {} as transferring ownership of the lock without doing it explicitly (e.g. lk.unlock()) before we call WriteBucket.
Some thoughts on what you wrote:

- Is what you wrote also true for CVCUponInsert()? I see that it uses a callback. I think it's true, but you should verify whether we're counting on that.
- I know that "regular" mutexes (i.e. non-fiber ones) do not guarantee that, even if the lock is unlocked, it will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between.
- I understand that WriteBucket() locks this lock, so I don't see any benefit to locking (and immediately releasing) here as well. Why even do that?
- There might be a subtle bug here: CVCUponInsert() might call WriteBucket() multiple times (over multiple buckets). If we lose the mutex to some other operation which touches that other bucket, it might change the underlying structure, causing us to lose some changes.

I'd recommend adding a WriteBucketNoLock() method which will be called here and in WriteBucket(). Bonus points if you add ABSL_LOCKS_EXCLUDED and ABSL_LOCKS_REQUIRED annotations to let the compiler verify we don't create deadlocks.
I also don't see why we need to lock and release here; if we lock in WriteBucket this should be enough.
@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that's OK, because what we care about is that we don't break one bucket into multiple pieces. I don't think we should care if, while CVCUponInsert serializes one bucket, we preempt, serialize another bucket from a different flow, and then go back to continue serializing the next buckets from CVCUponInsert. Is that right?
Yes, we do not need the lock in OnDbChange; that was a mistake.

> I know that "regular" mutexes (i.e. non fiber ones) do not guarantee that, even if the lock is unlocked, it will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between

Regular mutexes behave differently when notifying other threads to make progress. If this code was running on thread A but wakes up thread B and releases the mutex, then it's a race for which thread locks the mutex. This is not true for fibers:
void Mutex::unlock() {
  detail::FiberInterface* active = detail::FiberActive();

  unique_lock lk(wait_queue_splk_);
  CHECK(owner_ == active);
  owner_ = nullptr;

  wait_queue_.NotifyOne(active);
}
And if you follow NotifyOne, it calls NotifyImpl, which calls ActivateOther:
// Schedules another fiber without switching to it.
// other can belong to another thread.
void ActivateOther(FiberInterface* other);
The gist here is (a) ActivateOther adds the fiber to the ready queue, and (b) in our case all the interesting fibers that use this mutex (the fiber that runs OnDbChange and IterateFB) run on the same thread. This is true both for SaveStagesController and for FullSyncFb for replication (in the RDB version they push to the same serialization channel, which is fine since we don't (yet, until phase 2) split individual slots).
+1 for the rest
@adiholden it won't happen; when we release the lock we continue running on the same fiber, we don't switch.
Please let me know if I missed anything.
> because what we care is that we dont break the bucket into multiple pieces

We do want to break the bucket into multiple pieces, otherwise what's the point? (Edit: we just don't want to allow interleaved bucket serialization per SliceSnapshot.)
> We do want to break the bucket into multiple pieces, otherwise what's the point ?

We want to break entries, not buckets. Breaking at bucket granularity isn't enough.
Of course the challenge is not to have interleaved entries in the stream.
src/server/snapshot.h
@@ -171,6 +172,8 @@ class SliceSnapshot {
  size_t savecb_calls = 0;
  size_t keys_total = 0;
} stats_;

util::fb2::Mutex bucket_ser_mu_;
You have two different mutexes, one for snapshot and one for cluster; I think this is a problem. If a snapshot is created at the same time that a migration is created, and we yield in the bucket serialization, I'm afraid of some edge case that will cause us to lose data being sent to the replica/RDB file. For example, touching
db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                         DbSlice::Iterator::FromPrime(it), snapshot_version_);
if (WriteBucket(it)) {
if (WriteBucketNoLock(it)) {
@chakaz do we really need two variants of WriteBucket (one with lock and one without)? I think one suffices, but if you think this can be improved I am more than happy to do it.
@@ -92,18 +93,20 @@ class SliceSnapshot {
void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);

// Called on traversing cursor by IterateBucketsFb.
bool BucketSaveCb(PrimeIterator it);
bool BucketSaveCb(PrimeIterator it) ABSL_LOCKS_EXCLUDED(bucket_ser_mu_);
@chakaz from my understanding ABSL_LOCKS_EXCLUDED roughly means that the caller of this function should not hold the lock bucket_ser_mu_. Isn't this redundant, since util::fb2::lock actually checks for this (CHECK(active != owner_))?
I think I agree with what you wrote, but if that is the case then we have a problem, for example in the CVCUponBump flow, which calls OnDbChange in a loop: in this case we can preempt between transactions and this will cause a bug.
@kostasrim I believe we can use CondVarAny here instead of the mutex. This will be faster without locks, and in most use cases we will not have any preemptions, as the values will not be big, so we will not need to preempt.
I think this won't work, because once we unlock the mutex we will schedule the fiber for execution (make it active), and if we preempt on the next round it will interleave, bypassing the atomicity the lock provides. This can be fixed with a predicate and a boolean and using wait_until on the condition variable, but there is not much gain here IMO, other than making the code a little more verbose.
After we preempt on the first call of the
No description provided.