Skip to content

Commit

Permalink
Merge pull request kayac#351 from uchida/use-next-token-to-dedup-logs
Browse files Browse the repository at this point in the history
use nextToken to deduplicate cloudwatch logs
  • Loading branch information
fujiwara committed Nov 30, 2021
2 parents efc50cd + d616321 commit e3dd249
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 16 deletions.
15 changes: 7 additions & 8 deletions ecspresso.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ func (d *App) DescribeTasksInput(task *ecs.Task) *ecs.DescribeTasksInput {
}
}

func (d *App) GetLogEventsInput(logGroup string, logStream string, startAt int64) *cloudwatchlogs.GetLogEventsInput {
func (d *App) GetLogEventsInput(logGroup string, logStream string, startAt int64, nextToken *string) *cloudwatchlogs.GetLogEventsInput {
return &cloudwatchlogs.GetLogEventsInput{
LogGroupName: aws.String(logGroup),
LogStreamName: aws.String(logStream),
StartTime: aws.Int64(startAt),
NextToken: nextToken,
}
}

Expand Down Expand Up @@ -247,23 +248,21 @@ func (d *App) DescribeTaskDefinition(ctx context.Context, tdArn string) (*TaskDe
return tdToTaskDefinitionInput(out.TaskDefinition, out.Tags), nil
}

func (d *App) GetLogEvents(ctx context.Context, logGroup string, logStream string, startedAt time.Time) (int, error) {
func (d *App) GetLogEvents(ctx context.Context, logGroup string, logStream string, startedAt time.Time, nextToken *string) (*string, error) {
ms := startedAt.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
out, err := d.cwl.GetLogEventsWithContext(ctx, d.GetLogEventsInput(logGroup, logStream, ms))
out, err := d.cwl.GetLogEventsWithContext(ctx, d.GetLogEventsInput(logGroup, logStream, ms, nextToken))
if err != nil {
return 0, err
return nextToken, err
}
if len(out.Events) == 0 {
return 0, nil
return nextToken, nil
}
lines := 0
for _, event := range out.Events {
for _, line := range formatLogEvent(event, TerminalWidth) {
fmt.Println(line)
lines++
}
}
return lines, nil
return out.NextForwardToken, nil
}

func NewApp(conf *Config) (*App, error) {
Expand Down
10 changes: 2 additions & 8 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/morikuni/aec"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -147,18 +146,13 @@ func (d *App) WaitRunTask(ctx context.Context, task *ecs.Task, watchContainer *e

go func() {
ticker := time.NewTicker(5 * time.Second)
var lines int
var nextToken *string
for {
select {
case <-waitCtx.Done():
return
case <-ticker.C:
if isTerminal {
for i := 0; i < lines; i++ {
fmt.Print(aec.EraseLine(aec.EraseModes.All), aec.PreviousLine(1))
}
}
lines, _ = d.GetLogEvents(waitCtx, logGroup, logStream, startedAt)
nextToken, _ = d.GetLogEvents(waitCtx, logGroup, logStream, startedAt, nextToken)
}
}
}()
Expand Down

0 comments on commit e3dd249

Please sign in to comment.