Skip to content

Commit

Permalink
logs: Switch logs publishing to a stream.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Luzzardi <[email protected]>
  • Loading branch information
aluzzardi committed Nov 3, 2016
1 parent 4c435f0 commit 3058ba9
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 236 deletions.
40 changes: 23 additions & 17 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,31 +406,37 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}

// Publisher returns a LogPublisher for the given subscription
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) exec.LogPublisher {
return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.

// If the context got cancelled, send a close.
var (
err error
client api.LogBroker_PublishLogsClient
)

close := false
err = a.withSession(ctx, func(session *session) error {
client, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
return err
})
if err != nil {
return nil, err
}

return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
select {
case <-ctx.Done():
close = true
client.CloseSend()
return ctx.Err()
default:
}

return a.withSession(ctx, func(session *session) error {
_, err := api.NewLogBrokerClient(session.conn).PublishLogs(ctx,
&api.PublishLogsRequest{
SubscriptionID: subscriptionID,
Messages: []api.LogMessage{message},
Close: close,
})
return err
return client.Send(&api.PublishLogsMessage{
SubscriptionID: subscriptionID,
Messages: []api.LogMessage{message},
})
})
}), nil
}

// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
Expand Down
2 changes: 1 addition & 1 deletion agent/exec/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage)

// LogPublisherProvider defines the protocol for receiving a log publisher
type LogPublisherProvider interface {
Publisher(ctx context.Context, subscriptionID string) LogPublisher
Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error)
}

// ContainerStatuser reports status of a container.
Expand Down
5 changes: 3 additions & 2 deletions agent/exec/controller_test.mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,11 @@ func (_m *MockLogPublisherProvider) EXPECT() *_MockLogPublisherProviderRecorder
return _m.recorder
}

func (_m *MockLogPublisherProvider) Publisher(ctx context.Context, subscriptionID string) LogPublisher {
func (_m *MockLogPublisherProvider) Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error) {
ret := _m.ctrl.Call(_m, "Publisher", ctx, subscriptionID)
ret0, _ := ret[0].(LogPublisher)
return ret0
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockLogPublisherProviderRecorder) Publisher(arg0, arg1 interface{}) *gomock.Call {
Expand Down
5 changes: 4 additions & 1 deletion agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,10 @@ func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID strin
func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector)

publisher := w.publisherProvider.Publisher(ctx, subscription.ID)
publisher, err := w.publisherProvider.Publisher(ctx, subscription.ID)
if err != nil {
return err
}
// Send a close once we're done
defer publisher.Publish(ctx, api.LogMessage{})

Expand Down
4 changes: 2 additions & 2 deletions agent/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type testPublisherProvider struct {
}

func (tpp *testPublisherProvider) Publisher(ctx context.Context, subscriptionID string) exec.LogPublisher {
func (tpp *testPublisherProvider) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
log.G(ctx).WithFields(logrus.Fields{
"subscription": subscriptionID,
Expand All @@ -25,7 +25,7 @@ func (tpp *testPublisherProvider) Publisher(ctx context.Context, subscriptionID
"service.id": message.Context.ServiceID,
}).Info(message.Data)
return nil
})
}), nil
}

func TestWorkerAssign(t *testing.T) {
Expand Down
Loading

0 comments on commit 3058ba9

Please sign in to comment.