Skip to content

Commit

Permalink
Log Management
Browse files Browse the repository at this point in the history
This change adds support for log management in the manager, agent and
CLI.

The log broker is currently naive and broadcasts subscriptions to all
agents, which in turn need to perform filtering to figure out if the
subscription is relevant to them.

In the future, the broker should be smarter and dispatch subscriptions
only to concerned agents.

The basic logging functionality works.

Fixes #1332

Signed-off-by: Andrea Luzzardi <[email protected]>
  • Loading branch information
aluzzardi committed Oct 22, 2016
1 parent dfa5199 commit d0dcf1b
Show file tree
Hide file tree
Showing 14 changed files with 551 additions and 135 deletions.
53 changes: 38 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
Expand Down Expand Up @@ -145,11 +146,12 @@ func (a *Agent) run(ctx context.Context) {
defer nodeUpdateTicker.Stop()

var (
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
subscriptions = map[string]context.CancelFunc{}
)

if err := a.worker.Init(ctx); err != nil {
Expand Down Expand Up @@ -184,6 +186,22 @@ func (a *Agent) run(ctx context.Context) {
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
case sub := <-session.subscriptions:
if sub.Close {
if cancel, ok := subscriptions[sub.ID]; ok {
cancel()
}
continue
}

if _, ok := subscriptions[sub.ID]; ok {
// Duplicate subscription
continue
}

subCtx, subCancel := context.WithCancel(ctx)
subscriptions[sub.ID] = subCancel
go a.worker.Subscribe(subCtx, sub)
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
Expand Down Expand Up @@ -385,16 +403,21 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}
}

func (a *Agent) Publish(ctx context.Context, message api.LogMessage) error {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.
return a.withSession(ctx, func(session *session) error {
_, err := api.NewLogBrokerClient(session.conn).PublishLogs(ctx,
&api.PublishLogsRequest{
Messages: []api.LogMessage{message},
})
return err
// Publisher returns a LogPublisher for the given subscription
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) exec.LogPublisher {
return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage, close bool) error {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.
return a.withSession(ctx, func(session *session) error {
_, err := api.NewLogBrokerClient(session.conn).PublishLogs(ctx,
&api.PublishLogsRequest{
SubscriptionID: subscriptionID,
Messages: []api.LogMessage{message},
Close: close,
})
return err
})
})
}

Expand Down
64 changes: 63 additions & 1 deletion agent/exec/container/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,65 @@ func (r *controller) Remove(ctx context.Context) error {
return nil
}

// waitReady waits for a container to be "ready".
// Ready means it's past the started state.
func (r *controller) waitReady(pctx context.Context) error {
if err := r.checkClosed(); err != nil {
return err
}

ctx, cancel := context.WithCancel(pctx)
defer cancel()

eventq, closed, err := r.adapter.events(ctx)
if err != nil {
return err
}

ctnr, err := r.adapter.inspect(ctx)
if err != nil {
if !isUnknownContainer(err) {
return errors.Wrap(err, "inspect container failed")
}
} else {
switch ctnr.State.Status {
case "running", "exited", "dead":
return nil
}
}

for {
select {
case event := <-eventq:
if !r.matchevent(event) {
continue
}

log.G(ctx).Info(event.Action)

switch event.Action {
case "start":
return nil
}
case <-closed:
// restart!
eventq, closed, err = r.adapter.events(ctx)
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-r.closed:
return r.err
}
}
}

func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
if err := r.checkClosed(); err != nil {
return err
}

apiOptions := types.ContainerLogsOptions{
Follow: options.Follow,

Expand Down Expand Up @@ -435,6 +493,10 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
}
}

if err := r.waitReady(ctx); err != nil {
return errors.Wrap(err, "container not ready for logs")
}

rc, err := r.adapter.client.ContainerLogs(ctx, r.adapter.container.name(), apiOptions)
if err != nil {
return errors.Wrap(err, "failed getting container logs")
Expand Down Expand Up @@ -494,7 +556,7 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
Stream: api.LogStream(stream),

Data: parts[1],
}); err != nil {
}, false); err != nil {
return errors.Wrap(err, "failed publisher log message")
}
}
Expand Down
8 changes: 4 additions & 4 deletions agent/exec/container/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestControllerFlowIntegration(t *testing.T) {
}

var receivedLogs bool
publisher := logPublisherFn(func(ctx context.Context, message api.LogMessage) error {
publisher := logPublisherFn(func(ctx context.Context, message api.LogMessage, close bool) error {
receivedLogs = true

switch message.Stream {
Expand Down Expand Up @@ -92,8 +92,8 @@ func TestControllerFlowIntegration(t *testing.T) {
}
}

type logPublisherFn func(ctx context.Context, message api.LogMessage) error
type logPublisherFn func(ctx context.Context, message api.LogMessage, close bool) error

func (fn logPublisherFn) Publish(ctx context.Context, message api.LogMessage) error {
return fn(ctx, message)
func (fn logPublisherFn) Publish(ctx context.Context, message api.LogMessage, close bool) error {
return fn(ctx, message, close)
}
15 changes: 14 additions & 1 deletion agent/exec/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,20 @@ type ControllerLogs interface {

// LogPublisher defines the protocol for receiving a log message.
type LogPublisher interface {
Publish(ctx context.Context, message api.LogMessage) error
Publish(ctx context.Context, message api.LogMessage, close bool) error
}

// LogPublisherFunc implements publisher with just a function.
type LogPublisherFunc func(ctx context.Context, message api.LogMessage, close bool) error

// Publish calls the wrapped function.
func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage, close bool) error {
return fn(ctx, message, close)
}

// LogPublisherProvider defines the protocol for receiving a log publisher
type LogPublisherProvider interface {
Publisher(ctx context.Context, subscriptionID string) LogPublisher
}

// ContainerStatuser reports status of a container.
Expand Down
56 changes: 43 additions & 13 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ type session struct {
conn *grpc.ClientConn
addr string

agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage
subscriptions chan *api.SubscriptionMessage

registered chan struct{} // closed registration
closed chan struct{}
Expand All @@ -47,13 +48,14 @@ type session struct {

func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
s := &session{
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
subscriptions: make(chan *api.SubscriptionMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}

// TODO(stevvooe): Need to move connection management up a level or create
Expand Down Expand Up @@ -102,6 +104,7 @@ func (s *session) run(ctx context.Context, delay time.Duration, description *api
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
go runctx(ctx, s.closed, s.errs, s.logSubscriptions)

close(s.registered)
}
Expand Down Expand Up @@ -217,6 +220,33 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
}
}

func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")

client := api.NewLogBrokerClient(s.conn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if err != nil {
return err
}
defer subscriptions.CloseSend()

for {
resp, err := subscriptions.Recv()
if err != nil {
return err
}

select {
case s.subscriptions <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
}

func (s *session) watch(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
log.Debugf("")
Expand Down
51 changes: 22 additions & 29 deletions agent/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,24 @@ import (
// taskManager manages all aspects of task execution and reporting for an agent
// through state management.
type taskManager struct {
task *api.Task
ctlr exec.Controller
reporter StatusReporter
publisher exec.LogPublisher
task *api.Task
ctlr exec.Controller
reporter StatusReporter

updateq chan *api.Task

shutdown chan struct{}
closed chan struct{}
}

func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter, publisher exec.LogPublisher) *taskManager {
func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {
t := &taskManager{
task: task.Copy(),
ctlr: ctlr,
reporter: reporter,
publisher: publisher,
updateq: make(chan *api.Task),
shutdown: make(chan struct{}),
closed: make(chan struct{}),
task: task.Copy(),
ctlr: ctlr,
reporter: reporter,
updateq: make(chan *api.Task),
shutdown: make(chan struct{}),
closed: make(chan struct{}),
}
go t.run(ctx)
return t
Expand Down Expand Up @@ -66,29 +64,24 @@ func (tm *taskManager) Close() error {
}
}

func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
ctx = log.WithModule(ctx, "taskmanager")

logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
if !ok {
return // no logs available
}
if err := logCtlr.Logs(ctx, publisher, options); err != nil {
log.G(ctx).WithError(err).Errorf("logs call failed")
}
}

func (tm *taskManager) run(ctx context.Context) {
ctx, cancelAll := context.WithCancel(ctx)
defer cancelAll() // cancel all child operations on exit.

ctx = log.WithModule(ctx, "taskmanager")

go func() {
// TODO(stevvooe): Obviously, this needs to be moved elsewhere such
// that it is only activated by subscriptions.

logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
if !ok {
return // no logs available
}
for {
if err := logCtlr.Logs(ctx, tm.publisher, api.LogSubscriptionOptions{
Follow: true,
}); err != nil {
log.G(ctx).WithError(err).Errorf("logs call failed")
}
}
}()

var (
opctx context.Context
cancel context.CancelFunc
Expand Down
Loading

0 comments on commit d0dcf1b

Please sign in to comment.