From 60e4cd8b8f61eecd3d94d9a29feb2f6e255c252e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 13 Jan 2016 17:36:15 -0800 Subject: [PATCH] putObjectReadAt: Sparse upload support for ReadAt. --- api-put-object-multipart.go | 8 ++-- api-put-object-readat.go | 88 +++++++++++++++++++++++++------------ api-put-object.go | 6 +-- 3 files changed, 68 insertions(+), 34 deletions(-) diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go index ae7084c2a9..47a3a7c8e0 100644 --- a/api-put-object-multipart.go +++ b/api-put-object-multipart.go @@ -171,9 +171,11 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart) } - // Verify if totalPartsCount is not equal to total list of parts. - if totalPartsCount != len(completeMultipartUpload.Parts) { - return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts)) + if size > 0 { + // Verify if totalPartsCount is not equal to total list of parts. + if totalPartsCount != len(completeMultipartUpload.Parts) { + return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts)) + } } // Sort all completed parts. diff --git a/api-put-object-readat.go b/api-put-object-readat.go index ff2e1e37bb..137d7ca7c2 100644 --- a/api-put-object-readat.go +++ b/api-put-object-readat.go @@ -25,6 +25,50 @@ import ( "sort" ) +// missing Parts info container. +type missingPart struct { + readAtOffset int64 + missingPartSize int64 +} + +// getMissingPartsInfo missing parts info map. +func (c Client) getMissingPartsInfo(uploadedParts map[int]objectPart, size int64, totalPartsCount int, partSize int64, lastPartSize int64) (missingPartsInfo map[int]missingPart) { + missingPartsInfo = make(map[int]missingPart) + partNumber := 1 + for partNumber <= totalPartsCount { + uploadedPart, ok := uploadedParts[partNumber] + if !ok { + var missingPrt missingPart + if partNumber == totalPartsCount { + missingPrt.readAtOffset = (size - lastPartSize) + missingPrt.missingPartSize = lastPartSize + } else { + missingPrt.readAtOffset = int64(partNumber-1) * partSize + missingPrt.missingPartSize = partSize + } + missingPartsInfo[partNumber] = missingPrt + } else { + if uploadedPart.PartNumber == totalPartsCount { + if uploadedPart.Size != lastPartSize { + missingPartsInfo[uploadedPart.PartNumber] = missingPart{ + readAtOffset: (size - lastPartSize), + missingPartSize: lastPartSize, + } + } + } else { + if uploadedPart.Size != partSize { + missingPartsInfo[uploadedPart.PartNumber] = missingPart{ + readAtOffset: int64(uploadedPart.PartNumber-1) * partSize, + missingPartSize: partSize, + } + } + } + } + partNumber++ + } + return missingPartsInfo +} + // putObjectMultipartFromReadAt - Uploads files bigger than 5MiB. Supports reader // of type which implements io.ReaderAt interface (ReadAt method). // @@ -57,9 +101,6 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Complete multipart upload. var completeMultipartUpload completeMultipartUpload - // Previous part number. - var prevPartNumber int - // A map of all uploaded parts. var partsInfo = make(map[int]objectPart) @@ -72,7 +113,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read } // Calculate the optimal parts info for a given size. - totalPartsCount, partSize, _, err := optimalPartInfo(size) + totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size) if err != nil { return 0, err } @@ -80,15 +121,10 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // MD5 and SHA256 hasher. var hashMD5, hashSHA256 hash.Hash - // Part number always starts with '1', moves to prevPartNumber if - // exists and is non zero. - partNumber := 1 - if prevPartNumber != 0 { - partNumber = prevPartNumber - } - - // Upload each part until totalUploadedSize reaches input reader size. - for totalUploadedSize < size { + missingPartsInfo := c.getMissingPartsInfo(partsInfo, size, totalPartsCount, partSize, lastPartSize) + // Upload each part until totalUploadedSize reaches input reader + // size. + for partNumber, missingPrt := range missingPartsInfo { // Initialize a new temporary file. tmpFile, err := newTempFile("multiparts$-putobject-partial") if err != nil { @@ -105,7 +141,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read writer := io.MultiWriter(tmpFile, hashWriter) // Choose totalUploadedSize as the current readAtOffset. - readAtOffset := totalUploadedSize + readAtOffset := missingPrt.readAtOffset // Read until partSize. var totalReadPartSize int64 @@ -116,7 +152,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Following block reads data at an offset from the input // reader and copies data to into local temporary file. // Temporary file data is limited to the partSize. - for totalReadPartSize < partSize { + for totalReadPartSize < missingPrt.missingPartSize { readAtSize, rerr := reader.ReadAt(readAtBuffer, readAtOffset) if rerr != nil { if rerr != io.EOF { @@ -150,26 +186,16 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read } // Proceed to upload the part. - objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum, sha256Sum, totalReadPartSize) + objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum, + sha256Sum, totalReadPartSize) if err != nil { // Close the read closer. tmpFile.Close() return totalUploadedSize, err } - // Save successfully uploaded size. - totalUploadedSize += totalReadPartSize - // Save successfully uploaded part metadata. partsInfo[partNumber] = objPart - - // Move to next part. - partNumber++ - } - - // Verify if we uploaded all the data. - if totalUploadedSize != size { - return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) } // Loop over uploaded parts to save them in a Parts array before completing the multipart request. @@ -177,12 +203,18 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read var complPart completePart complPart.ETag = part.ETag complPart.PartNumber = part.PartNumber + totalUploadedSize += part.Size completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart) } + // Verify if we uploaded all the data. + if totalUploadedSize != size { + return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) + } + // Verify if totalPartsCount is not equal to total list of parts. if totalPartsCount != len(completeMultipartUpload.Parts) { - return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts)) + return totalUploadedSize, ErrInvalidParts(totalPartsCount, len(completeMultipartUpload.Parts)) } // Sort all completed parts. diff --git a/api-put-object.go b/api-put-object.go index 34f79ede3a..0a8896f78a 100644 --- a/api-put-object.go +++ b/api-put-object.go @@ -30,11 +30,11 @@ func getReaderSize(reader io.Reader) (size int64, err error) { var result []reflect.Value size = -1 if reader != nil { - // Verify if there is a method by name 'Len'. - lenFn := reflect.ValueOf(reader).MethodByName("Len") + // Verify if there is a method by name 'Size'. + lenFn := reflect.ValueOf(reader).MethodByName("Size") if lenFn.IsValid() { if lenFn.Kind() == reflect.Func { - // Call the 'Len' function and save its return value. + // Call the 'Size' function and save its return value. result = lenFn.Call([]reflect.Value{}) if result != nil && len(result) == 1 { lenValue := result[0]