service/s3/s3manager: Move part buffer pool upwards to allow reuse. #2863

Merged · 1 commit · Oct 1, 2019
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
@@ -1,5 +1,7 @@
### SDK Features

### SDK Enhancements
* `service/s3/s3manager`: Allow reuse of Uploader buffer `sync.Pool` amongst multiple Upload calls ([#2863](https://github.com/aws/aws-sdk-go/pull/2863))
* The `sync.Pool` used for the reuse of `[]byte` slices when handling streaming payloads will now be shared across multiple Upload calls when the upload part size remains constant.

### SDK Bugs
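
The changelog entry above describes when the pool is reused: as long as the configured part size stays constant, a single Uploader serves every Upload call from one shared `[]byte` buffer pool. A minimal sketch of that usage pattern, assuming hypothetical bucket/key names and placeholder payloads:

```go
package main

import (
	"bytes"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	sess := session.Must(session.NewSession())

	// Create one Uploader and keep the part size constant so the same
	// internal part buffer pool is reused for every Upload call.
	uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
		u.PartSize = 10 * 1024 * 1024 // 10 MiB parts
	})

	for i := 0; i < 3; i++ {
		payload := bytes.NewReader(make([]byte, 20*1024*1024)) // placeholder body
		_, err := uploader.Upload(&s3manager.UploadInput{
			Bucket: aws.String("example-bucket"), // hypothetical bucket
			Key:    aws.String("example-key"),    // hypothetical key
			Body:   payload,
		})
		if err != nil {
			log.Fatal(err)
		}
	}
}
```

Reusing the Uploader this way, rather than constructing one per call, is what lets the pooled part buffers survive between uploads.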
33 changes: 27 additions & 6 deletions service/s3/s3manager/upload.go
@@ -165,6 +165,9 @@ type Uploader struct {

// Defines the buffer strategy used when uploading a part
BufferProvider ReadSeekerWriteToProvider

// partPool allows for the reuse of streaming payload part buffers between upload calls
partPool *partPool
}

// NewUploader creates a new Uploader instance to upload objects to S3. Pass In
@@ -201,6 +204,8 @@ func newUploader(client s3iface.S3API, options ...func(*Uploader)) *Uploader {
option(u)
}

u.partPool = newPartPool(u.PartSize)

return u
}

@@ -283,6 +288,7 @@ func (u Uploader) UploadWithContext(ctx aws.Context, input *UploadInput, opts ..
for _, opt := range opts {
opt(&i.cfg)
}

i.cfg.RequestOptions = append(i.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))

return i.upload()
@@ -352,8 +358,6 @@ type uploader struct {

readerPos int64 // current reader position
totalSize int64 // set to -1 if the size is not known

bufferPool sync.Pool
}

// internal logic for deciding whether to upload a single part or use a
@@ -393,8 +397,10 @@ func (u *uploader) init() error {
u.cfg.MaxUploadParts = MaxUploadParts
}

u.bufferPool = sync.Pool{
New: func() interface{} { return make([]byte, u.cfg.PartSize) },
// If PartSize was changed or partPool was never set up, then we need to allocate a new pool
// so that we return []byte slices of the correct size
if u.cfg.partPool == nil || u.cfg.partPool.partSize != u.cfg.PartSize {
u.cfg.partPool = newPartPool(u.cfg.PartSize)
}

// Try to get the total size for some optimizations
@@ -466,12 +472,12 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
return reader, int(n), cleanup, err

default:
part := u.bufferPool.Get().([]byte)
part := u.cfg.partPool.Get().([]byte)
n, err := readFillBuf(r, part)
u.readerPos += int64(n)

cleanup := func() {
u.bufferPool.Put(part)
u.cfg.partPool.Put(part)
}

return bytes.NewReader(part[0:n]), n, cleanup, err
@@ -751,3 +757,18 @@ func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {

return resp
}

type partPool struct {
partSize int64
sync.Pool
}

func newPartPool(partSize int64) *partPool {
p := &partPool{partSize: partSize}

p.New = func() interface{} {
return make([]byte, p.partSize)
}

return p
}
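
For illustration, here is a standalone sketch of the same pattern outside the SDK (a hypothetical `fixedSizePool` type, not part of the PR): embedding `sync.Pool` promotes `Get` and `Put` onto the wrapper, and recording the slice size alongside the pool is what lets `init()` above detect a changed `PartSize` and swap in a fresh pool.

```go
package main

import (
	"fmt"
	"sync"
)

// fixedSizePool mirrors the partPool pattern: sync.Pool is embedded so Get/Put
// are promoted onto the type, and the slice size is stored so callers can tell
// when the configuration changed and a new pool must be allocated.
type fixedSizePool struct {
	size int64
	sync.Pool
}

func newFixedSizePool(size int64) *fixedSizePool {
	p := &fixedSizePool{size: size}
	p.New = func() interface{} {
		return make([]byte, p.size)
	}
	return p
}

func main() {
	pool := newFixedSizePool(5 * 1024 * 1024)

	buf := pool.Get().([]byte) // either a recycled slice or a fresh allocation
	fmt.Println(len(buf))      // 5242880
	pool.Put(buf)              // return the slice for reuse by a later caller
}
```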