diff --git a/fs/direct-io.c b/fs/direct-io.c index b296942ff7d586..bc1cbf9149f7c2 100644 --- a/fs/direct-io.c +++ b/fs/direct-io.c @@ -121,9 +121,8 @@ struct dio { int page_errors; /* errno from get_user_pages() */ /* BIO completion state */ + atomic_t refcount; /* direct_io_worker() and bios */ spinlock_t bio_lock; /* protects BIO fields below */ - int bio_count; /* nr bios to be completed */ - int bios_in_flight; /* nr bios in flight */ struct bio *bio_list; /* singly linked via bi_private */ struct task_struct *waiter; /* waiting task (NULL if none) */ @@ -256,44 +255,27 @@ static int dio_complete(struct dio *dio, loff_t offset, int ret) * Called when a BIO has been processed. If the count goes to zero then IO is * complete and we can signal this to the AIO layer. */ -static void finished_one_bio(struct dio *dio) +static void dio_complete_aio(struct dio *dio) { unsigned long flags; + int ret; - spin_lock_irqsave(&dio->bio_lock, flags); - if (dio->bio_count == 1) { - if (dio->is_async) { - int ret; - - /* - * Last reference to the dio is going away. - * Drop spinlock and complete the DIO. - */ - spin_unlock_irqrestore(&dio->bio_lock, flags); - - ret = dio_complete(dio, dio->iocb->ki_pos, 0); + ret = dio_complete(dio, dio->iocb->ki_pos, 0); - /* Complete AIO later if falling back to buffered i/o */ - if (dio->result == dio->size || - ((dio->rw == READ) && dio->result)) { - aio_complete(dio->iocb, ret, 0); - kfree(dio); - return; - } else { - /* - * Falling back to buffered - */ - spin_lock_irqsave(&dio->bio_lock, flags); - dio->bio_count--; - if (dio->waiter) - wake_up_process(dio->waiter); - spin_unlock_irqrestore(&dio->bio_lock, flags); - return; - } - } + /* Complete AIO later if falling back to buffered i/o */ + if (dio->result == dio->size || + ((dio->rw == READ) && dio->result)) { + aio_complete(dio->iocb, ret, 0); + kfree(dio); + } else { + /* + * Falling back to buffered + */ + spin_lock_irqsave(&dio->bio_lock, flags); + if (dio->waiter) + wake_up_process(dio->waiter); + spin_unlock_irqrestore(&dio->bio_lock, flags); } - dio->bio_count--; - spin_unlock_irqrestore(&dio->bio_lock, flags); } static int dio_bio_complete(struct dio *dio, struct bio *bio); @@ -309,6 +291,10 @@ static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error) /* cleanup the bio */ dio_bio_complete(dio, bio); + + if (atomic_dec_and_test(&dio->refcount)) + dio_complete_aio(dio); + return 0; } @@ -330,8 +316,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error) spin_lock_irqsave(&dio->bio_lock, flags); bio->bi_private = dio->bio_list; dio->bio_list = bio; - dio->bios_in_flight--; - if (dio->waiter && dio->bios_in_flight == 0) + if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter) wake_up_process(dio->waiter); spin_unlock_irqrestore(&dio->bio_lock, flags); return 0; @@ -362,17 +347,15 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev, * In the AIO read case we speculatively dirty the pages before starting IO. * During IO completion, any of these pages which happen to have been written * back will be redirtied by bio_check_pages_dirty(). + * + * bios hold a dio reference between submit_bio and ->end_io. */ static void dio_bio_submit(struct dio *dio) { struct bio *bio = dio->bio; - unsigned long flags; bio->bi_private = dio; - spin_lock_irqsave(&dio->bio_lock, flags); - dio->bio_count++; - dio->bios_in_flight++; - spin_unlock_irqrestore(&dio->bio_lock, flags); + atomic_inc(&dio->refcount); if (dio->is_async && dio->rw == READ) bio_set_pages_dirty(bio); submit_bio(dio->rw, bio); @@ -390,18 +373,28 @@ static void dio_cleanup(struct dio *dio) page_cache_release(dio_get_page(dio)); } +static int wait_for_more_bios(struct dio *dio) +{ + assert_spin_locked(&dio->bio_lock); + + return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL); +} + /* - * Wait for the next BIO to complete. Remove it and return it. + * Wait for the next BIO to complete. Remove it and return it. NULL is + * returned once all BIOs have been completed. This must only be called once + * all bios have been issued so that dio->refcount can only decrease. This + * requires that that the caller hold a reference on the dio. */ static struct bio *dio_await_one(struct dio *dio) { unsigned long flags; - struct bio *bio; + struct bio *bio = NULL; spin_lock_irqsave(&dio->bio_lock, flags); - while (dio->bio_list == NULL) { + while (wait_for_more_bios(dio)) { set_current_state(TASK_UNINTERRUPTIBLE); - if (dio->bio_list == NULL) { + if (wait_for_more_bios(dio)) { dio->waiter = current; spin_unlock_irqrestore(&dio->bio_lock, flags); io_schedule(); @@ -410,8 +403,10 @@ static struct bio *dio_await_one(struct dio *dio) } set_current_state(TASK_RUNNING); } - bio = dio->bio_list; - dio->bio_list = bio->bi_private; + if (dio->bio_list) { + bio = dio->bio_list; + dio->bio_list = bio->bi_private; + } spin_unlock_irqrestore(&dio->bio_lock, flags); return bio; } @@ -440,25 +435,24 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio) } bio_put(bio); } - finished_one_bio(dio); return uptodate ? 0 : -EIO; } /* - * Wait on and process all in-flight BIOs. + * Wait on and process all in-flight BIOs. This must only be called once + * all bios have been issued so that the refcount can only decrease. + * This just waits for all bios to make it through dio_bio_complete. IO + * errors are propogated through dio->io_error and should be propogated via + * dio_complete(). */ static void dio_await_completion(struct dio *dio) { - /* - * The bio_lock is not held for the read of bio_count. - * This is ok since it is the dio_bio_complete() that changes - * bio_count. - */ - while (dio->bio_count) { - struct bio *bio = dio_await_one(dio); - /* io errors are propogated through dio->io_error */ - dio_bio_complete(dio, bio); - } + struct bio *bio; + do { + bio = dio_await_one(dio); + if (bio) + dio_bio_complete(dio, bio); + } while (bio); } /* @@ -995,16 +989,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, dio->iocb = iocb; dio->i_size = i_size_read(inode); - /* - * BIO completion state. - * - * ->bio_count starts out at one, and we decrement it to zero after all - * BIOs are submitted. This to avoid the situation where a really fast - * (or synchronous) device could take the count to zero while we're - * still submitting BIOs. - */ - dio->bio_count = 1; - dio->bios_in_flight = 0; + atomic_set(&dio->refcount, 1); spin_lock_init(&dio->bio_lock); dio->bio_list = NULL; dio->waiter = NULL; @@ -1111,7 +1096,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, } if (ret == 0) ret = dio->result; - finished_one_bio(dio); /* This can free the dio */ + + /* this can free the dio */ + if (atomic_dec_and_test(&dio->refcount)) + dio_complete_aio(dio); + if (should_wait) { unsigned long flags; /* @@ -1122,7 +1111,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, spin_lock_irqsave(&dio->bio_lock, flags); set_current_state(TASK_UNINTERRUPTIBLE); - while (dio->bio_count) { + while (atomic_read(&dio->refcount)) { spin_unlock_irqrestore(&dio->bio_lock, flags); io_schedule(); spin_lock_irqsave(&dio->bio_lock, flags); @@ -1133,7 +1122,6 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, kfree(dio); } } else { - finished_one_bio(dio); dio_await_completion(dio); ret = dio_complete(dio, offset, ret); @@ -1146,7 +1134,11 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, * i/o, we have to mark the the aio complete. */ aio_complete(iocb, ret, 0); - kfree(dio); + + if (atomic_dec_and_test(&dio->refcount)) + kfree(dio); + else + BUG(); } return ret; }