Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
disq committed Mar 15, 2018
2 parents d978f45 + 2001dfb commit 3195958
Show file tree
Hide file tree
Showing 2,372 changed files with 19,568 additions and 1,463,655 deletions.
89 changes: 73 additions & 16 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 11 additions & 6 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
[[dependencies]]
[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = ">=1.8.0, <2.0.0"

[[dependencies]]
[[constraint]]
name = "github.com/cenkalti/backoff"
version = "1.0.0"

[[dependencies]]
[[constraint]]
name = "github.com/google/gops"
version = "0.3.1"

[[dependencies]]
[[constraint]]
branch = "master"
name = "github.com/termie/go-shutil"

[[dependencies]]
[[constraint]]
name = "github.com/posener/complete"
revision = "55ecf96557cf9eb690221d52354c4408ff59003a"
version = "1.1"

[prune]
non-go = true
go-tests = true
unused-packages = true
8 changes: 7 additions & 1 deletion complete/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func s3predictor(a cmp.Args) []string {
}

// Quickly create a new session with defaults
ses, err := core.NewAwsSession(-1)
ses, err := core.NewAwsSession(-1, "")
if err != nil {
return nil
}
Expand Down Expand Up @@ -239,6 +239,12 @@ func s3predictor(a cmp.Args) []string {
}

if s3bucket != "" {
// Override default region with bucket
ses, err := core.GetSessionForBucket(client, s3bucket)
if err == nil {
client = s3.New(ses)
}

var ret []string

prefix := "s3://" + s3bucket + "/"
Expand Down
2 changes: 1 addition & 1 deletion core/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *CommandMap) String(optsOverride ...opt.OptionType) (s string) {

// GetCommandList returns a text of accepted Commands with their options and arguments
func GetCommandList() string {
list := make(map[string][]string, 0)
list := make(map[string][]string)
overrides := map[op.Operation]string{
op.Abort: "exit [exit code]",
op.ShellExec: "! command [parameters...]",
Expand Down
63 changes: 37 additions & 26 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
)

const sdkPanicErrCode = "SdkPanic"
Expand All @@ -32,38 +33,48 @@ func (e AcceptableErrorType) Acceptable() bool {
}

// IsRetryableError returns if an error (probably awserr) is retryable, along with an error code
func IsRetryableError(err error) (string, bool) {
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
//fmt.Println("awsErr", awsErr.Code(), awsErr.Message(), awsErr.OrigErr())
func IsRetryableError(err error) (code string, retryable bool) {
if err == nil {
return
}

errCode := awsErr.Code()
switch errCode {
case "SlowDown", "SerializationError", "RequestError", sdkPanicErrCode:
return errCode, true
}
code = "UNK"
retryable = request.IsErrorRetryable(err) || request.IsErrorThrottle(err)

if origCode, origRetryable := IsRetryableError(awsErr.OrigErr()); origRetryable {
return origCode, true
}
if awsErr, ok := err.(awserr.Error); ok {
//fmt.Println("awsErr", awsErr.Code(), awsErr.Message(), awsErr.OrigErr())

errCode := awsErr.Code()
switch errCode {
case "SlowDown", "SerializationError", "RequestError", sdkPanicErrCode:
code = errCode
return
}

if reqErr, ok := err.(awserr.RequestFailure); ok {
// A service error occurred
//fmt.Println("reqErr", reqErr.StatusCode(), reqErr.RequestID())
errCode = reqErr.Code()
switch errCode {
case "InternalError", "SerializationError", "RequestError":
return errCode, true
}
status := reqErr.StatusCode()
switch status {
case 400, 500:
return fmt.Sprintf("HTTP%d", status), true
}
if origCode, origRetryable := IsRetryableError(awsErr.OrigErr()); origRetryable {
code = origCode
return
}

if reqErr, ok := err.(awserr.RequestFailure); ok {
// A service error occurred
//fmt.Println("reqErr", reqErr.StatusCode(), reqErr.RequestID())
errCode = reqErr.Code()
switch errCode {
case "InternalError", "SerializationError", "RequestError":
code = errCode
return
}
status := reqErr.StatusCode()
switch status {
case 500:
code = fmt.Sprintf("HTTP%d", status)
return
}
}
}
return "", false

return
}

// CleanupError converts multiline error messages generated by aws-sdk-go into a single line
Expand Down
39 changes: 17 additions & 22 deletions core/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,36 +859,31 @@ Midway-failing lister() fns are not thoroughly tested and may hang or panic

func wildOperation(wp *WorkerParams, lister wildLister, callback wildCallback) error {
ch := make(chan interface{})
closer := make(chan bool)
closer := make(chan struct{})
subjobStats := subjobStatsType{} // Tally successful and total processed sub-jobs here
var subJobCounter uint32 // number of total subJobs issued

// This goroutine will read ls results from ch and issue new subJobs
go func() {
defer close(closer) // Close closer when goroutine exits
for {
select {
case data, ok := <-ch:
if !ok {
// Channel closed early: err returned from s3list?
return
}
j := callback(data)
if j != nil {
j.subJobData = &subjobStats
subjobStats.Add(1)
subJobCounter++
select {
case *wp.subJobQueue <- j:
case <-wp.ctx.Done():
return
}
}
if data == nil {
// End of listing

// If channel closed early: err returned from s3list?
for data := range ch {
j := callback(data)
if j != nil {
j.subJobData = &subjobStats
subjobStats.Add(1)
subJobCounter++
select {
case *wp.subJobQueue <- j:
case <-wp.ctx.Done():
return
}
}
if data == nil {
// End of listing
return
}
}
}()

Expand All @@ -900,7 +895,7 @@ func wildOperation(wp *WorkerParams, lister wildLister, callback wildCallback) e
<-closer // Wait for EOF on goroutine
verboseLog("wildOperation all subjobs sent")

closer = make(chan bool)
closer = make(chan struct{})
go func() {
subjobStats.Wait() // Wait for all jobs to finish
close(closer)
Expand Down
Loading

0 comments on commit 3195958

Please sign in to comment.