Skip to content

Commit

Permalink
command: implement 'select' command (peak#300)
Browse files Browse the repository at this point in the history
Co-authored-by: İbrahim Güngör <[email protected]>
Co-authored-by: İlkin Balkanay <[email protected]>
Co-authored-by: Onur Sönmez <[email protected]>

Fixes peak#299
  • Loading branch information
skeggse committed Jul 10, 2021
1 parent fbd99c5 commit c2a9d31
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 2 deletions.
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ storage services and local filesystems.
- Set Server Side Encryption using AWS Key Management Service (KMS)
- Set Access Control List (ACL) for objects/files on the upload, copy, move.
- Print object contents to stdout
- Select JSON records from objects using SQL expressions
- Create or remove buckets
- Summarize objects sizes, grouping by storage class
- Wildcard support for all operations
Expand Down Expand Up @@ -190,7 +191,7 @@ are not supported by `s5cmd` and result in error (since we have 2 different buck
rm s3://bucket-foo/object
rm s3://bucket-bar/object

more details and examples on `s5cmd run` are presented in a [later section](./README.md#L208).
more details and examples on `s5cmd run` are presented in a [later section](./README.md#L224).

#### Copy objects from S3 to S3

Expand All @@ -204,6 +205,24 @@ folder hierarchy.
⚠️ Copying objects (from S3 to S3) larger than 5GB is not supported yet. We have
an [open ticket](https://github.com/peak/s5cmd/issues/29) to track the issue.

#### Select JSON object content using SQL

`s5cmd` supports the `SelectObjectContent` S3 operation, and will run your
[SQL query](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference.html)
against objects matching normal wildcard syntax and emit matching JSON records via stdout. Records
from multiple objects will be interleaved, and order of the records is not guaranteed (though it's
likely that the records from a single object will arrive in-order, even if interleaved with other
records).

$ s5cmd select --compression GZIP \
--query "SELECT s.timestamp, s.hostname FROM S3Object s WHERE s.ip_address LIKE '10.%' OR s.application='unprivileged'" \
s3://bucket-foo/object/2021/*
{"timestamp":"2021-07-08T18:24:06.665Z","hostname":"application.internal"}
{"timestamp":"2021-07-08T18:24:16.095Z","hostname":"api.github.com"}

At the moment this operation _only_ supports JSON records selected with SQL. S3 calls this
lines-type JSON, but it seems that it works even if the records aren't line-delineated. YMMV.

#### Count objects and determine total size

$ s5cmd du --humanize 's3://bucket/2020/*'
Expand Down Expand Up @@ -406,7 +425,7 @@ For a more practical scenario, let's say we have an [avocado prices](https://www

## Beast Mode s5cmd

`s5cmd` allows to pass in some file, containing list of operations to be performed, as an argument to the `run` command as illustrated in the [above](./README.md#L199) example. Alternatively, one can pipe in commands into
`s5cmd` allows to pass in some file, containing list of operations to be performed, as an argument to the `run` command as illustrated in the [above](./README.md#L224) example. Alternatively, one can pipe in commands into
the `run:`

BUCKET=s5cmd-test; s5cmd ls s3://$BUCKET/*test | grep -v DIR | awk ‘{print $NF}’
Expand Down
1 change: 1 addition & 0 deletions command/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func Main(ctx context.Context, args []string) error {
moveCommand,
makeBucketCommand,
removeBucketCommand,
selectCommand,
sizeCommand,
catCommand,
runCommand,
Expand Down
213 changes: 213 additions & 0 deletions command/select.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package command

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"

"github.com/hashicorp/go-multierror"
"github.com/urfave/cli/v2"

errorpkg "github.com/peak/s5cmd/error"
"github.com/peak/s5cmd/log/stat"
"github.com/peak/s5cmd/parallel"
"github.com/peak/s5cmd/storage"
"github.com/peak/s5cmd/storage/url"
)

var selectHelpTemplate = `Name:
{{.HelpName}} - {{.Usage}}
Usage:
{{.HelpName}} [options] argument
Options:
{{range .VisibleFlags}}{{.}}
{{end}}
Examples:
01. Search for all JSON objects with the foo property set to 'bar' and spit them into stdout
> s5cmd {{.HelpName}} --compression gzip --query "SELECT * FROM S3Object s WHERE s.foo='bar'" s3://bucket/*
`

var selectCommandFlags = []cli.Flag{
&cli.StringFlag{
Name: "query",
Aliases: []string{"e"},
Usage: "SQL expression to use to select from the objects",
},
&cli.StringFlag{
Name: "compression",
Usage: "input compression format",
Value: "NONE",
},
&cli.StringFlag{
Name: "format",
Usage: "input data format (only JSON supported for the moment)",
Value: "JSON",
},
}

var selectCommand = &cli.Command{
Name: "select",
HelpName: "select",
Usage: "run SQL queries on objects",
Flags: selectCommandFlags,
CustomHelpTemplate: selectHelpTemplate,
Before: func(c *cli.Context) error {
err := validateSelectCommand(c)
if err != nil {
printError(givenCommand(c), c.Command.Name, err)
}
return err
},
Action: func(c *cli.Context) (err error) {
defer stat.Collect(c.Command.FullName(), &err)()

return Select{
src: c.Args().Get(0),
op: c.Command.Name,
fullCommand: givenCommand(c),
// flags
query: c.String("query"),
compressionType: c.String("compression"),

storageOpts: NewStorageOpts(c),
}.Run(c.Context)
},
}

// Select holds select operation flags and states.
type Select struct {
src string
op string
fullCommand string

query string
compressionType string

// s3 options
storageOpts storage.Options
}

// Run starts copying given source objects to destination.
func (s Select) Run(ctx context.Context) error {
srcurl, err := url.New(s.src)
if err != nil {
printError(s.fullCommand, s.op, err)
return err
}

client, err := storage.NewRemoteClient(ctx, srcurl, s.storageOpts)
if err != nil {
printError(s.fullCommand, s.op, err)
return err
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

objch, err := expandSource(ctx, client, false, srcurl)
if err != nil {
printError(s.fullCommand, s.op, err)
return err
}

var merror error

waiter := parallel.NewWaiter()
errDoneCh := make(chan bool)
writeDoneCh := make(chan bool)
resultCh := make(chan json.RawMessage, 128)

go func() {
defer close(errDoneCh)
for err := range waiter.Err() {
printError(s.fullCommand, s.op, err)
merror = multierror.Append(merror, err)
}
}()

go func() {
defer close(writeDoneCh)
var fatalError error
for {
record, ok := <-resultCh
if !ok {
break
}
if fatalError != nil {
// Drain the channel.
continue
}
if _, err := os.Stdout.Write(append(record, '\n')); err != nil {
// Stop reading upstream. Notably useful for EPIPE.
cancel()
printError(s.fullCommand, s.op, err)
fatalError = err
}
}
}()

for object := range objch {
if object.Type.IsDir() || errorpkg.IsCancelation(object.Err) {
continue
}

if err := object.Err; err != nil {
printError(s.fullCommand, s.op, err)
continue
}

if object.StorageClass.IsGlacier() {
err := fmt.Errorf("object '%v' is on Glacier storage", object)
printError(s.fullCommand, s.op, err)
continue
}

task := s.prepareTask(ctx, client, object.URL, resultCh)
parallel.Run(task, waiter)
}

waiter.Wait()
<-errDoneCh
<-writeDoneCh

return merror
}

func (s Select) prepareTask(ctx context.Context, client *storage.S3, url *url.URL, resultCh chan<- json.RawMessage) func() error {
return func() error {
query := &storage.SelectQuery{
ExpressionType: "SQL",
Expression: s.query,
CompressionType: s.compressionType,
}

return client.Select(ctx, url, query, resultCh)
}
}

func validateSelectCommand(c *cli.Context) error {
if c.Args().Len() != 1 {
return fmt.Errorf("expected source argument")
}

src := c.Args().Get(0)

srcurl, err := url.New(src)
if err != nil {
return err
}

if !srcurl.IsRemote() {
return fmt.Errorf("source must be remote")
}

if !strings.EqualFold(c.String("format"), "JSON") {
return fmt.Errorf("only json supported")
}

return nil
}
74 changes: 74 additions & 0 deletions storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -383,6 +384,79 @@ func (s *S3) Get(
})
}

type SelectQuery struct {
ExpressionType string
Expression string
CompressionType string
}

func (s *S3) Select(ctx context.Context, url *url.URL, query *SelectQuery, resultCh chan<- json.RawMessage) error {
if s.dryRun {
return nil
}

input := &s3.SelectObjectContentInput{
Bucket: aws.String(url.Bucket),
Key: aws.String(url.Path),
ExpressionType: aws.String(query.ExpressionType),
Expression: aws.String(query.Expression),
InputSerialization: &s3.InputSerialization{
CompressionType: aws.String(query.CompressionType),
JSON: &s3.JSONInput{
Type: aws.String("Lines"),
},
},
OutputSerialization: &s3.OutputSerialization{
JSON: &s3.JSONOutput{},
},
}

resp, err := s.api.SelectObjectContentWithContext(ctx, input)
if err != nil {
return err
}

reader, writer := io.Pipe()

go func() {
defer writer.Close()

eventch := resp.EventStream.Reader.Events()
defer resp.EventStream.Close()

for {
select {
case <-ctx.Done():
return
case event, ok := <-eventch:
if !ok {
return
}

switch e := event.(type) {
case *s3.RecordsEvent:
writer.Write(e.Payload)
}
}
}
}()

decoder := json.NewDecoder(reader)
for {
var record json.RawMessage
err := decoder.Decode(&record)
if err == io.EOF {
break
}
if err != nil {
return err
}
resultCh <- record
}

return resp.EventStream.Reader.Err()
}

// Put is a multipart upload operation to upload resources, which implements
// io.Reader interface, into S3 destination.
func (s *S3) Put(
Expand Down

0 comments on commit c2a9d31

Please sign in to comment.