Skip to content

Commit

Permalink
service/s3/s3manager: Move part buffer pool upwards to allow reuse. (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
skmcgrail authored Oct 1, 2019
1 parent 02207b1 commit fe72a52
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
### SDK Features

### SDK Enhancements
* `service/s3/s3manager`: Allow reuse of Uploader buffer `sync.Pool` amongst multiple Upload calls ([#2863](https://github.com/aws/aws-sdk-go/pull/2863))
* The `sync.Pool` used for the reuse of `[]byte` slices when handling streaming payloads will now be shared across multiple Upload calls when the upload part size remains constant.

### SDK Bugs
* `internal/ini`: Fix ini parser to handle empty values ([#2860](https://github.com/aws/aws-sdk-go/pull/2860))
Expand Down
33 changes: 27 additions & 6 deletions service/s3/s3manager/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ type Uploader struct {

// Defines the buffer strategy used when uploading a part
BufferProvider ReadSeekerWriteToProvider

// partPool allows for the reuse of streaming payload part buffers between upload calls
partPool *partPool
}

// NewUploader creates a new Uploader instance to upload objects to S3. Pass In
Expand Down Expand Up @@ -201,6 +204,8 @@ func newUploader(client s3iface.S3API, options ...func(*Uploader)) *Uploader {
option(u)
}

u.partPool = newPartPool(u.PartSize)

return u
}

Expand Down Expand Up @@ -283,6 +288,7 @@ func (u Uploader) UploadWithContext(ctx aws.Context, input *UploadInput, opts ..
for _, opt := range opts {
opt(&i.cfg)
}

i.cfg.RequestOptions = append(i.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))

return i.upload()
Expand Down Expand Up @@ -352,8 +358,6 @@ type uploader struct {

readerPos int64 // current reader position
totalSize int64 // set to -1 if the size is not known

bufferPool sync.Pool
}

// internal logic for deciding whether to upload a single part or use a
Expand Down Expand Up @@ -393,8 +397,10 @@ func (u *uploader) init() error {
u.cfg.MaxUploadParts = MaxUploadParts
}

u.bufferPool = sync.Pool{
New: func() interface{} { return make([]byte, u.cfg.PartSize) },
// If PartSize was changed or partPool was never set up, then we need to allocate a new pool
// so that we return []byte slices of the correct size
if u.cfg.partPool == nil || u.cfg.partPool.partSize != u.cfg.PartSize {
u.cfg.partPool = newPartPool(u.cfg.PartSize)
}

// Try to get the total size for some optimizations
Expand Down Expand Up @@ -466,12 +472,12 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
return reader, int(n), cleanup, err

default:
part := u.bufferPool.Get().([]byte)
part := u.cfg.partPool.Get().([]byte)
n, err := readFillBuf(r, part)
u.readerPos += int64(n)

cleanup := func() {
u.bufferPool.Put(part)
u.cfg.partPool.Put(part)
}

return bytes.NewReader(part[0:n]), n, cleanup, err
Expand Down Expand Up @@ -751,3 +757,18 @@ func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {

return resp
}

// partPool is a sync.Pool specialized for streaming-payload part
// buffers: every slice it vends via Get is exactly partSize bytes long.
// Recording partSize on the pool lets callers detect when a configured
// part size no longer matches the pool and a fresh pool is required.
type partPool struct {
	partSize int64
	sync.Pool
}

// newPartPool constructs a partPool whose New function allocates
// []byte slices of exactly partSize bytes.
func newPartPool(partSize int64) *partPool {
	var pool partPool
	pool.partSize = partSize
	pool.New = func() interface{} {
		return make([]byte, pool.partSize)
	}
	return &pool
}

0 comments on commit fe72a52

Please sign in to comment.