Skip to content

Commit

Permalink
putObjectReadAt: Sparse upload support for ReadAt.
Browse files Browse the repository at this point in the history
  • Loading branch information
Harshavardhana committed Jan 14, 2016
1 parent af6786c commit 60e4cd8
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 34 deletions.
8 changes: 5 additions & 3 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
}

// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
if size > 0 {
// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
}
}

// Sort all completed parts.
Expand Down
88 changes: 60 additions & 28 deletions api-put-object-readat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,50 @@ import (
"sort"
)

// missing Parts info container.
type missingPart struct {
readAtOffset int64
missingPartSize int64
}

// getMissingPartsInfo missing parts info map.
func (c Client) getMissingPartsInfo(uploadedParts map[int]objectPart, size int64, totalPartsCount int, partSize int64, lastPartSize int64) (missingPartsInfo map[int]missingPart) {
missingPartsInfo = make(map[int]missingPart)
partNumber := 1
for partNumber <= totalPartsCount {
uploadedPart, ok := uploadedParts[partNumber]
if !ok {
var missingPrt missingPart
if partNumber == totalPartsCount {
missingPrt.readAtOffset = (size - lastPartSize)
missingPrt.missingPartSize = lastPartSize
} else {
missingPrt.readAtOffset = int64(partNumber-1) * partSize
missingPrt.missingPartSize = partSize
}
missingPartsInfo[partNumber] = missingPrt
} else {
if uploadedPart.PartNumber == totalPartsCount {
if uploadedPart.Size != lastPartSize {
missingPartsInfo[uploadedPart.PartNumber] = missingPart{
readAtOffset: (size - lastPartSize),
missingPartSize: lastPartSize,
}
}
} else {
if uploadedPart.Size != partSize {
missingPartsInfo[uploadedPart.PartNumber] = missingPart{
readAtOffset: int64(uploadedPart.PartNumber-1) * partSize,
missingPartSize: partSize,
}
}
}
}
partNumber++
}
return missingPartsInfo
}

// putObjectMultipartFromReadAt - Uploads files bigger than 5MiB. Supports reader
// of type which implements io.ReaderAt interface (ReadAt method).
//
Expand Down Expand Up @@ -57,9 +101,6 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload

// Previous part number.
var prevPartNumber int

// A map of all uploaded parts.
var partsInfo = make(map[int]objectPart)

Expand All @@ -72,23 +113,18 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
}

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(size)
totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
if err != nil {
return 0, err
}

// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash

// Part number always starts with '1', moves to prevPartNumber if
// exists and is non zero.
partNumber := 1
if prevPartNumber != 0 {
partNumber = prevPartNumber
}

// Upload each part until totalUploadedSize reaches input reader size.
for totalUploadedSize < size {
missingPartsInfo := c.getMissingPartsInfo(partsInfo, size, totalPartsCount, partSize, lastPartSize)
// Upload each part until totalUploadedSize reaches input reader
// size.
for partNumber, missingPrt := range missingPartsInfo {
// Initialize a new temporary file.
tmpFile, err := newTempFile("multiparts$-putobject-partial")
if err != nil {
Expand All @@ -105,7 +141,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
writer := io.MultiWriter(tmpFile, hashWriter)

// Choose totalUploadedSize as the current readAtOffset.
readAtOffset := totalUploadedSize
readAtOffset := missingPrt.readAtOffset

// Read until partSize.
var totalReadPartSize int64
Expand All @@ -116,7 +152,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Following block reads data at an offset from the input
// reader and copies data to into local temporary file.
// Temporary file data is limited to the partSize.
for totalReadPartSize < partSize {
for totalReadPartSize < missingPrt.missingPartSize {
readAtSize, rerr := reader.ReadAt(readAtBuffer, readAtOffset)
if rerr != nil {
if rerr != io.EOF {
Expand Down Expand Up @@ -150,39 +186,35 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
}

// Proceed to upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum, sha256Sum, totalReadPartSize)
objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum,
sha256Sum, totalReadPartSize)
if err != nil {
// Close the read closer.
tmpFile.Close()
return totalUploadedSize, err
}

// Save successfully uploaded size.
totalUploadedSize += totalReadPartSize

// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart

// Move to next part.
partNumber++
}

// Verify if we uploaded all the data.
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}

// Loop over uploaded parts to save them in a Parts array before completing the multipart request.
for _, part := range partsInfo {
var complPart completePart
complPart.ETag = part.ETag
complPart.PartNumber = part.PartNumber
totalUploadedSize += part.Size
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
}

// Verify if we uploaded all the data.
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}

// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
return totalUploadedSize, ErrInvalidParts(totalPartsCount, len(completeMultipartUpload.Parts))
}

// Sort all completed parts.
Expand Down
6 changes: 3 additions & 3 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func getReaderSize(reader io.Reader) (size int64, err error) {
var result []reflect.Value
size = -1
if reader != nil {
// Verify if there is a method by name 'Len'.
lenFn := reflect.ValueOf(reader).MethodByName("Len")
// Verify if there is a method by name 'Size'.
lenFn := reflect.ValueOf(reader).MethodByName("Size")
if lenFn.IsValid() {
if lenFn.Kind() == reflect.Func {
// Call the 'Len' function and save its return value.
// Call the 'Size' function and save its return value.
result = lenFn.Call([]reflect.Value{})
if result != nil && len(result) == 1 {
lenValue := result[0]
Expand Down

0 comments on commit 60e4cd8

Please sign in to comment.