Skip to content

Commit

Permalink
msgr: discard the local_pipe's queue on shutdown.
Browse files Browse the repository at this point in the history
To facilitate this, we do two things:
1) actually identify the number of special code values we pass around
2) use that to prevent trying to put() those non-pointer values in
Pipe::discard_queue().
Then we just call local_pipe.discard_queue() in wait() like happens
(indirectly, via reaping) with all the normal Pipes in rank_pipe.

But this does make me think that we may be approaching the point
where it's appropriate to create a subclass LocalPipe (against a
RemotePipe like our current Pipe implementation is mostly intended
to be).

Should fix ceph#2086.

Signed-off-by: Greg Farnum <[email protected]>
Reviewed-by: Sage Weil <[email protected]>
  • Loading branch information
gregsfortytwo authored and liewegas committed Feb 29, 2012
1 parent 7690f0b commit 2437ce0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
17 changes: 14 additions & 3 deletions src/msg/SimpleMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,9 +602,7 @@ void SimpleMessenger::Pipe::queue_received(Message *m, int priority)

halt:
// don't want to put local-delivery signals
// this magic number should be larger than
// the size of the D_CONNECT et al enum
if (m>(void *)5) {
if (m>(void *)DispatchQueue::D_NUM_CODES) {
msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
}
Expand Down Expand Up @@ -1425,18 +1423,27 @@ void SimpleMessenger::Pipe::discard_queue()
q.qlen.sub(in_qlen);

for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++) {
if (*p < (void *) DispatchQueue::D_NUM_CODES) {
continue; // skip non-Message dispatch codes
}
ldout(msgr->cct,20) << " discard " << *p << dendl;
(*p)->put();
}
sent.clear();
for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); p++)
for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++) {
if (*r < (void *) DispatchQueue::D_NUM_CODES) {
continue; // skip non-Message dispatch codes
}
ldout(msgr->cct,20) << " discard " << *r << dendl;
(*r)->put();
}
out_q.clear();
for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++)
for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++) {
if (*r < (void *) DispatchQueue::D_NUM_CODES) {
continue; // skip non-Message dispatch codes
}
msgr->dispatch_throttle_release((*r)->get_dispatch_throttle_size());
ldout(msgr->cct,20) << " discard " << *r << dendl;
(*r)->put();
Expand Down Expand Up @@ -2710,6 +2717,10 @@ void SimpleMessenger::wait()
reaper_cond.Wait(lock);
reaper();
}

dispatch_queue.local_pipe->pipe_lock.Lock();
dispatch_queue.local_pipe->discard_queue();
dispatch_queue.local_pipe->pipe_lock.Unlock();
}
lock.Unlock();

Expand Down
4 changes: 2 additions & 2 deletions src/msg/SimpleMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class SimpleMessenger : public Messenger {
map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
atomic_t qlen;

enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET };
enum { D_CONNECT = 0, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
list<Connection*> connect_q;
list<Connection*> remote_reset_q;
list<Connection*> reset_q;
Expand Down Expand Up @@ -362,7 +362,7 @@ class SimpleMessenger : public Messenger {
lock.Unlock();
local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
}

DispatchQueue() :
lock("SimpleMessenger::DispatchQeueu::lock"),
stop(false),
Expand Down

0 comments on commit 2437ce0

Please sign in to comment.