Skip to content

Commit

Permalink
storage/s3: allow region auto-fetch errors during session init
Browse files Browse the repository at this point in the history
If GetBucketRegion request fails, inform the caller, delegate error
handling to command execution.

Updates #237
Fixes #251
Fixes #252
  • Loading branch information
igungor committed Nov 25, 2020
1 parent 5ac8d70 commit c3f8e3d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 55 deletions.
17 changes: 10 additions & 7 deletions log/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@ package log
import (
"fmt"

"github.com/peak/s5cmd/storage"
"github.com/peak/s5cmd/storage/url"
"github.com/peak/s5cmd/strutil"
)

type JSONMarshaler interface {
JSON() string
}

// Message is an interface to print structured logs.
type Message interface {
fmt.Stringer
JSON() string
JSONMarshaler
}

// InfoMessage is a generic message structure for successful operations.
type InfoMessage struct {
Operation string `json:"operation"`
Success bool `json:"success"`
Source *url.URL `json:"source"`
Destination *url.URL `json:"destination,omitempty"`
Object *storage.Object `json:"object,omitempty"`
Operation string `json:"operation"`
Success bool `json:"success"`
Source *url.URL `json:"source"`
Destination *url.URL `json:"destination,omitempty"`
Object JSONMarshaler `json:"object,omitempty"`
}

// String is the string representation of InfoMessage.
Expand Down
24 changes: 17 additions & 7 deletions storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
"github.com/peak/s5cmd/log"
"github.com/peak/s5cmd/storage/url"
)

Expand Down Expand Up @@ -678,16 +679,25 @@ func (s *s3Session) newSession(ctx context.Context, opts Options) (*session.Sess
sess.Config.Region = aws.String(endpoints.UsEast1RegionID)
}

// get region of the bucket and create session accordingly
// if the region is not provided, it means we want region-independent session
// for operations such as listing buckets, making a new bucket, ...
// get region of the bucket and create session accordingly. if the region
// is not provided, it means we want region-independent session
// for operations such as listing buckets, making a new bucket etc.
if opts.bucket != "" {
region, err := s3manager.GetBucketRegion(ctx, sess, opts.bucket, "")
region, err := s3manager.GetBucketRegion(ctx, sess, opts.bucket, "", func(r *request.Request) {
r.Config.Credentials = sess.Config.Credentials
})
if err != nil {
return nil, err
if errHasCode(err, "NotFound") {
return nil, err
}
// don't deny any request to the service if region auto-fetching
// receives an error. Delegate error handling to command execution.
err = fmt.Errorf("session: fetching region failed: %v", err)
msg := log.ErrorMessage{Err: err.Error()}
log.Error(msg)
} else {
sess.Config.Region = aws.String(region)
}

sess.Config.Region = aws.String(region)
}

s.sessions[opts] = sess
Expand Down
70 changes: 29 additions & 41 deletions storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestNewSessionWithRegionSetViaEnv(t *testing.T) {
}
}

func TestS3ListSuccess(t *testing.T) {
func TestS3ListURL(t *testing.T) {
url, err := url.New("s3://bucket/key")
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -366,17 +366,17 @@ func TestS3Retry(t *testing.T) {

// Connection errors
{
name: "connection reset",
name: "ConnectionReset",
err: fmt.Errorf("connection reset by peer"),
},
{
name: "broken pipe",
name: "BrokenPipe",
err: fmt.Errorf("broken pipe"),
},

// Unknown errors
{
name: "an unknown error is also retried by SDK",
name: "UnknownSDKError",
err: fmt.Errorf("an error that is not known to the SDK"),
},
}
Expand Down Expand Up @@ -427,23 +427,6 @@ func TestS3Retry(t *testing.T) {
}
}

// Credit to aws-sdk-go
func val(i interface{}, s string) interface{} {
v, err := awsutil.ValuesAtPath(i, s)
if err != nil || len(v) == 0 {
return nil
}
if _, ok := v[0].(io.Reader); ok {
return v[0]
}

if rv := reflect.ValueOf(v[0]); rv.Kind() == reflect.Ptr {
return rv.Elem().Interface()
}

return v[0]
}

func TestS3CopyEncryptionRequest(t *testing.T) {
testcases := []struct {
name string
Expand Down Expand Up @@ -506,8 +489,8 @@ func TestS3CopyEncryptionRequest(t *testing.T) {
}

params := r.Params
sse := val(params, "ServerSideEncryption")
key := val(params, "SSEKMSKeyId")
sse := valueAtPath(params, "ServerSideEncryption")
key := valueAtPath(params, "SSEKMSKeyId")

if !(sse == nil && tc.expectedSSE == "") {
assert.Equal(t, sse, tc.expectedSSE)
Expand All @@ -516,7 +499,7 @@ func TestS3CopyEncryptionRequest(t *testing.T) {
assert.Equal(t, key, tc.expectedSSEKeyID)
}

aclVal := val(r.Params, "ACL")
aclVal := valueAtPath(r.Params, "ACL")

if aclVal == nil && tc.expectedAcl == "" {
return
Expand Down Expand Up @@ -608,8 +591,8 @@ func TestS3PutEncryptionRequest(t *testing.T) {
}

params := r.Params
sse := val(params, "ServerSideEncryption")
key := val(params, "SSEKMSKeyId")
sse := valueAtPath(params, "ServerSideEncryption")
key := valueAtPath(params, "SSEKMSKeyId")

if !(sse == nil && tc.expectedSSE == "") {
assert.Equal(t, sse, tc.expectedSSE)
Expand All @@ -618,7 +601,7 @@ func TestS3PutEncryptionRequest(t *testing.T) {
assert.Equal(t, key, tc.expectedSSEKeyID)
}

aclVal := val(r.Params, "ACL")
aclVal := valueAtPath(r.Params, "ACL")

if aclVal == nil && tc.expectedAcl == "" {
return
Expand Down Expand Up @@ -695,7 +678,6 @@ func TestS3listObjectsV2(t *testing.T) {
mockApi.Handlers.Send.Clear()

mockApi.Handlers.Send.PushBack(func(r *request.Request) {

r.HTTPResponse = &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader("")),
Expand All @@ -704,7 +686,6 @@ func TestS3listObjectsV2(t *testing.T) {
r.Data = &s3.ListObjectsV2Output{
Contents: s3objs,
}

})

mockS3 := &S3{
Expand All @@ -724,21 +705,13 @@ func TestS3listObjectsV2(t *testing.T) {
}

func TestSessionCreateAndCachingWithDifferentBuckets(t *testing.T) {

testcases := []struct {
bucket string
alreadyCreated bool // sessions should not be created again if they already have been created before
}{
{
bucket: "bucket",
},
{
bucket: "bucket",
alreadyCreated: true,
},
{
bucket: "test-bucket",
},
{bucket: "bucket"},
{bucket: "bucket", alreadyCreated: true},
{bucket: "test-bucket"},
}

sess := map[string]*session.Session{}
Expand All @@ -747,7 +720,6 @@ func TestSessionCreateAndCachingWithDifferentBuckets(t *testing.T) {
awsSess, err := sessionProvider.newSession(context.Background(), Options{
bucket: tc.bucket,
})

if err != nil {
t.Error(err)
}
Expand All @@ -760,3 +732,19 @@ func TestSessionCreateAndCachingWithDifferentBuckets(t *testing.T) {
}
}
}

func valueAtPath(i interface{}, s string) interface{} {
v, err := awsutil.ValuesAtPath(i, s)
if err != nil || len(v) == 0 {
return nil
}
if _, ok := v[0].(io.Reader); ok {
return v[0]
}

if rv := reflect.ValueOf(v[0]); rv.Kind() == reflect.Ptr {
return rv.Elem().Interface()
}

return v[0]
}

0 comments on commit c3f8e3d

Please sign in to comment.