Skip to content

Commit

Permalink
Merge branch 'mds' into unstable
Browse files (browse the repository at this point in the history)
  • Loading branch information
liewegas committed Mar 30, 2010
2 parents 3ab157f + 47286cb commit 9b552c7
Show file tree
Hide file tree
Showing 18 changed files with 332 additions and 159 deletions.
6 changes: 2 additions & 4 deletions src/TODO
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ filestore
- need an osdmap cache layer?

bugs
- mds states
- closing -> opening transition
- mds prepare_force_open_sessions, then import aborts.. session is still OPENING but no client_session is sent...
- mds rejoin: invented dirfrags (MDCache.cc:3469) have version=0; subsequent modification of dentries/inodes predirty a bad (small) version #.
- rm -r failure (on kernel tree)
- dbench 1, restart mds (may take a few times), dbench will error out.

Expand Down Expand Up @@ -542,4 +540,4 @@ radosgw


-- for nicer kclient debug output (everything but messenger, but including msg in/out)
echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control
echo 'module ceph +p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file fs/ceph/messenger.c -p' > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- --- /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control ; echo 'file ' `grep -- === /sys/kernel/debug/dynamic_debug/control | grep ceph | awk '{print $1}' | sed 's/:/ line /'` +p > /sys/kernel/debug/dynamic_debug/control
53 changes: 50 additions & 3 deletions src/mds/CInode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1530,8 +1530,8 @@ void CInode::open_snaprealm(bool nosplit)
if (parent) {
dout(10) << "open_snaprealm " << snaprealm
<< " parent is " << parent
<< " siblings are " << parent->open_children
<< dendl;
dout(30) << " siblings are " << parent->open_children << dendl;
snaprealm->parent = parent;
if (!nosplit &&
is_dir() &&
Expand Down Expand Up @@ -1920,7 +1920,7 @@ void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<Context*>& waite

void CInode::encode_export(bufferlist& bl)
{
__u8 struct_v = 1;
__u8 struct_v = 2;
::encode(struct_v, bl);
_encode_base(bl);

Expand All @@ -1931,6 +1931,24 @@ void CInode::encode_export(bufferlist& bl)

::encode(replica_map, bl);

// include scatterlock info for any bounding CDirs
bufferlist bounding;
if (inode.is_dir())
for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
p != dirfrags.end();
++p) {
CDir *dir = p->second;
if (dir->state_test(CDir::STATE_EXPORTBOUND)) {
::encode(p->first, bounding);
::encode(dir->fnode.fragstat, bounding);
::encode(dir->fnode.accounted_fragstat, bounding);
::encode(dir->fnode.rstat, bounding);
::encode(dir->fnode.accounted_rstat, bounding);
dout(10) << " encoded fragstat/rstat info for " << *dir << dendl;
}
}
::encode(bounding, bl);

_encode_locks_full(bl);
get(PIN_TEMPEXPORTING);
}
Expand Down Expand Up @@ -1963,7 +1981,36 @@ void CInode::decode_import(bufferlist::iterator& p,
::decode(pop, p);

::decode(replica_map, p);
if (!replica_map.empty()) get(PIN_REPLICATED);
if (!replica_map.empty())
get(PIN_REPLICATED);

if (struct_v >= 2) {
// decode fragstat info on bounding cdirs
bufferlist bounding;
::decode(bounding, p);
bufferlist::iterator q = bounding.begin();
while (!q.end()) {
frag_t fg;
::decode(fg, q);
CDir *dir = get_dirfrag(fg);
assert(dir); // we should have all bounds open
if (!dir->is_auth()) {
::decode(dir->fnode.fragstat, q);
::decode(dir->fnode.accounted_fragstat, q);
::decode(dir->fnode.rstat, q);
::decode(dir->fnode.accounted_rstat, q);
dout(10) << " took fragstat/rstat info for " << *dir << dendl;
} else {
dout(10) << " skipped fragstat/rstat info for " << *dir << dendl;
frag_info_t f;
nest_info_t n;
::decode(f, q);
::decode(f, q);
::decode(n, q);
::decode(n, q);
}
}
}

_decode_locks_full(p);
}
67 changes: 52 additions & 15 deletions src/mds/Locker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ bool Locker::acquire_locks(MDRequest *mdr,
sorted.insert(*p);
if ((*p)->get_parent()->is_auth())
mustpin.insert(*p);
else if ((*p)->get_type() == CEPH_LOCK_IFILE &&
!(*p)->get_parent()->is_auth() && !(*p)->can_rdlock(client)) { // we might have to request an rdlock
dout(15) << " will also auth_pin " << *(*p)->get_parent() << " in case we need to request a rdlock" << dendl;
mustpin.insert(*p);
}
}


Expand Down Expand Up @@ -680,16 +685,24 @@ void Locker::eval(SimpleLock *lock, bool *need_issue)
bool Locker::_rdlock_kick(SimpleLock *lock)
{
// kick the lock
if (lock->is_stable() &&
lock->get_parent()->is_auth()) {
if (lock->get_sm() == &sm_scatterlock) {
if (lock->get_parent()->is_replicated())
scatter_tempsync((ScatterLock*)lock);
else
if (lock->is_stable()) {
if (lock->get_parent()->is_auth()) {
if (lock->get_sm() == &sm_scatterlock) {
if (lock->get_parent()->is_replicated())
scatter_tempsync((ScatterLock*)lock);
else
simple_sync(lock);
} else
simple_sync(lock);
} else
simple_sync(lock);
return true;
return true;
} else {
// request rdlock state change from auth
int auth = lock->get_parent()->authority().first;
dout(10) << "requesting rdlock from auth on "
<< *lock << " on " << *lock->get_parent() << dendl;
mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
return false;
}
}
return false;
}
Expand All @@ -709,7 +722,8 @@ bool Locker::rdlock_try(SimpleLock *lock, client_t client, Context *con)

// wait!
dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
if (con) lock->add_waiter(SimpleLock::WAIT_RD, con);
if (con)
lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
return false;
}

Expand Down Expand Up @@ -740,8 +754,13 @@ bool Locker::rdlock_start(SimpleLock *lock, MDRequest *mut)
}

// wait!
int wait_on;
if (lock->get_parent()->is_auth() && lock->is_stable())
wait_on = SimpleLock::WAIT_RD;
else
wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
lock->add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mut));
lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
nudge_log(lock);
return false;
}
Expand Down Expand Up @@ -1482,6 +1501,11 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
<< " update_size " << update_size
<< " on " << *in << dendl;

if (in->is_frozen()) {
dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDL_CheckMaxSize(this, in));
return false;
}
if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
// lock?
if (in->filelock.is_stable()) {
Expand Down Expand Up @@ -3377,17 +3401,18 @@ void Locker::file_recover(ScatterLock *lock)
assert(in->is_auth());
assert(lock->is_stable());

int oldstate = lock->get_state();
lock->set_state(LOCK_PRE_SCAN);
assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()

int gather = 0;

if (in->is_replicated() &&
/*
if (in->is_replicated()
lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
send_lock_message(lock, LOCK_AC_LOCK);
lock->init_gather();
gather++;
}
*/
if (in->issued_caps_need_gather(lock)) {
issue_caps(in);
gather++;
Expand Down Expand Up @@ -3560,6 +3585,19 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
}
break;

case LOCK_AC_REQRDLOCK:
if (lock->is_stable()) {
dout(7) << "handle_file_lock got rdlock request on " << *lock
<< " on " << *lock->get_parent() << dendl;
assert(in->is_auth()); // replica auth pinned if they're doing this!
simple_sync(lock);
} else {
dout(7) << "handle_file_lock ignoring rdlock request on " << *lock
<< " on " << *lock->get_parent() << dendl;
// replica will retry.
}
break;

case LOCK_AC_NUDGE:
if (lock->get_parent()->is_auth()) {
dout(7) << "handle_file_lock trying nudge on " << *lock
Expand All @@ -3572,7 +3610,6 @@ void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
}
break;


default:
assert(0);
}
Expand Down
37 changes: 27 additions & 10 deletions src/mds/MDCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3460,12 +3460,14 @@ void MDCache::handle_cache_rejoin_strong(MMDSCacheRejoin *strong)
CDir *dir = get_dirfrag(p->first);
if (!dir) {
CInode *in = get_inode(p->first.ino);
if (!in) in = rejoin_invent_inode(p->first.ino, CEPH_NOSNAP);
if (!in)
in = rejoin_invent_inode(p->first.ino, CEPH_NOSNAP);
if (!in->is_dir()) {
assert(in->state_test(CInode::STATE_REJOINUNDEF));
in->inode.mode = S_IFDIR;
}
dir = in->get_or_open_dirfrag(this, p->first.frag);
dout(10) << " invented " << *dir << dendl;
} else {
dout(10) << " have " << *dir << dendl;
}
Expand Down Expand Up @@ -3889,8 +3891,11 @@ void MDCache::rejoin_gather_finish()

process_imported_caps();
process_reconnected_caps();
identify_files_to_recover();

vector<CInode*> recover_q, check_q;
identify_files_to_recover(recover_q, check_q);
rejoin_send_acks();
start_files_to_recover(recover_q, check_q);

// signal completion of fetches, rejoin_gather_finish, etc.
assert(rejoin_ack_gather.count(mds->whoami));
Expand Down Expand Up @@ -4444,10 +4449,9 @@ void MDCache::unqueue_file_recover(CInode *in)
* called after recovery to recover file sizes for previously opened (for write)
* files. that is, those where max_size > size.
*/
void MDCache::identify_files_to_recover()
void MDCache::identify_files_to_recover(vector<CInode*>& recover_q, vector<CInode*>& check_q)
{
dout(10) << "identify_files_to_recover" << dendl;
vector<CInode*> q; // put inodes in list first: queue_file_discover modifies inode_map
for (hash_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
p != inode_map.end();
++p) {
Expand All @@ -4467,13 +4471,26 @@ void MDCache::identify_files_to_recover()
}
}

if (recover)
q.push_back(in);
else
mds->locker->check_inode_max_size(in);
if (recover) {
in->filelock.set_state(LOCK_PRE_SCAN);
recover_q.push_back(in);
} else {
check_q.push_back(in);
}
}
}

void MDCache::start_files_to_recover(vector<CInode*>& recover_q, vector<CInode*>& check_q)
{
for (vector<CInode*>::iterator p = check_q.begin(); p != check_q.end(); p++) {
CInode *in = *p;
in->filelock.set_state(LOCK_LOCK);
mds->locker->check_inode_max_size(in);
}
for (vector<CInode*>::iterator p = recover_q.begin(); p != recover_q.end(); p++) {
CInode *in = *p;
mds->locker->file_recover(&in->filelock);
}
for (vector<CInode*>::iterator p = q.begin(); p != q.end(); p++)
mds->locker->file_recover(&(*p)->filelock);
}

struct C_MDC_Recover : public Context {
Expand Down
5 changes: 4 additions & 1 deletion src/mds/MDCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ struct MDRequest : public Mutation {
bool was_link_merge;

map<client_t,entity_inst_t> imported_client_map;
map<client_t,__u64> sseq_map;
map<CInode*, map<client_t,Capability::Export> > cap_imports;

// for snaps
Expand Down Expand Up @@ -784,7 +785,9 @@ class MDCache {
void unqueue_file_recover(CInode *in);
void _queued_file_recover_cow(CInode *in, Mutation *mut);
void _queue_file_recover(CInode *in);
void identify_files_to_recover();
void identify_files_to_recover(vector<CInode*>& recover_q, vector<CInode*>& check_q);
void start_files_to_recover(vector<CInode*>& recover_q, vector<CInode*>& check_q);

void do_file_recover();
void _recovered(CInode *in, int r, __u64 size, utime_t mtime);

Expand Down
27 changes: 15 additions & 12 deletions src/mds/MDS.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,12 @@ void MDS::handle_command(MMonCommand *m)
mdcache->dump_cache();
}
else if (m->cmd[0] == "session" && m->cmd[1] == "kill") {
Session *session = sessionmap.
get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
strtol(m->cmd[2].c_str(), 0, 10)));
if (session) {
dout(20) << "killing session " << session << dendl;
server->end_session(session);
} else dout(15) << "session " << session << " not in sessionmap!" << dendl;
Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
strtol(m->cmd[2].c_str(), 0, 10)));
if (session)
server->kill_session(session);
else
dout(15) << "session " << session << " not in sessionmap!" << dendl;
} else if (m->cmd[0] == "issue_caps") {
long inum = strtol(m->cmd[1].c_str(), 0, 10);
CInode * ino = mdcache->get_inode(inodeno_t(inum));
Expand Down Expand Up @@ -1559,12 +1558,15 @@ bool MDS::ms_handle_reset(Connection *con)
objecter->ms_handle_reset(con);
} else if (con->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
Session *session = (Session *)con->get_priv();
if (!session || session->is_closed() || session->is_new())
messenger->mark_down(con->get_peer_addr());
if (session->is_new())
sessionmap.remove_session(session);
if (session)
if (session) {
if (session->is_closed()) {
messenger->mark_down(con->get_peer_addr());
sessionmap.remove_session(session);
}
session->put();
} else {
messenger->mark_down(con->get_peer_addr());
}
}
return false;
}
Expand Down Expand Up @@ -1597,6 +1599,7 @@ bool MDS::ms_verify_authorizer(Connection *con, int peer_type,
authorizer_data, authorizer_reply, name, global_id, caps_info);

if (is_valid) {
// wire up a Session* to this connection, and add it to the session map
entity_name_t n(con->get_peer_type(), global_id);
Session *s = sessionmap.get_session(n);
if (!s) {
Expand Down
8 changes: 5 additions & 3 deletions src/mds/MDSTableClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ void MDSTableClient::handle_request(class MMDSTableRequest *m)
break;

case TABLESERVER_OP_ACK:
if (pending_commit.count(tid)) {
if (pending_commit.count(tid) &&
pending_commit[tid]->pending_commit_tids[table].count(tid)) {
dout(10) << "got ack on tid " << tid << ", logging" << dendl;

assert(g_conf.mds_kill_mdstable_at != 7);
Expand Down Expand Up @@ -175,9 +176,10 @@ void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls)
void MDSTableClient::got_journaled_ack(version_t tid)
{
dout(10) << "got_journaled_ack " << tid << dendl;
if (pending_commit.count(tid))
if (pending_commit.count(tid)) {
pending_commit[tid]->pending_commit_tids[table].erase(tid);
pending_commit.erase(tid);
pending_commit.erase(tid);
}
}

void MDSTableClient::finish_recovery()
Expand Down
Loading

0 comments on commit 9b552c7

Please sign in to comment.