Skip to content

Commit

Permalink
Log Management
Browse files Browse the repository at this point in the history
This change adds support for log management in the manager, agent and
CLI.

The log broker is currently naive and broadcasts subscriptions to all
agents, which in turn need to perform filtering to figure out if the
subscription is relevant to them.

In the future, the broker should be smarter and dispatch subscriptions
only to concerned agents.

The basic logging functionality works.

Fixes #1332

Signed-off-by: Andrea Luzzardi <[email protected]>
  • Loading branch information
aluzzardi committed Oct 24, 2016
1 parent 9f799d5 commit 1c75bba
Show file tree
Hide file tree
Showing 16 changed files with 590 additions and 160 deletions.
53 changes: 38 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
Expand Down Expand Up @@ -145,11 +146,12 @@ func (a *Agent) run(ctx context.Context) {
defer nodeUpdateTicker.Stop()

var (
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
subscriptions = map[string]context.CancelFunc{}
)

if err := a.worker.Init(ctx); err != nil {
Expand Down Expand Up @@ -184,6 +186,22 @@ func (a *Agent) run(ctx context.Context) {
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
case sub := <-session.subscriptions:
if sub.Close {
if cancel, ok := subscriptions[sub.ID]; ok {
cancel()
}
continue
}

if _, ok := subscriptions[sub.ID]; ok {
// Duplicate subscription
continue
}

subCtx, subCancel := context.WithCancel(ctx)
subscriptions[sub.ID] = subCancel
go a.worker.Subscribe(subCtx, sub)
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
Expand Down Expand Up @@ -385,16 +403,21 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}
}

func (a *Agent) Publish(ctx context.Context, message api.LogMessage) error {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.
return a.withSession(ctx, func(session *session) error {
_, err := api.NewLogBrokerClient(session.conn).PublishLogs(ctx,
&api.PublishLogsRequest{
Messages: []api.LogMessage{message},
})
return err
// Publisher returns a LogPublisher for the given subscription
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) exec.LogPublisher {
return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage, close bool) error {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.
return a.withSession(ctx, func(session *session) error {
_, err := api.NewLogBrokerClient(session.conn).PublishLogs(ctx,
&api.PublishLogsRequest{
SubscriptionID: subscriptionID,
Messages: []api.LogMessage{message},
Close: close,
})
return err
})
})
}

Expand Down
66 changes: 64 additions & 2 deletions agent/exec/container/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,65 @@ func (r *controller) Remove(ctx context.Context) error {
return nil
}

// waitReady waits for a container to be "ready".
// Ready means it's past the started state.
func (r *controller) waitReady(pctx context.Context) error {
if err := r.checkClosed(); err != nil {
return err
}

ctx, cancel := context.WithCancel(pctx)
defer cancel()

eventq, closed, err := r.adapter.events(ctx)
if err != nil {
return err
}

ctnr, err := r.adapter.inspect(ctx)
if err != nil {
if !isUnknownContainer(err) {
return errors.Wrap(err, "inspect container failed")
}
} else {
switch ctnr.State.Status {
case "running", "exited", "dead":
return nil
}
}

for {
select {
case event := <-eventq:
if !r.matchevent(event) {
continue
}

log.G(ctx).Info(event.Action)

switch event.Action {
case "start":
return nil
}
case <-closed:
// restart!
eventq, closed, err = r.adapter.events(ctx)
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-r.closed:
return r.err
}
}
}

func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
if err := r.checkClosed(); err != nil {
return err
}

apiOptions := types.ContainerLogsOptions{
Follow: options.Follow,

Expand Down Expand Up @@ -435,6 +493,10 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
}
}

if err := r.waitReady(ctx); err != nil {
return errors.Wrap(err, "container not ready for logs")
}

rc, err := r.adapter.client.ContainerLogs(ctx, r.adapter.container.name(), apiOptions)
if err != nil {
return errors.Wrap(err, "failed getting container logs")
Expand Down Expand Up @@ -494,8 +556,8 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
Stream: api.LogStream(stream),

Data: parts[1],
}); err != nil {
return errors.Wrap(err, "failed publisher log message")
}, false); err != nil {
return errors.Wrap(err, "failed to publish log message")
}
}
}
Expand Down
8 changes: 1 addition & 7 deletions agent/exec/container/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestControllerFlowIntegration(t *testing.T) {
}

var receivedLogs bool
publisher := logPublisherFn(func(ctx context.Context, message api.LogMessage) error {
publisher := exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage, close bool) error {
receivedLogs = true

switch message.Stream {
Expand Down Expand Up @@ -91,9 +91,3 @@ func TestControllerFlowIntegration(t *testing.T) {
t.Fatalf("expected controller to be closed: %v", err)
}
}

type logPublisherFn func(ctx context.Context, message api.LogMessage) error

func (fn logPublisherFn) Publish(ctx context.Context, message api.LogMessage) error {
return fn(ctx, message)
}
15 changes: 14 additions & 1 deletion agent/exec/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,20 @@ type ControllerLogs interface {

// LogPublisher defines the protocol for receiving a log message.
type LogPublisher interface {
Publish(ctx context.Context, message api.LogMessage) error
Publish(ctx context.Context, message api.LogMessage, close bool) error
}

// LogPublisherFunc implements publisher with just a function.
type LogPublisherFunc func(ctx context.Context, message api.LogMessage, close bool) error

// Publish calls the wrapped function.
func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage, close bool) error {
return fn(ctx, message, close)
}

// LogPublisherProvider defines the protocol for receiving a log publisher
type LogPublisherProvider interface {
Publisher(ctx context.Context, subscriptionID string) LogPublisher
}

// ContainerStatuser reports status of a container.
Expand Down
39 changes: 35 additions & 4 deletions agent/exec/controller_test.mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,45 @@ func (_m *MockLogPublisher) EXPECT() *_MockLogPublisherRecorder {
return _m.recorder
}

func (_m *MockLogPublisher) Publish(ctx context.Context, message api.LogMessage) error {
ret := _m.ctrl.Call(_m, "Publish", ctx, message)
func (_m *MockLogPublisher) Publish(ctx context.Context, message api.LogMessage, close bool) error {
ret := _m.ctrl.Call(_m, "Publish", ctx, message, close)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockLogPublisherRecorder) Publish(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Publish", arg0, arg1)
func (_mr *_MockLogPublisherRecorder) Publish(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Publish", arg0, arg1, arg2)
}

// Mock of LogPublisherProvider interface
type MockLogPublisherProvider struct {
ctrl *gomock.Controller
recorder *_MockLogPublisherProviderRecorder
}

// Recorder for MockLogPublisherProvider (not exported)
type _MockLogPublisherProviderRecorder struct {
mock *MockLogPublisherProvider
}

func NewMockLogPublisherProvider(ctrl *gomock.Controller) *MockLogPublisherProvider {
mock := &MockLogPublisherProvider{ctrl: ctrl}
mock.recorder = &_MockLogPublisherProviderRecorder{mock}
return mock
}

func (_m *MockLogPublisherProvider) EXPECT() *_MockLogPublisherProviderRecorder {
return _m.recorder
}

func (_m *MockLogPublisherProvider) Publisher(ctx context.Context, subscriptionID string) LogPublisher {
ret := _m.ctrl.Call(_m, "Publisher", ctx, subscriptionID)
ret0, _ := ret[0].(LogPublisher)
return ret0
}

func (_mr *_MockLogPublisherProviderRecorder) Publisher(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Publisher", arg0, arg1)
}

// Mock of ContainerStatuser interface
Expand Down
56 changes: 43 additions & 13 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ type session struct {
conn *grpc.ClientConn
addr string

agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage
subscriptions chan *api.SubscriptionMessage

registered chan struct{} // closed registration
closed chan struct{}
Expand All @@ -47,13 +48,14 @@ type session struct {

func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
s := &session{
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
subscriptions: make(chan *api.SubscriptionMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}

// TODO(stevvooe): Need to move connection management up a level or create
Expand Down Expand Up @@ -102,6 +104,7 @@ func (s *session) run(ctx context.Context, delay time.Duration, description *api
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
go runctx(ctx, s.closed, s.errs, s.logSubscriptions)

close(s.registered)
}
Expand Down Expand Up @@ -217,6 +220,33 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
}
}

func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")

client := api.NewLogBrokerClient(s.conn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if err != nil {
return err
}
defer subscriptions.CloseSend()

for {
resp, err := subscriptions.Recv()
if err != nil {
return err
}

select {
case s.subscriptions <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
return ctx.Err()
}
}
}

func (s *session) watch(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
log.Debugf("")
Expand Down
Loading

0 comments on commit 1c75bba

Please sign in to comment.