-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
buckets: PathsPaths: avoid bucket race #522
Conversation
Signed-off-by: Sander Pick <[email protected]>
if ok { | ||
log.Debugf("pinBlock: storage: %d used, %d available, %d delta", | ||
owner.StorageUsed, owner.StorageAvailable, owner.StorageDelta) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some logging around available storage that I'll use in a follow up PR addressing that issue.
@@ -1950,6 +1958,7 @@ func (s *Service) PushPaths(server pb.APIService_PushPathsServer) error { | |||
if buckRoot != "" && buckRoot != buck.Path { | |||
return status.Error(codes.FailedPrecondition, buckets.ErrNonFastForward.Error()) | |||
} | |||
verifyBuck := buck.Copy() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copy the bucket instance for use in the queue. When a file is added to the queue, we just need to check if the requestor is allowed to make the change. We can re-add the changes serially when the file is done being added to IPFS, i.e., we get a result in addedCh
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, so making a copy to avoid concurrent access.
Maybe it would be a good idea to later rename this variable to "readOnlyBuck" or something that can hint a future development that it shouldn't use this copy for anything but reading. Now that in the same scope are two buckets (one copied and one the real one to save), it might be error-prone to do new changes in the wrong one.
} | ||
return err | ||
} | ||
|
||
for { | ||
select { | ||
case res := <-addedCh: | ||
fn, err := s.IPFSClient.ResolveNode(ctx, res.resolved) | ||
ctxLock.RLock() | ||
newCtx := ctx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensuring ctx
is not accessed anywhere during processing of res
.
buck.SetMetadataAtPath(res.path, tdb.Metadata{ | ||
UpdatedAt: buck.UpdatedAt, | ||
}) | ||
buck.UnsetMetadataWithPrefix(res.path + "/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since processing of res
is handled serially, we can safely alter the bucket right after the UnixFS DAG is altered.
} | ||
|
||
c.MinerIndex, err = mi.NewClient(minerIndexTarget, mindexOpts...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was causing the client to be configured with grpc.WithTransportCredentials
and grpc.WithInsecure
in the case of a local hub. Since the miner index is not authenticated, there's no need grab the grpc.WithPerRPCCredentials
from the hub client options.
Signed-off-by: Sander Pick <[email protected]>
} else if cus.GracePeriodEnd == 0 { | ||
// Customer has not started grace period, but there's no way to start the grace period _after_ | ||
// a successful request, so we use grace quota here and let the post usage handler start the | ||
// grace period in the event the request exceeds the free quota | ||
owner.StorageAvailable = cus.DailyUsage["stored_data"].Grace |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use grace quota before grace starts... see comments for more
@@ -1950,6 +1958,7 @@ func (s *Service) PushPaths(server pb.APIService_PushPathsServer) error { | |||
if buckRoot != "" && buckRoot != buck.Path { | |||
return status.Error(codes.FailedPrecondition, buckets.ErrNonFastForward.Error()) | |||
} | |||
verifyBuck := buck.Copy() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, so making a copy to avoid concurrent access.
Maybe it would be a good idea to later rename this variable to "readOnlyBuck" or something that can hint a future development that it shouldn't use this copy for anything but reading. Now that in the same scope are two buckets (one copied and one the real one to save), it might be error-prone to do new changes in the wrong one.
Signed-off-by: Sander Pick <[email protected]>
Signed-off-by: Sander Pick <[email protected]>
PushPaths
handler when accessingbuck
buck
CLI Miner Index client gRPC options whenapi
is local