Skip to content

Commit

Permalink
Implement standalone job producer (peak#65)
Browse files Browse the repository at this point in the history
* job_producer: Init skeleton

* Separate command and job logic

* Limit concurrency

* worker: Restructure

* Support multiple sources

* job_producer: Init local filesystem

* objurl: Add Join() function

* Simplify job generator

* job_producer: Use urls

* Run jobs directly

* Rename scan.go to scanner.go

* Update comments

* Add recursive flag to job producer

* Move CheckConditions to job_check.go

* Since JobArgument is removed, move job check logic to another file. Use storage.Stat() for both destination and source urls.

* Pass tests excluding stats

* format imports

* Pass stats type into job struct

* Add godoc to op

* Simplify job.Run()

* Return client.Stat()

* job_check: fix condition check logic

* core: don't print job status if successful

* e2e: fix tests

* storage: do s3 multi delete operations at the storage layer

* storage: remove stat bookkeeping

* core: fix cp dir to s3 regression

* e2e: add RemoveTenThousandS3Objects test

* e2e: setup with options

* e2e: add "mem" comment

* vendor: add missing dependency

* storage: support metadata for Storage.Copy

Co-authored-by: H. İbrahim Güngör <[email protected]>
  • Loading branch information
sonmezonur and igungor committed Feb 26, 2020
1 parent 43e3512 commit cf79cab
Show file tree
Hide file tree
Showing 51 changed files with 2,082 additions and 1,721 deletions.
6 changes: 2 additions & 4 deletions complete/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ func getSubCommands() cmp.Commands {
argList := make(map[string]*map[opt.ParamType]struct{})

for _, c := range core.Commands {
if c.Operation.IsInternal() {
continue
}

// Do the flags
flagsForKeyword, ok := flagList[c.Keyword]
Expand Down Expand Up @@ -249,7 +246,8 @@ func s3predictor(a cmp.Args) []string {
url.Prefix = s3key

for object := range client.List(ctx, url, true, s3MaxKeys) {
if object.IsMarker() {
// TODO(ig): handle error
if object.Err != nil {
continue
}

Expand Down
139 changes: 109 additions & 30 deletions core/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,93 @@ import (
"sort"
"strings"

"github.com/peak/s5cmd/objurl"
"github.com/peak/s5cmd/op"
"github.com/peak/s5cmd/opt"
"github.com/peak/s5cmd/storage"
)

// Command is the representation of a single command received from the user.
type Command struct {
// original is the raw string received from the user, such as "cp -h" or
// "cp -R foo bar". String returns it unchanged.
original string
// keyword is the command name; it becomes the job's command in toJob and
// is the filter passed to CommandHelps when displaying help.
keyword string
// operation is the op.Operation the command maps to. It decides whether
// the command is a batch operation and which stat type its job carries.
operation op.Operation
// args holds the command's arguments as object URLs.
args []*objurl.ObjectURL
// opts is the command's option list. It is consulted for the storage
// class (opt.RR, opt.IA) and for opt.SupportsAggregation.
opts opt.OptionList
}

// String returns the original command string exactly as it was stored in the
// command, so a Command formatted with verbs such as %v prints as that text.
func (c Command) String() string {
return c.original
}

// getStorageClass selects the storage class requested through the command's
// option list. opt.RR is checked first and therefore takes precedence over
// opt.IA; when neither option is present the standard class is returned.
func (c Command) getStorageClass() storage.StorageClass {
	switch {
	case c.opts.Has(opt.RR):
		return storage.StorageReducedRedundancy
	case c.opts.Has(opt.IA):
		return storage.StorageStandardIA
	default:
		return storage.StorageStandard
	}
}

// SupportsAggregation reports whether the command supports aggregation, such
// as S3 batch deletes. It is true when the command's option list contains
// opt.SupportsAggregation.
func (c Command) SupportsAggregation() bool {
return c.opts.Has(opt.SupportsAggregation)
}

// IsBatch reports whether the command is a batch operation. The decision is
// delegated to the IsBatch method of the command's operation.
func (c Command) IsBatch() bool {
return c.operation.IsBatch()
}

// makeJob builds a new Job for the given command name, operation and
// arguments. The option list and the storage class are taken from c, while
// the stat type comes from the supplied operation rather than from
// c.operation.
func (c Command) makeJob(cmd string, operation op.Operation, args ...*objurl.ObjectURL) *Job {
	storageClass := c.getStorageClass()
	statType := operation.GetStat()

	job := Job{
		command:      cmd,
		operation:    operation,
		opts:         c.opts,
		args:         args,
		storageClass: storageClass,
		statType:     statType,
	}
	return &job
}

// toJob converts the command itself into a Job, using the command's own
// keyword, operation and arguments.
//
// It delegates to makeJob so that there is a single place where a Job is
// assembled from a Command. Spreading c.args with "..." hands the existing
// slice to makeJob unchanged, so the resulting Job is field-for-field the same
// as one built directly from c.
func (c Command) toJob() *Job {
	return c.makeJob(c.keyword, c.operation, c.args...)
}

// displayHelp writes the help text for the command to standard error. The
// output is, in order: the usage line, the option help for the command's
// keyword (skipped when there is none), the command help returned by
// CommandHelps, and a closing hint about listing the general options.
func (c Command) displayHelp() {
	stderr := os.Stderr

	fmt.Fprintf(stderr, "%v\n\n", UsageLine())

	commandHelp, optionTypes, count := CommandHelps(c.keyword)

	optionHelp := opt.OptionHelps(optionTypes)
	if optionHelp != "" {
		fmt.Fprintf(stderr, "\"%v\" command options:\n", c)
		fmt.Fprint(stderr, optionHelp)
		fmt.Fprint(stderr, "\n\n")
	}

	// The heading is printed only when CommandHelps reports a count greater
	// than one.
	if count > 1 {
		fmt.Fprintf(stderr, "Help for \"%v\" commands:\n", c)
	}

	fmt.Fprint(stderr, commandHelp)
	fmt.Fprint(stderr, "\nTo list available general options, run without arguments.\n")
}

// CommandMap describes each command
type CommandMap struct {
// Keyword is the command's invocation name
Expand All @@ -22,30 +105,32 @@ type CommandMap struct {
Opts opt.OptionList
}

var noOpts = opt.OptionList{}

// Commands is a list of registered commands
var Commands = []CommandMap{
{"exit", op.Abort, []opt.ParamType{}, opt.OptionList{}},
{"exit", op.Abort, []opt.ParamType{opt.Unchecked}, opt.OptionList{}},
{"exit", op.Abort, []opt.ParamType{}, noOpts},
{"exit", op.Abort, []opt.ParamType{opt.Unchecked}, noOpts},

// File to file
{"cp", op.LocalCopy, []opt.ParamType{opt.FileObj, opt.FileOrDir}, opt.OptionList{}},
{"cp", op.BatchLocalCopy, []opt.ParamType{opt.Glob, opt.Dir}, opt.OptionList{}},
{"cp", op.BatchLocalCopy, []opt.ParamType{opt.Dir, opt.Dir}, opt.OptionList{}},
{"cp", op.LocalCopy, []opt.ParamType{opt.FileObj, opt.FileOrDir}, noOpts},
{"cp", op.BatchLocalCopy, []opt.ParamType{opt.Glob, opt.Dir}, noOpts},
{"cp", op.BatchLocalCopy, []opt.ParamType{opt.Dir, opt.Dir}, noOpts},

// S3 to S3
{"cp", op.Copy, []opt.ParamType{opt.S3SimpleObj, opt.S3ObjOrDir}, opt.OptionList{}},
{"cp", op.BatchCopy, []opt.ParamType{opt.S3WildObj, opt.S3Dir}, opt.OptionList{}},
{"cp", op.Copy, []opt.ParamType{opt.S3SimpleObj, opt.S3ObjOrDir}, noOpts},
{"cp", op.BatchCopy, []opt.ParamType{opt.S3WildObj, opt.S3Dir}, noOpts},

// File to S3
{"cp", op.Upload, []opt.ParamType{opt.FileObj, opt.S3ObjOrDir}, opt.OptionList{}},
{"cp", op.BatchUpload, []opt.ParamType{opt.Glob, opt.S3Dir}, opt.OptionList{}},
{"cp", op.BatchUpload, []opt.ParamType{opt.Dir, opt.S3Dir}, opt.OptionList{}},
{"cp", op.Upload, []opt.ParamType{opt.FileObj, opt.S3ObjOrDir}, noOpts},
{"cp", op.BatchUpload, []opt.ParamType{opt.Glob, opt.S3Dir}, opt.OptionList{opt.Recursive}},
{"cp", op.BatchUpload, []opt.ParamType{opt.Dir, opt.S3Dir}, opt.OptionList{opt.Recursive}},

// S3 to file
{"cp", op.Download, []opt.ParamType{opt.S3SimpleObj, opt.FileOrDir}, opt.OptionList{}},
{"get", op.AliasGet, []opt.ParamType{opt.S3SimpleObj, opt.OptionalFileOrDir}, opt.OptionList{}},
{"cp", op.BatchDownload, []opt.ParamType{opt.S3WildObj, opt.Dir}, opt.OptionList{}},
{"get", op.AliasBatchGet, []opt.ParamType{opt.S3WildObj, opt.OptionalDir}, opt.OptionList{}},
{"cp", op.Download, []opt.ParamType{opt.S3SimpleObj, opt.FileOrDir}, noOpts},
{"get", op.AliasGet, []opt.ParamType{opt.S3SimpleObj, opt.OptionalFileOrDir}, noOpts},
{"cp", op.BatchDownload, []opt.ParamType{opt.S3WildObj, opt.Dir}, noOpts},
{"get", op.AliasBatchGet, []opt.ParamType{opt.S3WildObj, opt.OptionalDir}, noOpts},

// File to file
{"mv", op.LocalCopy, []opt.ParamType{opt.FileObj, opt.FileOrDir}, opt.OptionList{opt.DeleteSource}},
Expand All @@ -66,21 +151,21 @@ var Commands = []CommandMap{
{"mv", op.BatchDownload, []opt.ParamType{opt.S3WildObj, opt.Dir}, opt.OptionList{opt.DeleteSource}},

// File
{"rm", op.LocalDelete, []opt.ParamType{opt.FileObj}, opt.OptionList{}},
{"rm", op.LocalDelete, []opt.ParamType{opt.FileObj}, noOpts},

// S3
{"rm", op.Delete, []opt.ParamType{opt.S3SimpleObj}, opt.OptionList{}},
{"rm", op.BatchDelete, []opt.ParamType{opt.S3WildObj}, opt.OptionList{}},
{"batch-rm", op.BatchDeleteActual, []opt.ParamType{opt.S3Obj, opt.UncheckedOneOrMore}, opt.OptionList{}},
{"rm", op.Delete, []opt.ParamType{opt.S3SimpleObj}, noOpts},
{"rm", op.BatchDelete, []opt.ParamType{opt.S3WildObj}, opt.OptionList{opt.SupportsAggregation}},
{"batch-rm", op.BatchDelete, []opt.ParamType{opt.S3Obj, opt.UncheckedOneOrMore}, opt.OptionList{opt.SupportsAggregation}},

{"ls", op.ListBuckets, []opt.ParamType{}, opt.OptionList{}},
{"ls", op.List, []opt.ParamType{opt.S3ObjOrDir}, opt.OptionList{}},
{"ls", op.List, []opt.ParamType{opt.S3WildObj}, opt.OptionList{}},
{"ls", op.ListBuckets, []opt.ParamType{}, noOpts},
{"ls", op.List, []opt.ParamType{opt.S3ObjOrDir}, noOpts},
{"ls", op.List, []opt.ParamType{opt.S3WildObj}, noOpts},

{"du", op.Size, []opt.ParamType{opt.S3ObjOrDir}, opt.OptionList{}},
{"du", op.Size, []opt.ParamType{opt.S3WildObj}, opt.OptionList{}},
{"du", op.Size, []opt.ParamType{opt.S3ObjOrDir}, noOpts},
{"du", op.Size, []opt.ParamType{opt.S3WildObj}, noOpts},

{"!", op.ShellExec, []opt.ParamType{opt.UncheckedOneOrMore}, opt.OptionList{}},
{"!", op.ShellExec, []opt.ParamType{opt.UncheckedOneOrMore}, noOpts},
}

// String formats the CommandMap using its Operation and ParamTypes
Expand Down Expand Up @@ -121,9 +206,6 @@ func CommandHelps(filter string) (string, []opt.OptionType, int) {
var lastDesc string
var l []string
for _, c := range Commands {
if c.Operation.IsInternal() {
continue
}
if filter != "" && c.Keyword != filter {
continue
}
Expand Down Expand Up @@ -193,9 +275,6 @@ func CommandHelps(filter string) (string, []opt.OptionType, int) {
func CommandList() []string {
l := make(map[string]struct{})
for _, c := range Commands {
if c.Operation.IsInternal() {
continue
}
l[c.Keyword] = struct{}{}
}

Expand Down
35 changes: 14 additions & 21 deletions core/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,22 @@ package core

import (
"github.com/peak/s5cmd/op"
"github.com/peak/s5cmd/stats"
)

type commandFunc func(*Job, *WorkerParams) (stats.StatType, *JobResponse)
type commandFunc func(*Job, *WorkerParams) *JobResponse

var globalCmdRegistry = map[op.Operation]commandFunc{
op.LocalCopy: LocalCopy,
op.LocalDelete: LocalDelete,
op.BatchLocalCopy: BatchLocalCopy,
op.BatchUpload: BatchLocalUpload,
op.Download: S3Download,
op.AliasGet: S3Download,
op.Upload: S3Upload,
op.List: S3List,
op.ListBuckets: S3ListBuckets,
op.Size: S3Size,
op.Copy: S3Copy,
op.Delete: S3Delete,
op.BatchDelete: S3BatchDelete,
op.BatchDeleteActual: S3BatchDeleteActual,
op.BatchDownload: S3BatchDownload,
op.AliasBatchGet: S3BatchDownload,
op.BatchCopy: S3BatchCopy,
op.ShellExec: ShellExec,
op.Abort: ShellAbort,
op.LocalCopy: LocalCopy,
op.LocalDelete: LocalDelete,
op.Download: S3Download,
op.AliasGet: S3Download,
op.Upload: S3Upload,
op.List: S3List,
op.ListBuckets: S3ListBuckets,
op.Size: S3Size,
op.Copy: S3Copy,
op.Delete: S3Delete,
op.BatchDelete: S3BatchDelete,
op.ShellExec: ShellExec,
op.Abort: ShellAbort,
}
Loading

0 comments on commit cf79cab

Please sign in to comment.