Skip to content

Commit

Permalink
SUNRPC: Close a race in __rpc_wait_for_completion_task()
Browse files Browse the repository at this point in the history
Although they run as rpciod background tasks, under normal operation
(i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck()
and nfs4_do_close() want to be fully synchronous. This means that when we
exit, we want all references to the rpc_task to be gone, and we want
any dentry references etc. held by that task to be released.

For this reason these functions call __rpc_wait_for_completion_task(),
followed by rpc_put_task() in the expectation that the latter will be
releasing the last reference to the rpc_task, and thus ensuring that the
callback_ops->rpc_release() has been called synchronously.

This patch fixes a race which exists due to the fact that
rpciod calls rpc_complete_task() (in order to wake up the callers of
__rpc_wait_for_completion_task()) and then subsequently calls
rpc_put_task() without ensuring that these two steps are done atomically.

In order to avoid adding new spin locks, the patch uses the existing
waitqueue spin lock to order the rpc_task reference count releases between
the waiting process and rpciod.
The common case where nobody is waiting for completion is optimised for by
checking if the RPC_TASK_ASYNC flag is cleared and/or if the rpc_task
reference count is 1: in those cases we drop trying to grab the spin lock,
and immediately free up the rpc_task.

Those few processes that need to put the rpc_task from inside an
asynchronous context and that do not care about ordering are given a new
helper: rpc_put_task_async().

Signed-off-by: Trond Myklebust <[email protected]>
  • Loading branch information
Trond Myklebust authored and Trond Myklebust committed Mar 10, 2011
1 parent 214d93b commit bf294b4
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 17 deletions.
4 changes: 2 additions & 2 deletions fs/nfs/nfs4proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -4150,7 +4150,7 @@ static void nfs4_lock_release(void *calldata)
task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp,
data->arg.lock_seqid);
if (!IS_ERR(task))
rpc_put_task(task);
rpc_put_task_async(task);
dprintk("%s: cancelling lock!\n", __func__);
} else
nfs_free_seqid(data->arg.lock_seqid);
Expand Down Expand Up @@ -5227,7 +5227,7 @@ static int nfs41_proc_async_sequence(struct nfs_client *clp, struct rpc_cred *cr
if (IS_ERR(task))
ret = PTR_ERR(task);
else
rpc_put_task(task);
rpc_put_task_async(task);
dprintk("<-- %s status=%d\n", __func__, ret);
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion fs/nfs/unlink.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ static int nfs_do_call_unlink(struct dentry *parent, struct inode *dir, struct n
task_setup_data.rpc_client = NFS_CLIENT(dir);
task = rpc_run_task(&task_setup_data);
if (!IS_ERR(task))
rpc_put_task(task);
rpc_put_task_async(task);
return 1;
}

Expand Down
1 change: 1 addition & 0 deletions include/linux/sunrpc/sched.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
const struct rpc_call_ops *ops);
void rpc_put_task(struct rpc_task *);
void rpc_put_task_async(struct rpc_task *);
void rpc_exit_task(struct rpc_task *);
void rpc_exit(struct rpc_task *, int);
void rpc_release_calldata(const struct rpc_call_ops *, void *);
Expand Down
1 change: 1 addition & 0 deletions kernel/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -4213,6 +4213,7 @@ void __wake_up_locked_key(wait_queue_head_t *q, unsigned int mode, void *key)
{
__wake_up_common(q, mode, 1, 0, key);
}
EXPORT_SYMBOL_GPL(__wake_up_locked_key);

/**
* __wake_up_sync_key - wake up threads blocked on a waitqueue.
Expand Down
75 changes: 61 additions & 14 deletions net/sunrpc/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task)

/*
* Mark an RPC call as having completed by clearing the 'active' bit
* and then waking up all tasks that were sleeping.
*/
static void rpc_mark_complete_task(struct rpc_task *task)
static int rpc_complete_task(struct rpc_task *task)
{
smp_mb__before_clear_bit();
void *m = &task->tk_runstate;
wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
unsigned long flags;
int ret;

spin_lock_irqsave(&wq->lock, flags);
clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
smp_mb__after_clear_bit();
wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
ret = atomic_dec_and_test(&task->tk_count);
if (waitqueue_active(wq))
__wake_up_locked_key(wq, TASK_NORMAL, &k);
spin_unlock_irqrestore(&wq->lock, flags);
return ret;
}

/*
* Allow callers to wait for completion of an RPC call
*
* Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
* to enforce taking of the wq->lock and hence avoid races with
* rpc_complete_task().
*/
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
if (action == NULL)
action = rpc_wait_bit_killable;
return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
Expand Down Expand Up @@ -857,34 +871,67 @@ static void rpc_async_release(struct work_struct *work)
rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}

void rpc_put_task(struct rpc_task *task)
static void rpc_release_resources_task(struct rpc_task *task)
{
if (!atomic_dec_and_test(&task->tk_count))
return;
/* Release resources */
if (task->tk_rqstp)
xprt_release(task);
if (task->tk_msg.rpc_cred)
put_rpccred(task->tk_msg.rpc_cred);
rpc_task_release_client(task);
if (task->tk_workqueue != NULL) {
}

static void rpc_final_put_task(struct rpc_task *task,
struct workqueue_struct *q)
{
if (q != NULL) {
INIT_WORK(&task->u.tk_work, rpc_async_release);
queue_work(task->tk_workqueue, &task->u.tk_work);
queue_work(q, &task->u.tk_work);
} else
rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
if (atomic_dec_and_test(&task->tk_count)) {
rpc_release_resources_task(task);
rpc_final_put_task(task, q);
}
}

void rpc_put_task(struct rpc_task *task)
{
rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);

static void rpc_release_task(struct rpc_task *task)
{
dprintk("RPC: %5u release task\n", task->tk_pid);

BUG_ON (RPC_IS_QUEUED(task));

/* Wake up anyone who is waiting for task completion */
rpc_mark_complete_task(task);
rpc_release_resources_task(task);

rpc_put_task(task);
/*
* Note: at this point we have been removed from rpc_clnt->cl_tasks,
* so it should be safe to use task->tk_count as a test for whether
* or not any other processes still hold references to our rpc_task.
*/
if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
/* Wake up anyone who may be waiting for task completion */
if (!rpc_complete_task(task))
return;
} else {
if (!atomic_dec_and_test(&task->tk_count))
return;
}
rpc_final_put_task(task, task->tk_workqueue);
}

int rpciod_up(void)
Expand Down

0 comments on commit bf294b4

Please sign in to comment.