Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: yield when serialization is in progress #3220

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

kostasrim
Copy link
Contributor

No description provided.

@kostasrim kostasrim self-assigned this Jun 26, 2024
@@ -171,6 +171,8 @@ class SliceSnapshot {
size_t savecb_calls = 0;
size_t keys_total = 0;
} stats_;

bool bucket_ser_in_progress_ = false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this, it suffices to use the other bool above

src/server/snapshot.cc Outdated Show resolved Hide resolved
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it;
}
og.SetVersion(snapshot_version_);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not bump the version before we yield because other fibers might check against the version and decide to move on instead of block and cause a bucket split

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is interesting on its own, as:

other fibers might check against the version and decide to move on instead of block and cause a bucket split

If that other fiber is older, it would have been called by now (we call by the order of snapshot versions), and if it's newer it wouldn't matter if the value is snapshot_version_ or smaller (it can't be bigger). Furthermore, we shouldn't really get to the point in parallel in multiple fibers because we have the above (to-be-changed) polling.

Furthermore^2, if you set it to be after, we could actually serialize the same key twice in a rare case in which we call OnDbChange, block, but the Traverse() fiber continues and reaches this bucket. This will generate an invalid RDB, which Dragonfly will reject loading.

return false;

while (bucket_ser_in_progress_) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will block all operations on the thread, even actions on different buckets. Effectively this means if the bucket is not serialized and there is serialization in progress. block

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely do not actively poll. this will consume 100% CPU, causing alerts and such, especially on huge entries...

@kostasrim kostasrim requested a review from chakaz June 26, 2024 07:09
@kostasrim
Copy link
Contributor Author

@chakaz draft so we can discuss :)

src/server/snapshot.cc Outdated Show resolved Hide resolved
Comment on lines 269 to 274
// Must be atomic because after we call it.snapshot_version_ we're starting
// to send incremental updates instead of serializing the whole bucket: We must not
// send the update until the initial SerializeBucket is called.
// Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
// bucket.
FiberAtomicGuard fg;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the whole point of this PR to remove the atomicity requirement?
I.e. instead of fixing the typo in the comment, you should remove it (as well as the FiberAtomicGuard)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't remove it since we (yet) don't yield. We will yield when we actually start serializing in chunks but I am happy to remove this on this PR -- no objections

src/server/snapshot.cc Outdated Show resolved Hide resolved
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it;
}
og.SetVersion(snapshot_version_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is interesting on its own, as:

other fibers might check against the version and decide to move on instead of block and cause a bucket split

If that other fiber is older, it would have been called by now (we call by the order of snapshot versions), and if it's newer it wouldn't matter if the value is snapshot_version_ or smaller (it can't be bigger). Furthermore, we shouldn't really get to the point in parallel in multiple fibers because we have the above (to-be-changed) polling.

Furthermore^2, if you set it to be after, we could actually serialize the same key twice in a rare case in which we call OnDbChange, block, but the Traverse() fiber continues and reaches this bucket. This will generate an invalid RDB, which Dragonfly will reject loading.

src/server/snapshot.cc Outdated Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in addition to snapshot.cc, we also have streamer.cc which has some very similar logic.
I would say that it's not worth modifying, as it's way more rarely used (only during slot migrations), but the thing is, it must support blocking as well, as the underlying value serialization will block :|

// to send incremental updates instead of serializing the whole bucket: We must not
// send the update until the initial SerializeBucket is called.
// Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
// bucket.
FiberAtomicGuard fg;
DCHECK_LT(it.GetVersion(), snapshot_version_);

bucket_ser_in_progress_ = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this boolean is the same as serialize_bucket_running_

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes yes, this was for testing, it's a draft PR I will clean this up -- I wanted to discuss it with Shahar

// maybe worth adding it on an event queue?
while (bucket_ser_in_progress_) {
ThisFiber::Yield();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could be that the bucket was already serialized by the time you run this was sleeping. i.e snapshot version for this bucket was update. you need to check it again after the sleep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch!

@kostasrim kostasrim marked this pull request as ready for review June 28, 2024 10:20
@@ -312,7 +310,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

FiberAtomicGuard fg;
{ std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_); }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you put it on its own scope? It will be freed immediately after, which will mean that other OnDbChange callbacks could run in parallel, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so this is the case. Once it acquires the lock, it will drop it immediately but it won't preempt. It will preempt potentially when we call WriteBucket. All the steps between dropping the lock on line 313 and calling WriteBucket are atomic, there is no preemption. However, when we call WriteBucket we will reqacquire the lock on the same fiber before we preempt. Think of this {} as transferring ownership of the lock without explicitly doing it (e.g. lk.unlock() before we call WriteBucket

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some thoughts on what you wrote:

  • Is what you wrote also true for CVCUponInsert()? I see that it uses a callback. I think it's true, but you should verify if we're counting on that
  • I know that "regular" mutexes (i.e. non fiber ones) do not guarantee that, even if the lock is unlocked, it will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between
  • I understand that WriteBucket() locks this lock, so I don't see any benefit to lock (and immediately release) here as well. Why even do that?
  • There might be a subtle bug here: CVCUponInsert() might call WriteBucket() multiple times (over multiple buckets). If we lose the mutex to some other operation, which touches that other bucket, it might change the underlying structure causing us to lose some changes

I'd recommend adding a WriteBucketNoLock() method which will be called here and in WriteBucket(). Bonus points if you add ABSL_LOCKS_EXCLUDED and ABSL_LOCKS_REQUIRED annotations to let the compiler verify we don't create deadlocks

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also dont see why we need to lock and release here, as if we lock in WriteBucket this should be enough.
@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that ok.. because what we care is that we dont break the bucket into multiple pieces but I dont think we should care if CVCUponInsert serializes one bucket we will preempt , serialize onother bucket from different flow and than go back to continue serialize the next buckets from CVCUponInsert. Is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chakaz

Yes we do not need the lock OnDbChange that was a mistake.

I know that "regular" mutexes (i.e. non fiber ones) do not guarantee that, even if the lock is unlocked, it will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between

Regular mutexes behave differently when notifying other threads to make progress. If this code was running on thread a but wakes up thread b and releases the mutex then it's a race for which thread locks the mutex. This is not true for fibers:

   90 void Mutex::unlock() {                                                                             
   91   detail::FiberInterface* active = detail::FiberActive();                                          
   92                                                                                                    
   93   unique_lock lk(wait_queue_splk_);                                                                
   94   CHECK(owner_ == active);                                                                         
   95   owner_ = nullptr;                                                                                
   96                                                                                                    
   97   wait_queue_.NotifyOne(active);                                                                   
   98 }  

And if you follow NotifyOne it calls NotifyImpl which calls ActivateOther

  116   
  117   // Schedules another fiber without switching to it.                                              
  118   // other can belong to another thread.                                                           
  119   void ActivateOther(FiberInterface* other);
  120   

The gist here is a) That ActivateOther adds the fiber to the ready queue b) that in our case all the interesting fibers (the fiber that runs OnDbChange && IterateFB) that use this mutex run on the same thread, this is both true for SaveStagesController && FullSyncFb for replication (on RDB version they do push on the same serialization channel which is fine since we don’t (yet until phase 2) split individual slots.

+1 for the rest

@adiholden won't happen, when we release the lock we continue running on the same fiber, we don't switch.

Plz let me know if I missed anything

Copy link
Contributor Author

@kostasrim kostasrim Jun 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adiholden

because what we care is that we dont break the bucket into multiple pieces

We do want to break the bucket into multiple pieces, otherwise what's the point ? (edit: we just don't want to allow interleaved bucket serialization per SliceSnapshot)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do want to break the bucket into multiple pieces, otherwise what's the point ?

We want to break entries, not buckets. Breaking on a bucket granularity isn't enough.
Of course the challenge is no to have interleaved entries in the stream.

@kostasrim kostasrim requested a review from chakaz June 30, 2024 07:44
@@ -312,7 +310,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

FiberAtomicGuard fg;
{ std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_); }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also dont see why we need to lock and release here, as if we lock in WriteBucket this should be enough.
@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that ok.. because what we care is that we dont break the bucket into multiple pieces but I dont think we should care if CVCUponInsert serializes one bucket we will preempt , serialize onother bucket from different flow and than go back to continue serialize the next buckets from CVCUponInsert. Is that right?

@@ -171,6 +172,8 @@ class SliceSnapshot {
size_t savecb_calls = 0;
size_t keys_total = 0;
} stats_;

util::fb2::Mutex bucket_ser_mu_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have different mutex one for snapshot and one for cluster, I think this is a problem.
If we have a snapshot created at the same time there is a migration created and we yield in the bucket serialization

@chakaz
Copy link
Collaborator

chakaz commented Jul 1, 2024

@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that ok.. because what we care is that we dont break the bucket into multiple pieces but I dont think we should care if CVCUponInsert serializes one bucket we will preempt , serialize onother bucket from different flow and than go back to continue serialize the next buckets from CVCUponInsert. Is that right?

I'm afraid of some edge case that will cause us to lose data being sent to replica/RDB file. For example, touching k1 triggers OnDbChange and also causes a bucket/segment split. We start serializing the bucket, and yield to k2 (same bucket/segment) which splits the bucket and moves keys around, and when we return to k1's OnDbChange we missed stuff.
Also, when do we set the bucket version? If before we start serializing the bucket in OnDbChange, then a parallel OnDbChange on another key in the same bucket will be missed, but if after then we will process the bucket twice (which is also not good because we can't save the same key twice, as it produces corrupted RDB files).
I think that locking the entire operation instead of individual keys can solve these (and potentially other more complex issues we didn't think about), and really there's no price to pay for it

db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);
if (WriteBucket(it)) {
if (WriteBucketNoLock(it)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chakaz do we really need to variants for WriteBucket (one with lock and one without) ? I think only one suffices but if you think this can be improved I am more than happy to do it

@@ -92,18 +93,20 @@ class SliceSnapshot {
void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);

// Called on traversing cursor by IterateBucketsFb.
bool BucketSaveCb(PrimeIterator it);
bool BucketSaveCb(PrimeIterator it) ABSL_LOCKS_EXCLUDED(bucket_ser_mu_);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chakaz from my understanding ABSL_LOCKS_EXCLUDED roughly means that the caller of this function should not hold the lock bucket_ser_mu_. Isn't this redundant since util::fb2::lock actually checks for this 70 CHECK(active != owner_); ?

@kostasrim kostasrim requested a review from adiholden July 1, 2024 09:07
@adiholden
Copy link
Collaborator

@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that ok.. because what we care is that we dont break the bucket into multiple pieces but I dont think we should care if CVCUponInsert serializes one bucket we will preempt , serialize onother bucket from different flow and than go back to continue serialize the next buckets from CVCUponInsert. Is that right?

I'm afraid of some edge case that will cause us to lose data being sent to replica/RDB file. For example, touching k1 triggers OnDbChange and also causes a bucket/segment split. We start serializing the bucket, and yield to k2 (same bucket/segment) which splits the bucket and moves keys around, and when we return to k1's OnDbChange we missed stuff. Also, when do we set the bucket version? If before we start serializing the bucket in OnDbChange, then a parallel OnDbChange on another key in the same bucket will be missed, but if after then we will process the bucket twice (which is also not good because we can't save the same key twice, as it produces corrupted RDB files). I think that locking the entire operation instead of individual keys can solve these (and potentially other more complex issues we didn't think about), and really there's no price to pay for it

I think I agree with what you wrote but if this is the case that we have a problem for example the CVCUponBump flow which call the OnDbChange in a loop, so in this case we can preempt between transactions and this will cause a bug

@adiholden
Copy link
Collaborator

@kostasrim I believe we can use CondVarAny here instead of the mutex, this will be faster without locks and in most use cases we will not have any preeptions as the values will not be big so we will not need to preempt.
Take an example with util::fb2::CondVarAny pipeline_cnd; in our code

@kostasrim
Copy link
Contributor Author

@kostasrim I believe we can use CondVarAny here instead of the mutex, this will be faster without locks and in most use cases we will not have any preeptions as the values will not be big so we will not need to preempt. Take an example with util::fb2::CondVarAny pipeline_cnd; in our code

I think this won't work because once we unclock the mutex we will schedule the fiber for execution (make it active) and if we preempt on the next round it will interleave it bypassing the atomicity the lock provides. This can be fixed with a predicate and a boolean and use wait_until on the condition variable but there is no much gain imo here other than making the code a little bit more verbose

@kostasrim
Copy link
Contributor Author

kostasrim commented Jul 1, 2024

@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that ok.. because what we care is that we dont break the bucket into multiple pieces but I dont think we should care if CVCUponInsert serializes one bucket we will preempt , serialize onother bucket from different flow and than go back to continue serialize the next buckets from CVCUponInsert. Is that right

I'm afraid of some edge case that will cause us to lose data being sent to replica/RDB file. For example, touching k1 triggers OnDbChange and also causes a bucket/segment split. We start serializing the bucket, and yield to k2 (same bucket/segment) which splits the bucket and moves keys around, and when we return to k1's OnDbChange we missed stuff. Also, when do we set the bucket version? If before we start serializing the bucket in OnDbChange, then a parallel OnDbChange on another key in the same bucket will be missed, but if after then we will process the bucket twice (which is also not good because we can't save the same key twice, as it produces corrupted RDB files). I think that locking the entire operation instead of individual keys can solve these (and potentially other more complex issues we didn't think about), and really there's no price to pay for it

I think I agree with what you wrote but if this is the case that we have a problem for example the CVCUponBump flow which call the OnDbChange in a loop, so in this case we can preempt between transactions and this will cause a bug

After we preempt on the first call of the cb and unlock the mutex it will schedule the fiber for execution but won't switch to it so it will call the next cb. If that cb preempts the next fiber (which is the scheduled above) will try to lock the mutex and fail (it got notified but that doesn't mean it can lock the mutex so it has to try and if it fails it need to fall back and wait again for it be notified) so each cb will be applied in order without interleavings

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants