Log Management #1679
Conversation
```go
listeners  map[*statusReporterKey]struct{}
secrets    *secrets
db         *bolt.DB
taskevents *events.Broadcaster
```
This needs to be closed once it is no longer being used, otherwise it will leak a goroutine.
I've updated the LogBroker. However, there's no way to terminate a Worker, so it should never leak.
```go
return &LogBroker{
	broadcaster:   events.NewBroadcaster(),
	subscriptions: events.NewBroadcaster(),
```
Same with these - they need to be closed when the log broker stops.
Take care when closing them that there are no sinks still attached to the broadcasters; otherwise it can deadlock. We have a wrapper in manager/state/watch that we use to protect against this, which you may find useful here.
```go
ctx, cancel := context.WithCancel(ctx)
defer cancel()

log.G(ctx).Infof("Received subscription %s", subscription.ID)
```
It's probably a good idea to log the selectors as well.
```go
}

ctx, cancel := context.WithCancel(pctx)
defer cancel()
```
What's the cancel useful for? Do any of the function calls below spawn goroutines that outlive this function?
Removed
The `WithCancel` still seems to be in the code.
I think that one in the controller is legit - it cancels out the event stream as soon as we hit a start event.
I get it now, thanks.
```go
func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
```
What's the cancel useful for? Do any of the function calls below spawn goroutines that outlive this function?
Right - removed the sub-context
```go
	Data: parts[1],
}, false); err != nil {
	return errors.Wrap(err, "failed publisher log message")
```
"failed to publish log message"?
```go
@@ -69,3 +91,9 @@ func TestControllerFlowIntegration(t *testing.T) {
		t.Fatalf("expected controller to be closed: %v", err)
	}
}

type logPublisherFn func(ctx context.Context, message api.LogMessage, close bool) error
```
This isn't necessary since we define an exported version in the exec package.
Exec defines a log publisher, not a publish provider
Never mind, this is old code. Fixed
```go
type LogPublisherFunc func(ctx context.Context, message api.LogMessage, close bool) error

// Publish calls the wrapped function.
func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage, close bool) error {
```
No need to propagate `close` here. Just cancel the context.
How? The only thing that can send a close is `agent.Publisher` (`LogPublisherProvider`), since down in the controller nobody has access to `PublishLogsRequest`, and I don't see how it can watch for context cancellations.
```go
for {
	// so, message header is 8 bytes, treat as uint64, pull stream off MSB
	var header uint64
	if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
```
Most of this should be moved into the adapter.
Current coverage is 55.32% (diff: 36.05%)

```
@@            master   #1679    diff @@
=======================================
  Files           96      94      -2
  Lines        15005   15190    +185
  Methods          0       0
  Messages         0       0
  Branches         0       0
=======================================
+ Hits          8390    8404     +14
- Misses        5504    5685    +181
+ Partials      1111    1101     -10
```
@aaronlehmann @stevvooe PTAL

I don't understand. Doesn't

@aluzzardi: Unit tests seemed to hang in CI.

It just stops the agent. EDIT: Rephrasing. When you I could change the

UPDATE: Found the problem. It only happens with In the meantime, I had to add a sleep - I tried other approaches, but none had a 100% success rate.
```go
@@ -712,6 +729,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	m.dispatcher.Stop()
	m.logbroker.Stop()
```
It looks like calling `m.logbroker.Stop` multiple times will panic. Since both `becomeFollower` and `(*Manager).Stop` can call `m.logbroker.Stop`, either logbroker's `Stop` function should protect against this (as I believe most `Stop` functions do), or the manager should set `m.logbroker` to `nil` after calling `Stop` and check before each use.
Mostly LGTM
Here are the remaining few things that I think should be addressed:
- If `worker` creates a broadcaster, it needs to have a well-defined lifecycle so the broadcaster's resources can be freed. There are some real-world cases in Docker where multiple workers would be created without a restart in between; for example, after encryption-at-rest lands, each attempt to unlock the manager will create a `worker`.
- https://github.com/docker/swarmkit/pull/1679/files#r85861157 needs to be addressed
- I'd like https://github.com/docker/swarmkit/pull/1679/files#r85228089 to be considered before merging.
@aaronlehmann Thanks! Agreed on all counts, will make the changes.

Oh, interesting, CI found a race.

@aaronlehmann Yeah, right, I was looking into that now. I'm seeing a similar pattern in the dispatcher as well (check if running on every RPC). I'll adopt the same pattern, but I'm wondering if that's repeated work. Shouldn't the manager basically take care of 1) stopping routing traffic to the service and 2) calling stop, and do the opposite on promotion? That way our services could be "dumber" (in terms of init/deinit logic). Building on that, we could actually have a

Regarding the above - I'm also wondering if the broker really needs a

How do you stop the server without killing the whole GRPC listener?

Maybe, but it's not easy. (1) probably involves some codegenned wrappers, unless there's some way I don't know of to "stop traffic" for a service (there is no

Just pushed an update addressing the remaining comments (I only ran

There's a bunch of different changes, but the most relevant are:

@aaronlehmann Added stop safeguards in 9a03850

This is about the agent leaking resources (worker event.Broadcaster) because we don't cleanly shut down the worker. I've changed the worker interface to add a

@stevvooe Can you verify if the worker interface and agent integration look alright?

That's about logs being unary RPCs rather than streams. I've implemented that in 49e48be

@stevvooe @aaronlehmann Are you happy with how it looks?
```go
@@ -228,7 +235,13 @@ func testLogBrokerEnv(t *testing.T) (context.Context, *LogBroker, *ca.SecurityCo
}
brokerClient := api.NewLogBrokerClient(brokerCc)

go func() {
	broker.Run(ctx)
}()
```
nit: Don't need a closure.
```go
	}))
}

func (lb *LogBroker) publish(log *api.PublishLogsRequest) {
```
I think `PublishLogs` needs to use this.
```go
lb.logQueue.Close()
lb.subscriptionQueue.Close()

return nil
```
I think it's good practice for `Stop` to block until all current RPCs are finished. I don't see anything especially dangerous in the RPCs, like interactions with Raft, but we may add something in the future. Also, right now it's theoretically possible for an RPC started before a leader reelection to interact with the new queue created by `Run`, which feels wrong.
I followed the same pattern as the dispatcher and keymanager - neither of them has a blocking `Stop` (and they do interact with Raft). Do you want me to make stop blocking, or keep it consistent with the rest?
```protobuf
	LogSelector selector = 1;

	LogSubscriptionOptions options = 2;
}

message SubscribeLogsMessage {
	repeated LogMessage messages = 1 [(gogoproto.nullable) = false];
	LogMessage message = 1 [(gogoproto.nullable) = false];
```
It still seems useful for this to be a repeated field so we can send down multiple messages at a time if we later decide to. That will probably cut down some framing overhead.
```protobuf
	// the contents of this message.
	bool close = 3;
	// Messages is the log message for publishing.
	LogMessage message = 2 [(gogoproto.nullable) = false];
```
Same here, I think there's a valid use case for sending many log messages at once even though this is a stream.
@aaronlehmann Addressed your comments except for blocking Stop (waiting for your response on whether we should do it or not).

Sorry for the misinformation about blocking Let's keep
```go
apiOptions := types.ContainerLogsOptions{
	Follow: options.Follow,

	// TODO(stevvooe): Parse timestamp out of message. This
```
This comment is no longer valid. We parse below. :)
Sorry for not removing.
This change adds support for log management in the manager, agent and CLI. The log broker is currently naive and broadcasts subscriptions to all agents, which in turn need to perform filtering to figure out if the subscription is relevant to them. In the future, the broker should be smarter and dispatch subscriptions only to concerned agents. The basic logging functionality works. Fixes moby#1332 Signed-off-by: Andrea Luzzardi <[email protected]>
The broker now keeps track of all active subscriptions. When a new agent joins, it will receive all actives before receiving new subscriptions. Signed-off-by: Andrea Luzzardi <[email protected]>
Signed-off-by: Andrea Luzzardi <[email protected]>
Signed-off-by: Andrea Luzzardi <[email protected]>
Signed-off-by: Andrea Luzzardi <[email protected]>
Signed-off-by: Andrea Luzzardi <[email protected]>
Signed-off-by: Andrea Luzzardi <[email protected]>
LGTM
This change adds support for log management in the manager, agent and
CLI.
The log broker is currently naive and broadcasts subscriptions to all
agents, which in turn need to perform filtering to figure out if the
subscription is relevant to them.
In the future, the broker should be smarter and dispatch subscriptions
only to concerned agents.
The basic logging functionality works.
Fixes #1332
/cc @stevvooe @aaronlehmann @diogomonica