
Log Management #1679

Merged — merged 11 commits into moby:master from logbroker on Nov 4, 2016

Conversation

@aluzzardi (Member) commented Oct 22, 2016

This change adds support for log management in the manager, agent and
CLI.

The log broker is currently naive and broadcasts subscriptions to all
agents, which in turn need to perform filtering to figure out if the
subscription is relevant to them.

In the future, the broker should be smarter and dispatch subscriptions
only to concerned agents.

The basic logging functionality works.

Fixes #1332

/cc @stevvooe @aaronlehmann @diogomonica
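For readers skimming the design: a rough sketch of the agent-side filtering described above. The helper names and the exact LogSelector fields (task/service/node ID lists) are assumptions for illustration, not the code added by this PR.

// Sketch only: how an agent might decide whether a broadcast subscription is
// relevant to it. matchesSelector and the selector fields are assumptions.
func subscriptionIsRelevant(sub *api.SubscriptionMessage, running []*api.Task) bool {
	for _, task := range running {
		if matchesSelector(sub.Selector, task) {
			return true
		}
	}
	return false
}

func matchesSelector(sel *api.LogSelector, task *api.Task) bool {
	for _, id := range sel.TaskIDs {
		if id == task.ID {
			return true
		}
	}
	for _, id := range sel.ServiceIDs {
		if id == task.ServiceID {
			return true
		}
	}
	for _, id := range sel.NodeIDs {
		if id == task.NodeID {
			return true
		}
	}
	return false
}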

listeners map[*statusReporterKey]struct{}
secrets *secrets
db *bolt.DB
taskevents *events.Broadcaster
Collaborator:

This needs to be closed once it is no longer being used, otherwise it will leak a goroutine.
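A minimal sketch of what closing it could look like, assuming the worker gains a Close method (which it does later in this PR):

// Sketch: tie the broadcaster's lifetime to the worker so the goroutine
// started by events.NewBroadcaster is not leaked.
func (w *worker) Close() error {
	// events.Broadcaster implements Close; this stops its internal run loop.
	return w.taskevents.Close()
}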

Member Author:

I've updated the LogBroker - however, there's no way to terminate a Worker so it should never leak

return &LogBroker{
broadcaster: events.NewBroadcaster(),
broadcaster: events.NewBroadcaster(),
subscriptions: events.NewBroadcaster(),
Collaborator:

Same with these - they need to be closed when the log broker stops.

Take care when closing them that there are no sinks still attached to the broadcasters; otherwise it can deadlock. We have a wrapper in manager/state/watch that we use to protect against this, which you may find useful here.
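Roughly, the idea of that wrapper is to put a non-blocking queue between the broadcaster and each watcher's channel, so the broadcaster's write loop can never block on a slow or abandoned sink and Close cannot deadlock. A simplified sketch of the pattern (not the actual manager/state/watch code):

// Each watcher gets a channel sink wrapped in an unbounded events.Queue;
// writes to the queue never block, so closing the broadcaster stays safe
// even while watchers are still attached.
type queue struct {
	broadcast *events.Broadcaster
}

func (q *queue) Watch() (eventq chan events.Event, cancel func()) {
	ch := events.NewChannel(0)
	sink := events.Sink(events.NewQueue(ch))
	q.broadcast.Add(sink)
	return ch.C, func() {
		q.broadcast.Remove(sink)
		ch.Close()
		sink.Close()
	}
}

func (q *queue) Publish(item events.Event) {
	q.broadcast.Write(item)
}

func (q *queue) Close() error {
	return q.broadcast.Close()
}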

ctx, cancel := context.WithCancel(ctx)
defer cancel()

log.G(ctx).Infof("Received subscription %s", subscription.ID)
Collaborator:

It's probably a good idea to log the selectors as well.

}

ctx, cancel := context.WithCancel(pctx)
defer cancel()
Collaborator:

What's the cancel useful for? Do any of the function calls below spawn goroutines that outlive this function?

Member Author:

Removed

Collaborator:

> Removed

The WithCancel still seems to be in the code

Member Author:

I think that one in the controller is legit - it cancels out the event stream as soon as we hit a start event

Collaborator:

I get it now, thanks.


func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Collaborator:

What's the cancel useful for? Do any of the function calls below spawn goroutines that outlive this function?

Member Author:

Right - removed the sub-context


Data: parts[1],
}, false); err != nil {
return errors.Wrap(err, "failed publisher log message")
Collaborator:

"failed to publish log message"?

@aluzzardi force-pushed the logbroker branch 2 times, most recently from 860c496 to 23a5c94 on October 22, 2016 01:44
@aluzzardi changed the title from [WIP] Log Management to Log Management on Oct 22, 2016
@aluzzardi force-pushed the logbroker branch 3 times, most recently from f0a77e1 to 78e55a4 on October 22, 2016 02:10
@@ -69,3 +91,9 @@ func TestControllerFlowIntegration(t *testing.T) {
t.Fatalf("expected controller to be closed: %v", err)
}
}

type logPublisherFn func(ctx context.Context, message api.LogMessage, close bool) error
Contributor:

This isn't necessary since we define an exported version in the exec package.

Member Author (@aluzzardi, Oct 24, 2016):

Exec defines a log publisher, not a publish provider

Member Author (@aluzzardi, Oct 24, 2016):

Never mind, this is old code. Fixed

type LogPublisherFunc func(ctx context.Context, message api.LogMessage, close bool) error

// Publish calls the wrapped function.
func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage, close bool) error {
Contributor:

No need to propagate close here. Just cancel the context.

Member Author:

How?

The only thing that can send a close is agent.Publisher (LogPublisherProvider) since down in the controller nobody has access to PublishLogsRequest, and I don't see how it can watch for context cancellations

for {
// so, message header is 8 bytes, treat as uint64, pull stream off MSB
var header uint64
if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
Contributor:

Most of this should be moved into the adapter.
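For context on the framing being decoded in that loop: the container attach stream multiplexes stdout and stderr using an 8-byte header in which the first byte identifies the stream and the last four bytes give the payload length (big-endian). A hedged sketch of the decode, assuming that stdcopy-style layout:

// Read one multiplexed frame. With the header read as a big-endian uint64,
// the most significant byte is the stream (1=stdout, 2=stderr) and the low
// 32 bits are the payload size.
var header uint64
if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
	return err
}
stream := byte(header >> 56) // MSB: stream descriptor
size := uint32(header)       // low 4 bytes: payload length
payload := make([]byte, size)
if _, err := io.ReadFull(brd, payload); err != nil {
	return err
}
_ = stream // route payload to stdout or stderr based on the descriptor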

@aluzzardi force-pushed the logbroker branch 2 times, most recently from e5ef9ff to 1c75bba on October 24, 2016 19:16
@codecov-io commented Oct 24, 2016

Current coverage is 55.32% (diff: 36.05%)

Merging #1679 into master will decrease coverage by 0.58%

@@             master      #1679   diff @@
==========================================
  Files            96         94     -2   
  Lines         15005      15190   +185   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
+ Hits           8390       8404    +14   
- Misses         5504       5685   +181   
+ Partials       1111       1101    -10   


Powered by Codecov. Last update be97f7f...3058ba9

@aluzzardi force-pushed the logbroker branch 4 times, most recently from 51da25f to ad6d3b1 on October 24, 2016 23:54
@aluzzardi (Member Author):

@aaronlehmann @stevvooe PTAL

@aaronlehmann (Collaborator):

> I've updated the LogBroker - however, there's no way to terminate a Worker so it should never leak

I don't understand. Doesn't docker swarm leave terminate the worker?

@aaronlehmann (Collaborator):

@aluzzardi: Unit tests seemed to hang in CI.

@aluzzardi (Member Author) commented Oct 25, 2016:

> I don't understand. Doesn't docker swarm leave terminate the worker?

It just stops the agent.

EDIT: Rephrasing. When you docker swarm leave, we stop the agent. All that does is stop the agent from going through its main processing loop; the Worker is never cleanly stopped. We'd need to change that.

I could change the Worker interface, add a Stop there, and plumb it into the agent - the problem now is the wrapper. It'd be weird if the agent imported stuff from the manager; any suggestions?

@aluzzardi (Member Author) commented Oct 25, 2016:

> @aluzzardi: Unit tests seemed to hang in CI.

make test works locally :/ I'll try make ci

UPDATE: Found the problem. It only happens with GOMAXPROCS=1 and it's due to the tests. Basically, they rely on the "agent" connecting to the broker before the "client" sends a logs command. In real life that should never happen (the disconnected agent shouldn't have containers matching the selector anyway), but this will be improved in a follow-up once we implement subscription filtering in the manager.

In the meantime, I had to add a sleep - I tried several other approaches, but none had a 100% success rate.

@@ -712,6 +729,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
m.dispatcher.Stop()
m.logbroker.Stop()
Collaborator:

It looks like calling m.logbroker.Stop multiple times will panic. Since both becomeFollower and (*Manager).Stop can call m.logbroker.Stop, either logbroker's Stop function should protect against this (as I believe most Stop functions do), or the manager should set m.logbroker to nil after calling Stop and check before each use.
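A minimal sketch of such a guard, assuming a hypothetical mutex and a nil-able cancel function on the broker (not necessarily how the final change implements it):

// Sketch: make Stop idempotent so a second call is a no-op (or an error)
// instead of a panic. mu and cancelAll are hypothetical fields.
func (lb *LogBroker) Stop() error {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	if lb.cancelAll == nil {
		// Already stopped, or never started.
		return errors.New("log broker is not running")
	}
	lb.cancelAll()
	lb.cancelAll = nil

	lb.logQueue.Close()
	lb.subscriptionQueue.Close()
	return nil
}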

@aaronlehmann (Collaborator) left a comment:

Mostly LGTM

Here are the remaining few things that I think should be addressed:

@aluzzardi (Member Author):

@aaronlehmann Thanks! Agreed on all counts, will make the changes

@aaronlehmann (Collaborator):

Oh, interesting, CI found a race. SubscribeLogs uses the queue created in Run without locking. Stop should probably wait for all outstanding RPCs to terminate before returning, and RPCs should error out if the log broker is not currently running. I think I mentioned this before but maybe the comment got lost.
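A hedged sketch of the guard being asked for, using hypothetical running/mu fields on the broker:

// Each RPC would call this before touching the queues created by Run, and
// error out if the broker has been stopped. A sync.WaitGroup incremented on
// RPC entry and waited on in Stop would additionally make Stop block until
// outstanding RPCs finish.
func (lb *LogBroker) checkRunning() error {
	lb.mu.RLock()
	defer lb.mu.RUnlock()
	if !lb.running {
		return grpc.Errorf(codes.Unavailable, "log broker is not running")
	}
	return nil
}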

@aluzzardi (Member Author):

@aaronlehmann Yeah, right, I was looking into that now.

I'm seeing a similar pattern in dispatcher as well (check if running on every RPC).

I'll adopt the same pattern, but I'm wondering if that's repeated work. Shouldn't the manager basically take care of 1) stopping routing traffic to the service and 2) calling Stop, and do the opposite on promotion? That way our services could be "dumber" (in terms of init/deinit logic).

Building on that, we could actually have a Servicer (bad name, I know) interface with Run & Stop, and the manager could simply keep a list of services and perform Run + Mount on startup/promotion and Unmount + Stop on demotion. Then adding a new service such as the broker would just be a matter of appending the object to the list.
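For what it's worth, a sketch of that Servicer idea; this interface is purely illustrative and does not exist in the codebase:

// Hypothetical interface letting the manager treat leader-only subsystems
// (dispatcher, log broker, CA, ...) uniformly on promotion/demotion.
type Servicer interface {
	Run(ctx context.Context) error
	Stop()
}

// On promotion the manager would start every registered service; on demotion
// it would stop them in reverse order. Error handling elided in this sketch.
func startServices(ctx context.Context, services []Servicer) {
	for _, s := range services {
		go s.Run(ctx)
	}
}

func stopServices(services []Servicer) {
	for i := len(services) - 1; i >= 0; i-- {
		services[i].Stop()
	}
}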

@aluzzardi (Member Author):

Regarding the above - I'm also wondering if the broker really needs a Stop function. Wouldn't it be simpler if the demotion code just stopped the actual server altogether, cancelling all active connections?

@aaronlehmann (Collaborator):

How do you stop the server without killing the whole gRPC listener?

@aaronlehmann (Collaborator):

> I'll adopt the same pattern, but I'm wondering if that's repeated work. Shouldn't the manager basically take care of 1) stopping routing traffic to the service and 2) calling Stop, and do the opposite on promotion? That way our services could be "dumber" (in terms of init/deinit logic).

Maybe, but it's not easy. (1) probably involves some codegenned wrappers, unless there's some way I don't know of to "stop traffic" for a service (there is no Unregister function).

@aluzzardi (Member Author):

Just pushed an update addressing the remaining comments (I only ran make test so other things may fail, I'll fix that tomorrow).

There are a bunch of different changes, but the most relevant ones are:

> If worker creates a broadcaster, it needs to have a well-defined lifecycle so the broadcaster's resources can be freed. There are some real-world cases in Docker where multiple workers would be created without a restart in between; for example, after encryption-at-rest lands, each attempt to unlock the manager will create a worker.

@aaronlehmann Added stop safeguards in 9a03850

> https://github.com/docker/swarmkit/pull/1679/files#r85861157 needs to be addressed

This is about the agent leaking resources (the worker's events.Broadcaster) because we don't cleanly shut down the worker.

I've changed the worker interface to add a Close function (75a4aa5) and also moved the manager/state/watch into its own package (ab301df).

@stevvooe Can you verify if the worker interface and agent integration look alright?
@aaronlehmann Could you double check if the leaks are gone?

> I'd like https://github.com/docker/swarmkit/pull/1679/files#r85228089 to be considered before merging.

That's about logs being unary RPCs rather than streams.

I've implemented that in 49e48be

@stevvooe @aaronlehmann Are you happy with how it looks?
@LK4D4 I had to change the codegen wrapper to support client streaming. Turns out, the interface for both Client and Client&Server is the same. Could you check if it looks alright?

@@ -228,7 +235,13 @@ func testLogBrokerEnv(t *testing.T) (context.Context, *LogBroker, *ca.SecurityCo
}
brokerClient := api.NewLogBrokerClient(brokerCc)

go func() {
broker.Run(ctx)
}()
Collaborator:

nit: Don't need a closure.

}))
}

func (lb *LogBroker) publish(log *api.PublishLogsRequest) {
Collaborator:

I think PublishLogs needs to use this.

lb.logQueue.Close()
lb.subscriptionQueue.Close()

return nil
Collaborator:

I think it's good practice for Stop to block until all current RPCs are finished. I don't see anything especially dangerous in the RPCs like interactions with Raft, but we may add something in the future. Also, right now it's theoretically possible for an RPC started before a leader reelection to interact with the new queue created by Run, which feels wrong.

Member Author:

I followed the same pattern as the dispatcher and keymanager - neither of them has a blocking Stop (and they do interact with Raft).

Do you want me to make Stop blocking, or keep it consistent with the rest?

LogSelector selector = 1;

LogSubscriptionOptions options = 2;
}

message SubscribeLogsMessage {
repeated LogMessage messages = 1 [(gogoproto.nullable) = false];
LogMessage message = 1 [(gogoproto.nullable) = false];
Collaborator:

It still seems useful for this to be a repeated field so we can send down multiple messages at a time if we later decide to. That will probably cut down some framing overhead.

// the contents of this message.
bool close = 3;
// Messages is the log message for publishing.
LogMessage message = 2 [(gogoproto.nullable) = false];
Collaborator:

Same here, I think there's a valid use case for sending many log messages at once even though this is a stream.

@aluzzardi (Member Author):

@aaronlehmann Addressed your comments except for blocking Stop (waiting on your response on whether we should do it or not)

@aaronlehmann (Collaborator):

Sorry for the misinformation about blocking Stop. I was sure that the dispatcher did it that way. Turns out it was changed here: 6ec2d00

Let's keep Stop non-blocking for now, to be consistent with dispatcher and ca.

apiOptions := types.ContainerLogsOptions{
Follow: options.Follow,

// TODO(stevvooe): Parse timestamp out of message. This
Contributor:

This comment is no longer valid. We parse below. :)

Sorry for not removing it.

This change adds support for log management in the manager, agent and
CLI.

The log broker is currently naive and broadcasts subscriptions to all
agents, which in turn need to perform filtering to figure out if the
subscription is relevant to them.

In the future, the broker should be smarter and dispatch subscriptions
only to concerned agents.

The basic logging functionality works.

Fixes moby#1332

Signed-off-by: Andrea Luzzardi <[email protected]>
The broker now keeps track of all active subscriptions. When a new agent
joins, it will receive all actives before receiving new subscriptions.

Signed-off-by: Andrea Luzzardi <[email protected]>
@aaronlehmann (Collaborator) left a comment:

LGTM

@aluzzardi merged commit 2eaae1a into moby:master on Nov 4, 2016
@aluzzardi deleted the logbroker branch on November 4, 2016 19:02